mina实践_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > mina实践

mina实践

 2012/3/17 15:26:30  hill007299  程序员俱乐部  我要评论(0)
  • 摘要:1单线程模型,echo过程。客户端当req大小超过服务端的receivebuffer时就会抛出buffer不足的异常。客户端执行过程AbstractPollingIoConnector.connect0(SocketAddress,SocketAddress,IoSessionInitializer<?extendsConnectFuture>)-》NioSocketConnector.connect(SocketChannel,SocketAddress)-
  • 标签:

?

1 单线程模型,echo过程。

?

客户端

当req大小超过服务端的receive buffer时就会抛出buffer不足的异常

客户端执行过程

AbstractPollingIoConnector.connect0(SocketAddress, SocketAddress, IoSessionInitializer<? extends ConnectFuture>)-》

NioSocketConnector.connect(SocketChannel, SocketAddress)-》

使用线程池ThreadPoolExecutor执行AbstractPollingIoConnector.Connector<T, H>

-》在connector中执行就绪选择NioSocketConnector.select(int)-》

NioSocketConnector.selectedHandles() ?从中拿到select key进而得到关联的socket channel-》

拿到socket channel 后再放到这个方法里处理AbstractPollingIoConnector.processConnections(Iterator<H>)-》将socket channel封装成Niosession ,并关联一个IoProcessor(SimpleIoProcessorPool), NioSocketConnector.newSession(IoProcessor<NioSession>, SocketChannel) ?-》IoProcessor.add(IoSession session)添加至?SimpleIoProcessorPool中-》SimpleIoProcessorPool.getProcessor(T session)-》AbstractPollingIoProcessor.add(T session)

-》AbstractPollingIoProcessor.startupProcessor()-》使用线程池ThreadPoolExecutor启动AbstractPollingIoProcessor$Processor,这个就是反应器-》AbstractPollingIoProcessor.select(long timeout)反应器采用轮询的方式进行就绪选择,选择器注册了读事件-》如果有读事件的话,那么执行?AbstractPollingIoProcessor.process(T session)。

?

?

IoConnector connector = new NioSocketConnector();
		StringBuffer req = new StringBuffer();
		int length = 2048 / 26;
		for (int i = 0; i < length; i++) {
			// 26 byte
			req.append("hello server-");
		}
		System.out.println(req.length() * 2 + "	bytes");
		connector.setConnectTimeoutMillis(30000);
		connector.getFilterChain().addLast(
				"codec",
				new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), LineDelimiter.WINDOWS
						.getValue(), LineDelimiter.WINDOWS.getValue())));
		connector.setHandler(new ClientHandler(req.toString()));
		connector.connect(new InetSocketAddress("localhost", 9123));
		while (true) {
			Thread.sleep(10000);

		}

?

public class ClientHandler extends IoHandlerAdapter {

	private final static Logger log = LoggerFactory.getLogger(ClientHandler.class);

	private final String values;

	public ClientHandler(String values) {
		this.values = values;
	}

	@Override
	public void messageReceived(IoSession session, Object message) throws Exception {
		String str = message.toString();
		log.info("The response message received length is [" + str.length() * 2 + "bytes ]");
		log.info("The response message received is [" + str + "]");

	}

	@Override
	public void sessionOpened(IoSession session) {
		log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "	sessionOpened");
		session.write(values);
	}

	@Override
	public void sessionCreated(IoSession session) throws Exception {
		log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "	sessionCreated");
	}

	@Override
	public void sessionClosed(IoSession session) throws Exception {
		log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "	sessionClosed");
	}

	@Override
	public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
		System.out.println("IDLE " + session.getIdleCount(status));
	}

	@Override
	public void messageSent(IoSession session, Object message) throws Exception {
		log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "	messageSent");
	}
}

?

?

public class ClientHandler extends IoHandlerAdapter {

	private final static Logger log = LoggerFactory.getLogger(ClientHandler.class);

	private final String values;
	private final int totalNum;
	private AtomicInteger finichNum = new AtomicInteger();
	private long start;

