这个东西应该是netty里面最难理解的,或者最关键的组件了,这个我会慢慢的进行分析。在Pipeline传送完后,都必须都通ChannelSink进行处理。Sink默认处理了琐碎的操作,例如连接、读写等等。
ChannelSink这个组件是来处理downstream请求和产生upstream时间的一个组件,是所有io操作的执行者。也就是传输的逻辑层吧。当channel创建的时候就有一个ChannelSink和它想绑定。
传输层的代码实现一般来说都是比较麻烦的,相比来说客户端的实现一般来说比服务端的实现要简单一些,服务端一般要处理状态变换和数据交换等,我们一点一点来看ChannelSink。
public interface ChannelSink { /** * Invoked by {@link ChannelPipeline} when a downstream {@link ChannelEvent} * has reached its terminal (the head of the pipeline). */ void eventSunk(ChannelPipeline pipeline, ChannelEvent e) throws Exception; /** * Invoked by {@link ChannelPipeline} when an exception was raised while * one of its {@link ChannelHandler}s process a {@link ChannelEvent}. */ void exceptionCaught(ChannelPipeline pipeline, ChannelEvent e, ChannelPipelineException cause) throws Exception; }
接口里面只定义了两个操作,一个是eventSunk,一个是exceptionCaught,eventSunk我不知道怎么翻译,姑且理解为让event处理吧,一般来说应该有一个abstract骨架程序实现,我们就来看吧。
AbstractChannelSink里面只实现了exceptionCaught,来看一眼:
public void exceptionCaught(ChannelPipeline pipeline, ChannelEvent event, ChannelPipelineException cause) throws Exception { Throwable actualCause = cause.getCause(); if (actualCause == null) { actualCause = cause; } fireExceptionCaught(event.getChannel(), actualCause); }
这个实际上是向上层报告一个DefaultExceptionEvent的upstream事件。
下来我们就直奔主题来看NioClientSocketPipelineSink这个类吧。
首先来看局部变量
private static final AtomicInteger nextId = new AtomicInteger(); final int id = nextId.incrementAndGet(); final Executor bossExecutor; private final Boss[] bosses; private final NioWorker[] workers; private final AtomicInteger bossIndex = new AtomicInteger(); private final AtomicInteger workerIndex = new AtomicInteger();
上面是sink的id,这个地方用到了AtomicInteger。还有Boss和NioWorker,boss的个数现在默认是1,NioWorker的个数是处理器的个数的2倍。后面两个是bossIndex和workerIndex。
在构造函数里面初始化了Boss和NioWorker
NioClientSocketPipelineSink( Executor bossExecutor, Executor workerExecutor, int bossCount, int workerCount) { this.bossExecutor = bossExecutor; bosses = new Boss[bossCount]; for (int i = 0; i < bosses.length; i ++) { bosses[i] = new Boss(i + 1); } workers = new NioWorker[workerCount]; for (int i = 0; i < workers.length; i ++) { workers[i] = new NioWorker(id, i + 1, workerExecutor); } }
我们接下来看看eventSunk的实现:
public void eventSunk( ChannelPipeline pipeline, ChannelEvent e) throws Exception { if (e instanceof ChannelStateEvent) { ChannelStateEvent event = (ChannelStateEvent) e; NioClientSocketChannel channel = (NioClientSocketChannel) event.getChannel(); ChannelFuture future = event.getFuture(); ChannelState state = event.getState(); Object value = event.getValue(); switch (state) { case OPEN: if (Boolean.FALSE.equals(value)) { channel.worker.close(channel, future); } break; case BOUND: if (value != null) { bind(channel, future, (SocketAddress) value); } else { channel.worker.close(channel, future); } break; case CONNECTED: if (value != null) { connect(channel, future, (SocketAddress) value); } else { channel.worker.close(channel, future); } break; case INTEREST_OPS: channel.worker.setInterestOps(channel, future, ((Integer) value).intValue()); break; } } else if (e instanceof MessageEvent) { MessageEvent event = (MessageEvent) e; NioSocketChannel channel = (NioSocketChannel) event.getChannel(); boolean offered = channel.writeBuffer.offer(event); assert offered; channel.worker.writeFromUserCode(channel); } }在这个里面如果是Channel的state变化的事件,则执行相应的操作,最终实现都将交给work去操作,关于worker的这些操作,我们以后会详细的介绍。
这个里面如果是bind操作的话会自己调用bind,如下:
private void bind( NioClientSocketChannel channel, ChannelFuture future, SocketAddress localAddress) { try { channel.socket.socket().bind(localAddress); channel.boundManually = true; channel.setBound(); future.setSuccess(); fireChannelBound(channel, channel.getLocalAddress()); } catch (Throwable t) { future.setFailure(t); fireExceptionCaught(channel, t); } }
这个里面,channel、future、localAddress都是Event里面携带的一些参数,其实这个里面实现了socket的bind操作,并触发一些事件。
最后我们看一下connect的实现:
private void connect( final NioClientSocketChannel channel, final ChannelFuture cf, SocketAddress remoteAddress) { try { if (channel.socket.connect(remoteAddress)) { channel.worker.register(channel, cf); } else { channel.getCloseFuture().addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture f) throws Exception { if (!cf.isDone()) { cf.setFailure(new ClosedChannelException()); } } }); cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); channel.connectFuture = cf; nextBoss().register(channel); } } catch (Throwable t) { cf.setFailure(t); fireExceptionCaught(channel, t); channel.worker.close(channel, succeededFuture(channel)); } }
这个里面会执行真正的注册动作,如果失败的话会触发一个connectFuture,这条语句 nextBoss().register(channel)不知道是什么意思?
关于客户端channelSink就先看到这个地方,接下来会讲解Netty的线程模型,讲完之后我们再回来看服务端的channelSink的实现。
相关推荐
Netty框架的channel测试程序,包括client 和 server。通过事例程序,可以清楚地看出channel 的执行过程,对于了解Netty框架很有帮助
Netty基础,用于学习Netty,参考黑马程序员的netty教程
Java进阶技术-netty进阶之路
Netty进阶之路,跟着案例学Netty,李林峰大神新作。值得一读。
Netty进阶之路 跟着案例学Netty 整本书无密码,Netty进阶之路 跟着案例学Netty
《Netty进阶之路:跟着案例学Netty》中的案例涵盖了Netty的启动和停止、内存、并发多线程、性能、可靠性、安全等方面,囊括了Netty绝大多数常用的功能及容易让人犯错的地方。在案例的分析过程中,还穿插讲解了Netty...
Netty学习笔记_Springboot实现自定义协议.docx Netty学习笔记_Springboot实现自定义协议.docx Netty学习笔记_Springboot实现自定义协议.docx
netty-3.1官网学习手册,中文版
《Netty进阶之路 跟着案例学Netty》_李林锋_2018-11-01
学习netty源码,为后续rocketmq等学习打下基础
在本书中,作者将在过去几年实践中遇到的问题,以及Netty学习者咨询的相关问题,进行了归纳和总结,以问题案例做牵引,通过对案例进行剖析,讲解问题背后的原理,并结合Netty源码分析,让读者能够真正掌握Netty,在...
NULL 博文链接:https://asialee.iteye.com/blog/1768861
Netty全套学习资源(包括源码、笔记、学习文档等)
Netty实践学习案例
附件为韩顺平老师的netty教程学习资料。老师教学视频可以在B站去找。个人感觉很不错。如果有需要可以去看看。
代码解压后可看到netty3和netty5实现的心跳处理案例,代码比较简单,适合初学者,欢迎大家交流学习,欢迎下载。。。。。
Netty 框架学习 —— 第一个 Netty 应用(csdn)————程序
Netty对Channel总结的思维导图,包括功能梳理,源码分析。
netty学习笔记