Java Code_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > Java Code

Java Code

 2013/10/15 18:41:22  kiddcheney  程序员俱乐部  我要评论(0)
  • 摘要:/******************************************************************************* Copyright (c) quickfixengine.org All rights reserved. * * This file is part of the QuickFIX FIX Engine * * This file may be distributed under the terms of the quickfixengine…
  • 标签:Java
class="java" name="code">
/*******************************************************************************
 * Copyright (c) quickfixengine.org  All rights reserved. 
 * 
 * This file is part of the QuickFIX FIX Engine 
 * 
 * This file may be distributed under the terms of the quickfixengine.org 
 * license as defined by quickfixengine.org and appearing in the file 
 * LICENSE included in the packaging of this file. 
 * 
 * This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING 
 * THE WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A 
 * PARTICULAR PURPOSE. 
 * 
 * See http://www.quickfixengine.org/LICENSE for licensing information. 
 * 
 * Contact ask@quickfixengine.org if any conditions of this licensing 
 * are not clear to you.
 ******************************************************************************/

package com.qchen.quickfixmessage.util;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import junit.framework.Assert;

import org.apache.mina.common.CloseFuture;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.TransportType;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.vmpipe.VmPipeAddress;
import org.apache.mina.transport.vmpipe.VmPipeConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import quickfix.mina.message.FIXProtocolCodecFactory;

/**
 * Test helper that opens client connections through Apache MINA and exchanges
 * messages with the peer, using the QuickFIX protocol codec on every session.
 *
 * <p>Each connection is identified by a caller-chosen integer client id. Access to
 * the id-to-handler map is guarded by the map's own monitor.
 */
public class TestConnection {
    /** Connectors shared by all instances, created on first use per transport type; guarded by its own monitor. */
    private static final HashMap<TransportType, IoConnector> connectors = new HashMap<TransportType, IoConnector>();
    private final Logger log = LoggerFactory.getLogger(getClass());
    /** Handlers of successfully connected clients, keyed by client id; guarded by its own monitor. */
    private final HashMap<Integer, TestIoHandler> ioHandlers = new HashMap<Integer, TestIoHandler>();

    /**
     * Writes a message to the session of the given client.
     *
     * @param clientId id that was passed to {@link #connect}
     * @param message  text to write to the session
     * @throws IllegalStateException if no connection is registered for {@code clientId}
     */
    public void sendMessage(int clientId, String message) throws IOException {
        getIoHandler(clientId).getSession().write(message);
    }

    /**
     * Looks up the handler registered for a client id.
     *
     * @throws IllegalStateException if the id is unknown, so callers get a descriptive
     *         error instead of a NullPointerException further down the call chain
     */
    private TestIoHandler getIoHandler(int clientId) {
        TestIoHandler handler;
        synchronized (ioHandlers) {
            handler = ioHandlers.get(Integer.valueOf(clientId));
        }
        if (handler == null) {
            throw new IllegalStateException("No connection registered for client id " + clientId);
        }
        return handler;
    }

    /**
     * Closes the session of every registered client, waiting for each close to
     * complete, and forgets all registered clients.
     */
    public void tearDown() {
        TestIoHandler[] handlers;
        // Snapshot and clear under the lock, then close outside it so that waiting on
        // the close futures never happens while the map's monitor is held.
        synchronized (ioHandlers) {
            handlers = ioHandlers.values().toArray(new TestIoHandler[ioHandlers.size()]);
            ioHandlers.clear();
        }
        for (TestIoHandler handler : handlers) {
            CloseFuture closeFuture = handler.getSession().close();
            closeFuture.join();
        }
    }

    /**
     * Waits for the next message received by the given client.
     *
     * @param clientId id that was passed to {@link #connect}
     * @param timeout  maximum time to wait, in milliseconds
     * @return the next received message, or {@code null} if none arrived within the timeout
     * @throws IllegalStateException if no connection is registered for {@code clientId}
     */
    public CharSequence readMessage(int clientId, long timeout) throws InterruptedException {
        return getIoHandler(clientId).getNextMessage(timeout);
    }

    /**
     * Waits until the session of the given client has been closed and fails the
     * test if that does not happen within the handler's timeout.
     *
     * @throws IllegalStateException if no connection is registered for {@code clientId}
     */
    public void waitForClientDisconnect(int clientId) throws IOException, InterruptedException {
        getIoHandler(clientId).waitForDisconnect();
    }