	public ClientHandler(String values, int totalNum) {
		this.values = values;
		this.totalNum = totalNum;
	}

	@Override
	public void messageReceived(IoSession session, Object message) throws Exception {
		String str = message.toString();

		log.info("The response message received length is [" + str.length() * 2 + "bytes ]");
		log.info("The response message received is [" + str + "]");
		if (finichNum.addAndGet(1) == totalNum) {
			System.out.println(System.currentTimeMillis() - start);
		}
	}

	@Override
	public void sessionOpened(IoSession session) {
		log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "	sessionOpened");
		session.write(values);
		if (finichNum.get() == 0) {
			start = System.currentTimeMillis();
		}
	}

	@Override
	public void sessionCreated(IoSession session) throws Exception {
		log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "	sessionCreated");
	}

	@Override
	public void sessionClosed(IoSession session) throws Exception {
		log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "	sessionClosed");
	}

	@Override
	public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
		System.out.println("IDLE " + session.getIdleCount(status));
	}

	@Override
	public void messageSent(IoSession session, Object message) throws Exception {
		log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "	messageSent");
	}
}
?

?

?服务端

服务端执行过程:

AbstractIoAcceptor.bind(SocketAddress... addresses)-》AbstractPollingIoAcceptor.bindInternal(List<? extends SocketAddress> localAddresses)-》启动AbstractPollingIoAcceptor.startupAcceptor()-》使用线程池启动AbstractIoService.executeWorker(Runnable worker) ,接收器AbstractPollingIoAcceptor.Acceptor-》接收器轮询对接收事件进行就绪选择-》如果有连接进来,从NioSocketAcceptor.selectedHandles()拿到select key中拿到ServerSocketChannel-》执行AbstractPollingIoAcceptor.accept(IoProcessor<T> processor,?H?handle)-》ServerSocketChannel.accept()在从中拿到SocketChannel-》通过NioSocketSession构造iosession-》SimpleIoProcessorPool.add(T session)处理器池中拿到NioProcessor-》AbstractPollingIoProcessor.add(T session)-》启动反应器AbstractPollingIoProcessor.startupProcessor()-》轮询进行读就绪选择NioProcessor.select(long timeout)-》触发读就绪事件AbstractPollingIoProcessor.process(T session)-》AbstractPollingIoProcessor.read(T session)-》IoFilterChain.fireMessageReceived(Object message) 进入过滤器流程,过滤器在构造接收器的时候就已经拼接好了(服务端先产生一个单例acceptor,触发读写事件后,acceptor再从SimpleIoProcessorPool(池里的AbstractPollingIoProcessor个数一般为cpu核数+1,如果业务处理无阻塞,那么这是最优的线程数)池里拿到一个AbstractPollingIoProcessor并创建一个与之关联的单例AbstractPollingIoProcessor.Processor

,在没有ExecutorFilter的情况下,反应器Processor和业务处理处于同一线程,属于单线程模型)。-》服务器接收到请求,那么在MyIoHandler.messageReceived(IoSession session, Object message)中读取,并发出响应IoSession.write(Object message)-》直接进入过滤器链IoFilterChain.fireFilterWrite(WriteRequest writeRequest)-》IoFilter.filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest)-》进入编码解码器ProtocolCodecFilter.filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest)-》最后一个环节,进行真正写操作DefaultIoFilterChain.HeadFilter.filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest)-》放入写队列WriteRequestQueue.offer(IoSession session, WriteRequest writeRequest)-》iosession又拿起他的处理器池SimpleIoProcessorPool.getProcessor(T session)-》得到处理器执行AbstractPollingIoProcessor.flush(T session),flush并没有真正执行网络写操作,仅仅是唤醒就绪选择器Selector.wakeup()-》AbstractPollingIoProcessor.flush(long currentTime)-》AbstractPollingIoProcessor.writeBuffer(T session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime)-》NioProcessor.write(NioSession session, IoBuffer buf, int length)最后调用WritableByteChannel.write(ByteBuffer src)进行真正网络io操作(就绪选择器的写就绪是在write完之后触发的,也就是说我们要触发某个网络写操作,并不是依靠就绪选择器的write事件,而要自己创造条件,例如在反应器里面,通过判断写事件队列来决定是否进行网络写,就绪选择器的write事件是我们主动write产生的结果,可以参考mina,zookeeper的网络通信实现)。

