Netty源码学习-ServerBootstrap启动及事件处理过程_JAVA_编程开发_程序员俱乐部

中国优秀的程序员网站程序员频道CXYCLUB技术地图
热搜:
更多>>
 
您所在的位置: 程序员俱乐部 > 编程开发 > JAVA > Netty源码学习-ServerBootstrap启动及事件处理过程

Netty源码学习-ServerBootstrap启动及事件处理过程

 2013/12/19 22:09:11  bylijinnan  程序员俱乐部  我要评论(0)
  • 摘要:Netty是采用了Reactor模式的多线程版本,建议先看下面这篇文章了解一下Reactor模式:http://bylijinnan.iteye.com/blog/1992325Netty的启动及事件处理的流程,基本上是按照上面这篇文章来走的文章里面提到的操作,每一步都能在Netty里面找到对应的代码其中Reactor里面的Acceptor就对应Netty的ServerBootstrap.boss;而Reactor里面的Handler就对应Netty里面各ChannelHandler
  • 标签:事件 Server 学习 源码 net 过程 事件处理 启动
Netty是采用了Reactor模式的多线程版本,建议先看下面这篇文章了解一下Reactor模式:
http://bylijinnan.iteye.com/blog/1992325

Netty的启动及事件处理的流程,基本上是按照上面这篇文章来走的
文章里面提到的操作,每一步都能在Netty里面找到对应的代码
其中Reactor里面的Acceptor就对应Netty的ServerBootstrap.boss;
而Reactor里面的Handler就对应Netty里面各ChannelHandler(在worker里面跑)

由于流程涉及到比较多的类和方法,我提取一下Netty的骨架:
class="java" name="code">

ServerBootstrap.bind(localAddress)
|-->newServerSocketChannel & fireChannelOpen	(得到ServerSocketChannel[server])

-->
Binder.channelOpen
|-->Channels.bind(that is : sendDownstream of ChannelState.BOUND)

-->
In DefaultChannelPipeline, No downstreamHandler, jump to 
NioServerSocketPipelineSink.bind	(关键)
|-->1.do the REAL java.net.ServerSocket.bind	(server绑定端口)
	  2.start bossThread in bossExecutor
	  3.do "accept & dispatch" in a endless loop of bossThread(得到SocketChannel[client])
		 |--> registerAcceptedChannel, start worker in workerPool
			   |-->worker.run
					 |-->processSelectedKeys(selector.selectedKeys())
						   |--> read & fireMessageReceived	(开始调用各Handler)



下面就对照上面的“骨架”,把关键的代码拿出来读一下
其中关键的步骤,我用“===[关键步骤]===”的形式标记出来了

Netty的Server端是从ServerBootstrap.bind方法开始的:

public class ServerBootstrap extends Bootstrap {

	public Channel bind(final SocketAddress localAddress) {
			final BlockingQueue<ChannelFuture> futureQueue =
				new LinkedBlockingQueue<ChannelFuture>();

			ChannelHandler binder = new Binder(localAddress, futureQueue);

			ChannelPipeline bossPipeline = Channels.pipeline();
			bossPipeline.addLast("binder", binder);

			/*===OPEN===
			NioServerSocketChannelFactory.newChannel返回一个NioServerSocketChannel
			在NioServerSocketChannel的构造函数里,调用ServerSocketChannel.open()并触发channelOpen事件
			这个事件由上面的“binder”来处理并返回Future(非阻塞),详见Binder
			最后将Future放入futureQueue,以便在接下来的while循环里面取
			*/
			Channel channel = getFactory().newChannel(bossPipeline);
			
			// Wait until the future is available.
			ChannelFuture future = null;
			boolean interrupted = false;
			do {
				try {
					future = futureQueue.poll(Integer.MAX_VALUE, TimeUnit.SECONDS);
				} catch (InterruptedException e) {
					interrupted = true;
				}
			} while (future == null);

			//处理中断的一种方式,详见《Java并发编程实践》
			if (interrupted) {
				Thread.currentThread().interrupt();
			}

			// Wait for the future.
			future.awaitUninterruptibly();

			return channel;
	}