    /**
     * Connects a new client and registers it under {@code clientId}.
     *
     * <p>For {@code TransportType.SOCKET} the target is {@code localhost:port}; for
     * {@code TransportType.VM_PIPE} the target is the VM pipe with that port number.
     * The handler is registered only after the connection has been established, so a
     * failed attempt leaves no entry behind for {@link #tearDown()} to wait on.
     *
     * @param clientId      id under which the connection is registered
     * @param transportType {@code SOCKET} or {@code VM_PIPE}
     * @param port          TCP port or VM pipe port
     * @throws RuntimeException if the transport type is not supported
     */
    public void connect(int clientId, TransportType transportType, int port)
            throws UnknownHostException, IOException {
        IoConnector connector = getConnector(transportType);
        SocketAddress address = createAddress(transportType, port);

        TestIoHandler testIoHandler = new TestIoHandler();
        // The lock is held for the whole attempt (as before) so that lookups for this
        // client id issued from other threads wait until the outcome is known.
        synchronized (ioHandlers) {
            ConnectFuture future = connector.connect(address, testIoHandler);
            future.join();
            Assert.assertTrue("connection to server failed", future.isConnected());
            ioHandlers.put(Integer.valueOf(clientId), testIoHandler);
        }
    }

    /** Returns the cached connector for the transport type, creating and caching it on first use. */
    private static IoConnector getConnector(TransportType transportType) {
        synchronized (connectors) {
            IoConnector connector = connectors.get(transportType);
            if (connector == null) {
                if (transportType == TransportType.SOCKET) {
                    connector = new SocketConnector();
                } else if (transportType == TransportType.VM_PIPE) {
                    connector = new VmPipeConnector();
                } else {
                    throw new RuntimeException("Unsupported transport type: " + transportType);
                }
                connectors.put(transportType, connector);
            }
            return connector;
        }
    }

    /** Builds the remote address matching the transport type. */
    private static SocketAddress createAddress(TransportType transportType, int port) {
        if (transportType == TransportType.SOCKET) {
            return new InetSocketAddress("localhost", port);
        }
        if (transportType == TransportType.VM_PIPE) {
            return new VmPipeAddress(port);
        }
        throw new RuntimeException("Unsupported transport type: " + transportType);
    }

    /**
     * Per-connection handler: installs the FIX codec when the session is created,
     * queues received messages and tracks session creation and closure with latches.
     */
    private class TestIoHandler extends IoHandlerAdapter {
        private IoSession session;
        private final BlockingQueue<Object> messages = new LinkedBlockingQueue<Object>();
        private final CountDownLatch sessionCreatedLatch = new CountDownLatch(1);
        private final CountDownLatch disconnectLatch = new CountDownLatch(1);

        /** Stores the session, appends the FIX codec filter to its chain and releases {@link #getSession()} waiters. */
        public void sessionCreated(IoSession session) throws Exception {
            super.sessionCreated(session);
            this.session = session;
            session.getFilterChain().addLast("codec",
                    new ProtocolCodecFilter(new FIXProtocolCodecFactory()));
            sessionCreatedLatch.countDown();
        }

        /** Logs the failure at error level after delegating to the adapter. */
        public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
            super.exceptionCaught(session, cause);
            log.error(cause.getMessage(), cause);
        }

        /** Releases {@link #waitForDisconnect()} waiters. */
        public void sessionClosed(IoSession session) throws Exception {
            super.sessionClosed(session);
            disconnectLatch.countDown();
        }

        /** Queues the received message for {@link #getNextMessage(long)}. */
        public void messageReceived(IoSession session, Object message) throws Exception {
            messages.add(message);
        }