?

		IoAcceptor acceptor = new NioSocketAcceptor();
		acceptor.getSessionConfig().setReadBufferSize(2048);
		acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
		acceptor.getFilterChain().addLast(
				"codec",
				new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), LineDelimiter.WINDOWS
						.getValue(), LineDelimiter.WINDOWS.getValue())));
		acceptor.setHandler(new MyIoHandler());
		acceptor.bind(new InetSocketAddress(9123));
		while (true) {
			Thread.sleep(10000);

		}
	
?

?

public class MyIoHandler extends IoHandlerAdapter {
	private final static Logger log = LoggerFactory.getLogger(MyIoHandler.class);

	@Override
	public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
		cause.printStackTrace();
	}

	@Override
	public void messageReceived(IoSession session, Object message) throws Exception {
		String str = message.toString();
		log.info("The request message received length is [" + str.length() * 2 + "bytes ]");
		log.info("The request message received is [" + str + "]");
		if (str.endsWith("quit")) {
			session.close(true);
			return;
		}
		session.write(message);
	}

	@Override
	public void sessionCreated(IoSession session) throws Exception {
		log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "	sessionCreated");
	}

	@Override
	public void sessionClosed(IoSession session) throws Exception {
		log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "	sessionClosed");
	}

	@Override
	public void sessionOpened(IoSession session) throws Exception {
		log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "	sessionOpened");
	}

	@Override
	public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
		// System.out.println("IDLE " + session.getIdleCount(status));
	}

	@Override
	public void messageSent(IoSession session, Object message) throws Exception {
		log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "	messageSent");
	}
} 

?

?

public class MyIoHandler extends IoHandlerAdapter {
	private final static Logger log = LoggerFactory.getLogger(MyIoHandler.class);

	@Override
	public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
		cause.printStackTrace();
	}

	@Override
	public void messageReceived(IoSession session, Object message) throws Exception {
		String str = message.toString();
		log.info("The request message received length is [" + str.length() * 2 + "bytes ]");
		log.info("The request message received is [" + str + "]");
		if (str.endsWith("quit")) {
			session.close(true);
			return;
		}
		session.write(message);
	}

	@Override
	public void sessionCreated(IoSession session) throws Exception {
		log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "	sessionCreated");
	}

	@Override
	public void sessionClosed(IoSession session) throws Exception {
		log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "	sessionClosed");
	}

	@Override
	public void sessionOpened(IoSession session) throws Exception {
		log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "	sessionOpened");
	}

	@Override
	public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
		// System.out.println("IDLE " + session.getIdleCount(status));
	}

	@Override
	public void messageSent(IoSession session, Object message) throws Exception {
		log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "	messageSent");
	}
}
?

服务端网络io状态变迁:接收器accept事件就绪-》sessionCreated-》sessionOpened-》flushingSessions为空-》AbstractPollingIoProcessor.Processor读事件就绪-》messageReceived-》IoSession.write(Object message)-》flushingSessions.add(session),flushingSessions remove,session.getChannel().write(buf.buf()),客户端触发messageReceived-》?AbstractPollingIoProcessor.Processor写事件就绪触发-》flushingSessions.add(session),flushingSessions remove ,AbstractPollingIoProcessor.writeBuffer(T session, WriteRequest req, boolean hasFragmentation, int maxLength, long currentTime),fireMessageSent—》消息发送成功后messageSent。