	//主要是处理channelOpen事件
	private final class Binder extends SimpleChannelUpstreamHandler {

			private final SocketAddress localAddress;
			private final BlockingQueue<ChannelFuture> futureQueue;
			private final Map<String, Object> childOptions =
				new HashMap<String, Object>();

			Binder(SocketAddress localAddress, BlockingQueue<ChannelFuture> futureQueue) {
				this.localAddress = localAddress;
				this.futureQueue = futureQueue;
			}

			public void channelOpen(
					ChannelHandlerContext ctx,
					ChannelStateEvent evt) {

				try {
					//处理各种option,例如keep alive,nodelay等等,省略代码
				} finally {
					ctx.sendUpstream(evt);
				}

				/*
				重点在这里
				这里bind方法只是触发sendDownstream(ChannelState.BOUND)
				而此时pipeline里面还没有ChannelDownstreamHandler(只有一个handler:“binder”):
				public void sendDownstream(ChannelEvent e) {
					DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
					if (tail == null) {
						try {
							getSink().eventSunk(this, e);
							return;
						} 
					}
					sendDownstream(tail, e);
				}
				因此ChannelState.BOUND会去到pipeline里面的sink,在sink里面执行最终的java.net.ServerSocket.bind操作
				详见NioServerSocketPipelineSink.bind
				*/
				boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress));
				assert finished;
			}
			}
}


NioServerSocketPipelineSink:

class NioServerSocketPipelineSink extends AbstractNioChannelSink {
	
	 private void bind(
            NioServerSocketChannel channel, ChannelFuture future,
            SocketAddress localAddress) {
        try {
		
			//在这里执行真正的===BIND===
            channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
            bound = true;

            Executor bossExecutor =
                ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
			
			//java.net.ServerSocket.bind完成,接下来可以accept了,详见Boss类的run方法
			//===BOSS start===,放入线程池里跑(bossExecutor)
            DeadLockProofWorker.start(bossExecutor,
                    new ThreadRenamingRunnable(new Boss(channel),
                            "New I/O server boss #" + id + " (" + channel + ')'));
            bossStarted = true;
        } 
		
    }

	private final class Boss implements Runnable {
        private final Selector selector;
        private final NioServerSocketChannel channel;

		/*
		===REGISTER[server]===
		注意到每新建一个Boss,就会新建一个selector
		*/
        Boss(NioServerSocketChannel channel) throws IOException {
            this.channel = channel;
            selector = Selector.open();
			channel.socket.register(selector, SelectionKey.OP_ACCEPT);
			registered = true;
            channel.selector = selector;
        }

		/*
		===ACCEPT&DISPATCH===
		boss不断地接受Client的连接并将连接成功的SocketChannel交由worker处理
		*/
        public void run() {
                        for (;;) {
                            SocketChannel acceptedSocket = channel.socket.accept();
                            if (acceptedSocket == null) {
                                break;
                            }
							
							//把acceptedSocket交由worker处理
                            registerAcceptedChannel(acceptedSocket, currentThread);
                        }
        }
		
		/*
		这里面的worker(implements Runnable)就相当于“Reactor Pattern”里面“Handler”
		handler需要两方面信息:selector和acceptedSocket,其中后者已经传递过来了,而selector则
		在worker.register里新创建一个
		*/
		private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
                ChannelPipeline pipeline = channel.getConfig().getPipelineFactory().getPipeline();
				
				//从WorkerPool里面取:workers[Math.abs(workerIndex.getAndIncrement() % workers.length)]
				//可见worker是re-used的
                NioWorker worker = nextWorker();
				
