class="java" name="code">
?之前看过关于NIO的介绍,但是没有深究。。前几天研究了java的NIO,然后偶然在IBMdeveloper上看到了一个年代久远的“基于时间的NIO多线程服务器”文章,于是我就仔细研究了下。http://www.ibm.com/developerworks/cn/java/l-niosvr/这是这篇文章的地址。相同的地方我就不贴了,直接看原帖就好了。我贴的是大致的流程和一些重要的类吧。。使用的也就是channel 和selector,还有资源池。。
?
它这篇文章它使用了观察者模式,并且将客户端传来的信息封装成一个request类,将要写给客户端的也封装成一个response类,这都是为了后续处理方便。。还有观察者模式的那些处理我就不粘贴了。它的这篇文章实现的也是短连接,即客户端连上之后读取一次内容并且输入一次之后就断开连接了。
?
在程序中开了三个线程,一个是主线程,一个时读通道数据的线程,另一个是写通道数据的线程。

?主线程:
?
package nio1; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Set; public class Server implements Runnable { //主线程中的写通道池,用来存放从读取之后可以进行写操作的通道。 private static List<SelectionKey> wpool = new LinkedList<SelectionKey>(); private Notify notify = Notify.getInstance(); private ServerSocketChannel ssChannel; private static Selector selector; private int port; public Server(int port) throws Exception{ this.port = port; //创建了一个读线程和一个写线程 new Thread(new Reader()).start(); new Thread(new Writer()).start(); selector = Selector.open(); ssChannel = ServerSocketChannel.open(); ssChannel.configureBlocking(false); ServerSocket ss = ssChannel.socket(); InetSocketAddress address = new InetSocketAddress(port); ss.bind(address); ssChannel.register(selector, SelectionKey.OP_ACCEPT); } @Override public void run() { // TODO Auto-generated method stub System.out.println("Server start.."); System.out.println("Server listening on port: " + port); while (true){ try{ int num = 0; // 阻塞着等待着感兴趣的通道完成,但是如果此时一个读通道已经完成了,可以进行写通道的任务时,根据processWriteRequest唤醒它,唤醒后num=0; num = selector.select(); if (num > 0) { Set<SelectionKey> set = selector.selectedKeys(); Iterator<SelectionKey> it = set.iterator(); while (it.hasNext()){ SelectionKey sk = it.next(); //在已选择集中删除,如果不删除下次选择时它还是会存在的。 it.remove(); if (sk.isAcceptable()) { ServerSocketChannel ssc = (ServerSocketChannel) sk.channel(); //这步没有用到,可以用来做已连接数的检测 notify.fireOnAccept(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); Request request = new Request(sc); // 注册读操作,以进行下一步的读操作, 这里将request作为一个附加对象加入进去。 sc.register(selector, SelectionKey.OP_READ, request); } //可读的通道就绪了 else if (sk.isReadable()) { Reader.processRequest(sk); sk.cancel(); } //可写的通道就绪了 else if (sk.isWritable()) { Writer.processRequest(sk); sk.cancel(); } } } else{ addRegister(); // 在Selector中注册新的写通道 } }catch (Exception e) { // TODO: handle exception notify.fireError("Error occured in Server: " + e.getMessage()); e.printStackTrace(); } } } //在读线程中将会调用这个方法,告诉主线程读通道的事件已经完成,现在可以进行写处理 public static void processWriteRequest(SelectionKey key){ synchronized (wpool) { wpool.add(wpool.size(), key); wpool.notifyAll(); } selector.wakeup(); // 解除selector的阻塞状态,以便注册新的写通道,否则他会一直在select哪里阻塞着等待 } //这个方法是将主线程中通道池里的准备好写的通道,注册到selector中。 public void addRegister(){ synchronized (wpool) { while (!wpool.isEmpty()) { SelectionKey key = wpool.remove(0); SocketChannel sc = (SocketChannel) key.channel(); try{ //将这个通道中的从客户端发来的包装好的request当做key中的附带物想selector注册 sc.register(selector, SelectionKey.OP_WRITE, key.attachment()); }catch (Exception e) { // TODO: handle exception try{ sc.finishConnect(); sc.socket().close(); sc.close(); }catch (Exception ex) { // TODO: handle exception ex.printStackTrace(); } e.printStackTrace(); } } } } }
?
?
读线程:
?
package nio1;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.List;
public class Reader implements Runnable {
	//读线程中的可读的通道池
	private static List<SelectionKey> pool = new LinkedList<SelectionKey>();
	private static final int MAX_CAPACITY = 1024;
	private static Notify notify = Notify.getInstance();
	
	public Reader(){
		
	}
	
	@Override
	public void run() {
		// TODO Auto-generated method stub
		while (true){
			try{
				SelectionKey key;
				synchronized (pool) {
					while (pool.isEmpty()) {
						pool.wait();
					}
					key = pool.remove(0);
				}
				read(key);
				
			}catch (InterruptedException e) {
				// TODO: handle exception
				e.printStackTrace();
			}
		}
	}
	
	public void read(SelectionKey key){
		
		try{
			SocketChannel sc = (SocketChannel) key.channel();
			byte[] clientData = readRequest(sc);
			Request request = new Request(sc);
			request.setDataInput(clientData);
			
			//触发读事件,将request包装起来发给具体需要处理的业务逻辑
			notify.fireOnRead(request);
			
			// 提交主控线程进行写处理,就是通知主线程,通道中的信息我已经读完了,可以进行写入了。
            Server.processWriteRequest(key);
			
		}catch (Exception e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}
	
	public byte[] readRequest(SocketChannel sc) throws Exception{
		
		ByteBuffer buffer = ByteBuffer.allocate(MAX_CAPACITY);
		sc.read(buffer);
		byte[] req = new byte[MAX_CAPACITY * 10];
		req = buffer.array();
		
		return req;	
	}
	//当有可以被读的通道进来时,加入到通道池中,然后唤醒通道池以便处理
	public static void processRequest(SelectionKey key){
		synchronized (pool) {
			pool.add(pool.size(), key);
			pool.notifyAll();
		}
	}
}
?
?
写线程:
?
package nio1;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.List;
public class Writer implements Runnable {
	//写线程中的写通道池
	private static List<SelectionKey> pool = new LinkedList<SelectionKey>();
	private static Notify notify = Notify.getInstance();
	
	public Writer(){
		
	}
	
	@Override
	public void run() {
		// TODO Auto-generated method stub
		while (true) {
			try{
				SelectionKey key;
				synchronized (pool) {
					while (pool.isEmpty()) {
						pool.wait();
					}
					key = pool.remove(0);
				}
				write(key);
				
			}catch (Exception e) {
				// TODO: handle exception
				e.printStackTrace();
			}
		}
	}
	
	
	public void write(SelectionKey key){
		
		try{
			SocketChannel sc = (SocketChannel) key.channel();
			Response response = new Response(sc);
			
			//触发写事件
			notify.fireOnWrite((Request) key.attachment(), response);
			
			//写完一次之后就关闭了
			sc.finishConnect();
			sc.socket().close();
			sc.close();
			
		}catch (Exception e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}
	
	/**
     * 当写通道就绪之后,唤醒写线程中的通道池,处理将要写入的数据
     */
	public static void processRequest(SelectionKey key) {
        synchronized (pool) {
            pool.add(pool.size(), key);
            pool.notifyAll();
        }
    }
	
}
?
?
一个从开始到结束的访问顺序过程为:
? ? ? ?开启了主线程、读线程和写线程 -> 客户端连接 -> isAcceptable(),注册读通道 -> isReadable(),交给读线程 -> Reader.processRequest(),唤醒读通道池 -> read(),读通道数据 ->?Server.processWriteRequest(),通知主线程可以写了 ->?addRegister(),唤醒主线程中的写通道池,并且注册写通道 ->?isWritable() ->?Writer.processRequest(),唤醒写通道池 ->write(),写通道数据 -> 关闭!
?
据说这是mina的原理。。我也没研究过mina也不知道,但是使用了channel和selector的方法确实比以前的I/O方法的效率要高很多,这也是现在java越来越适合于写服务器的原因?
?
还有一点,在遍历selectedKey的时候要用迭代器,别用foreach循环,会出错。具体我也没试过,但是也给大家提个醒。
?
写的都是好久之前的东西了。。谁让以前都没弄清楚过呢。。。哎 过几天再研究NIO2!!
?