flushingSessions充当写操作的一个同步器,决定何时执行写相关操作,并且当if (!buf.hasRemaining() ||

? ? ? ? ? ? ? ? !hasFragmentation && localWrittenBytes != 0) {

? // Buffer has been sent, clear the current request.

才会触发fireMessageSent。

?

客户端与之类似,唯一的区别在于一个connector,一个acceptor,用于前期的tcp连接建立;AbstractPollingIoProcessor.Processor用于连接成功后的数据读写操作。

?

?

?

2 多线程模型

客户端

?

每次connect成功都会产生一个连接session.

?

ExecutorService threadPool = Executors.newFixedThreadPool(3);
		for (int i = 0; i < threadNum; i++) {
			threadPool.execute(new ClientConnetion(connector));
		}
?

?

class ClientConnetion implements Runnable {
		private IoConnector connector;

		public ClientConnetion(IoConnector connector) {
			this.connector = connector;
		}

		@Override
		public void run() {
			connector.connect(new InetSocketAddress("localhost", PORT));

		}

	}

?

?

服务端

如果业务线程无阻塞,那么默认的配置n(cpu个数)+1个AbstractPollingIoProcessor.Processor 线程数量为最优,在同一个线程里处理io和业务。

当业务线程阻塞,那么就应该采用多线程模型。

单机环境下,客户并发10个请求,服务端服务计算时间为100ms,阻塞时间为1000ms,处理完100个请求总时间为48797ms。

?

在理想条件下,最优线程数可以通过公式(阻塞时间/计算时间+1)*cpu核数得出。上述情况比较合适的服务线程数为20个。在同样的环境下,服务端采用了多线程模型,处理完100个请求的总时间为 15593?ms。

(其他数据,5个线程,处理时间为22984ms;7个,22266ms;10个,17906ms;15个,16984;20个,15593;22个,16906;25个,15969;30个,17782ms;35个,17891。因为是非理想环境,所以测试数据有所偏差,仅供参考)

如下

?

?

	IoAcceptor acceptor = new NioSocketAcceptor();
		acceptor.getSessionConfig().setReadBufferSize(2048);
		acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
		acceptor.getFilterChain().addLast("executor1", new ExecutorFilter(20, 20);
		acceptor.getFilterChain().addLast(
				"codec",
				new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), LineDelimiter.WINDOWS
						.getValue(), LineDelimiter.WINDOWS.getValue())));
		acceptor.setHandler(new MyIoHandler());
		acceptor.bind(new InetSocketAddress(9123));
		while (true) {
			Thread.sleep(10000);

		}

?

?

业务处理器

?

public class MyIoHandler extends IoHandlerAdapter {
	private final static Logger log = LoggerFactory.getLogger(MyIoHandler.class);

	@Override
	public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
		cause.printStackTrace();
	}

	@Override
	public void messageReceived(IoSession session, Object message) throws Exception {
		String str = message.toString();
		log.info("The request message received length is [" + str.length() * 2 + "bytes ]");
		log.info("The request message received is [" + str + "]");
		if (str.endsWith("quit")) {
			session.close(true);
			return;
		}
		cptTime(100);
		blkTime(1000);
		session.write(message);
	}

	private void cptTime(long time) {
		long num = 1000000 * time;
		int temp = 0;
		// 100 ms
		// long start = System.currentTimeMillis();
		for (int i = 0; i < num; i++) {
			temp++;
		}
		// System.out.println(System.currentTimeMillis() - start);
	}

	private void blkTime(long time) {
		try {
			Thread.sleep(time);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	@Override
	public void sessionCreated(IoSession session) throws Exception {
		log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "	sessionCreated");
	}

	@Override
	public void sessionClosed(IoSession session) throws Exception {
		log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "	sessionClosed");
	}

	@Override
	public void sessionOpened(IoSession session) throws Exception {
		log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "	sessionOpened");
	}

	@Override
	public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
		// System.out.println("IDLE " + session.getIdleCount(status));
	}

	@Override
	public void messageSent(IoSession session, Object message) throws Exception {
		log.info(session.getLocalAddress() + "------" + session.getRemoteAddress() + "	messageSent");
	}
}

?

使用 acceptor.getFilterChain().addLast("executor1", new ExecutorFilter(22, 22))就可以实现SEDA模型。把不同阶段的业务处理过程都抽象为一个stage,每个stage都有一个包含最优线程数量的线程池,每个stage之间通过一个事件队列串接起来,ExecutorFilter就是实现的关键环节。

  • 相关文章
发表评论
用户名: 匿名