				/*
				值得注意的是new NioAcceptedSocketChannel(...)包含了一个关键操作:
				将pipeline与channel关联起来,一对一;见AbstractChannel类:
					protected AbstractChannel(
							Channel parent, ChannelFactory factory,
							ChannelPipeline pipeline, ChannelSink sink) {

						this.parent = parent;
						this.factory = factory;
						this.pipeline = pipeline;

						id = allocateId(this);

						pipeline.attach(this, sink);
					}
				*/
                worker.register(new NioAcceptedSocketChannel(
                        channel.getFactory(), pipeline, channel,
                        NioServerSocketPipelineSink.this, acceptedSocket,
                        worker, currentThread), null);
        }
    }
}


worker.register,主要工作是创建registerTask(implements Runnable)并放入registerTaskQueue
对应的类是NioWorker 和AbstractNioWorker:
	
	void register(AbstractNioChannel<?> channel, ChannelFuture future) {

		//只是创建Runnable,未启动。在worker的run方法中,processRegisterTaskQueue时候才执行
        Runnable registerTask = createRegisterTask(channel, future);
		
		//在start()里面启动worker线程
        Selector selector = start();
        boolean offered = registerTaskQueue.offer(registerTask);
        assert offered;

        if (wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
        }
    }
	
	 private Selector start() {
        synchronized (startStopLock) {
            if (!started) {
                 selector = Selector.open();
				
				 //===WORKER start===
                DeadLockProofWorker.start(executor, new ThreadRenamingRunnable(this, "New I/O  worker #" + id));
            }
        }
        return selector;
    }

	private final class RegisterTask implements Runnable {
        private final NioSocketChannel channel;
        private final ChannelFuture future;
        private final boolean server;
        public void run() {
            try {
                synchronized (channel.interestOpsLock) {
				
					//===REGISTER[client]===	初始的state(getRawInterestOps)是OP_READ
                    channel.channel.register(selector, channel.getRawInterestOps(), channel);
                }
                fireChannelConnected(channel, remoteAddress);
			}
        }
    }
	


worker线程的run操作:


 public void run() {
        for (;;) {
				
				//===SELECT===
                SelectorUtil.select(selector);

                processRegisterTaskQueue();
                processEventQueue();
                processWriteTaskQueue();
				
				//在这里面,就会遍历selectedKey并调用相关联的handler
                processSelectedKeys(selector.selectedKeys());
        }
}	

private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException {
	for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
		SelectionKey k = i.next();
		i.remove();
		int readyOps = k.readyOps();
		
		//下面的“与”操作等价于k.isReadable
		if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
			//执行读操作
			if (!read(k)) {
				continue;
			}
		}
		//执行写操作
		if ((readyOps & SelectionKey.OP_WRITE) != 0) {
			writeFromSelectorLoop(k);
		}
	}
}

	/*
	主要是两个操作:
	1.从channel里面读取数据
	2.读取完成后,fireMessageReceived,从channel(k.attachment)
	 可以得到与它关联的pipeline,从而触发pipeline里面的handler
	*/
	protected boolean read(SelectionKey k) {
	
		final SocketChannel ch = (SocketChannel) k.channel();
		final NioSocketChannel channel = (NioSocketChannel) k.attachment();

		final ReceiveBufferSizePredictor predictor =
			channel.getConfig().getReceiveBufferSizePredictor();
		final int predictedRecvBufSize = predictor.nextReceiveBufferSize();

		int ret = 0;
		int readBytes = 0;
		boolean failure = true;

		ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize);
		try {
			while ((ret = ch.read(bb)) > 0) {
				readBytes += ret;
				if (!bb.hasRemaining()) {
					break;
				}
			}
			failure = false;
		} 

		if (readBytes > 0) {
			bb.flip();

			final ChannelBufferFactory bufferFactory =
				channel.getConfig().getBufferFactory();
			final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
			buffer.setBytes(0, bb);
			buffer.writerIndex(readBytes);

			recvBufferPool.release(bb);

			// Update the predictor.
			predictor.previousReceiveBufferSize(readBytes);

			// Fire the event.
			fireMessageReceived(channel, buffer);
		} 
		return true;
	}
 


发表评论
用户名: 匿名