        /**
         * Returns the session, blocking until {@link #sessionCreated} has run.
         *
         * @throws RuntimeException wrapping the InterruptedException if the wait is interrupted
         */
        public IoSession getSession() {
            try {
                sessionCreatedLatch.await();
            } catch (InterruptedException e) {
                // Restore the interrupt status so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            return session;
        }

        /**
         * Returns the next queued message, waiting up to {@code timeout} milliseconds.
         *
         * @return the message, or {@code null} if the timeout elapsed first
         */
        public String getNextMessage(long timeout) throws InterruptedException {
            // NOTE(review): assumes the codec delivers decoded messages as String;
            // any other payload type would fail this cast.
            return (String) messages.poll(timeout, TimeUnit.MILLISECONDS);
        }

        /** Waits up to 500000 ms for the session to close and fails the test otherwise. */
        public void waitForDisconnect() throws InterruptedException {
            if (!disconnectLatch.await(500000L, TimeUnit.MILLISECONDS)) {
                Assert.fail("client not disconnected");
            }
        }
    }
}



package com.qchen.quickfixmessage;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Date;

import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.TransportType;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.socket.nio.SocketConnectorConfig;

import com.qchen.quickfixmessage.util.TestConnection;

import quickfix.field.HeartBtInt;
import quickfix.field.SenderCompID;
import quickfix.fix40.Logon;
import quickfix.fix40.Message;

/**
 * Small command-line client: connects through {@link TestConnection}, sends one
 * FIX 4.0 Logon message and prints the first message received in reply.
 */
public class FixMinaClient {
	private TestConnection connection = new TestConnection();
	private static final TransportType transportType = TransportType.SOCKET;
	private static final int port = 9876;
	/** FIX field delimiter (ASCII 0x01). */
	public static final String SOH = "\u0001";
	
	/**
	 * Connects as client 1, sends the Logon, waits up to 3000 ms for a reply and
	 * prints it (prints {@code null} when nothing arrives in time).
	 */
	public static void main(String[] args) throws UnknownHostException, IOException, InterruptedException {
		FixMinaClient client = new FixMinaClient();
		String logonString = buildLogon("20131015-09:06:26");

		client.connection.connect(1, transportType, port);
		try {
			client.connection.sendMessage(1, logonString);
			CharSequence messageBack = client.connection.readMessage(1, 3000);
			System.out.println("back msg is "+messageBack);
		} finally {
			// Close the session once the exchange is over instead of leaving it open.
			client.connection.tearDown();
		}
	}

	/**
	 * Builds a FIX 4.0 Logon (35=A) from BANZAI to EXEC with MsgSeqNum 5,
	 * EncryptMethod 0 and HeartBtInt 30.
	 *
	 * <p>BodyLength (9) and CheckSum (10) are computed from the content rather than
	 * hard-coded: BodyLength counts everything after the BodyLength field up to and
	 * including the delimiter before the CheckSum field, and CheckSum is the sum of
	 * all preceding bytes modulo 256, written as three digits.
	 *
	 * @param sendingTime value for SendingTime (52), e.g. {@code 20131015-09:06:26}
	 * @return the complete message, every field terminated by {@link #SOH}
	 */
	private static String buildLogon(String sendingTime) {
		String body = new StringBuilder()
				.append("35=A").append(SOH)
				.append("34=5").append(SOH)
				.append("49=BANZAI").append(SOH)
				.append("52=").append(sendingTime).append(SOH)
				.append("56=EXEC").append(SOH)
				.append("98=0").append(SOH)
				.append("108=30").append(SOH)
				.toString();

		// Every character in the message is ASCII, so character counts equal byte counts.
		String framed = "8=FIX.4.0" + SOH + "9=" + body.length() + SOH + body;

		int sum = 0;
		for (int i = 0; i < framed.length(); i++) {
			sum += framed.charAt(i);
		}
		return framed + "10=" + String.format("%03d", sum % 256) + SOH;
	}
}

/**
 * MINA handler that writes a FIX 4.0 Logon when its session opens and answers
 * every received message with a fixed text reply.
 */
class ClientHandler extends IoHandlerAdapter{
	/** Reports the opened session on stderr, then writes a Logon with HeartBtInt 30. */
	public void sessionOpened(IoSession session) {
		System.err.println("session opened");

		Logon logon = new Logon();
		HeartBtInt heartbeatInterval = new HeartBtInt(30);
		logon.set(heartbeatInterval);

		session.write(logon);
	}

	/** Empty placeholder; nothing in this class calls it. */
	private void acceptInput() {
	}

	/** Reports the received message on stdout and writes a fixed reply to the session. */
	public void messageReceived(IoSession session, Object message) {
		System.out.println("in messageReceived!");
		session.write("receive from server");
	}
}

发表评论
用户名: 匿名