服务启动
主线
our thread
- 创建selector
- 创建server socket channel
- 初始化 server socket channel
- 给server socket channel 从 boss group中选择一个NioEventLoop
boss thread
- 将server socket channel 注册到选择的NioEventLoop的selector
- 绑定地址启动
- 注册接受连接事件(OP_ACCEPT)到selector上
知识点
-
启动服务的本质
Selector selector = sun.nio.ch.SelectorProviderImpl.openSelector();
ServerSocketChannel serverSocketChannel = provider.openServerSocketChannel();
selectionKey=javaChannel().register(eventLoop().unwrappedSelector(),0,this);
javaChannel().bind(localAddress,config.getBacklog());
selectionKey.interesetOps(OP_ACCEPT);
-
Selector是在new NioEventLoopGroup()(创建一批NioEventLoop)时创建
-
第一次Register并不是监听OP_ACCEPT,而是0
javaChannel().register(eventLoop().unwrappedSelector(),0,this);
-
最终监听OP_ACCEPT是通过bind完成后的fireChannelActive()来触发的
-
NioEventLoop是通过Register操作的执行来完成启动的
-
类似ChannelInitializer,一些Hander可以设计成一次性的,用完就移除,例如授权。
构建连接
主线
boss thread:
- NioEventLoop中的selector轮询创建连接事件(OP_ACCEPT)
- 创建socket channel
- 初始化socket channel 并从worker group中选择一个NioEventLoop
worker thread
- 将socket channel注册到选择的NioEventLoop的selector
- 注册读事件(OP_READ)到selector上
知识点
-
接受连接的本质:selector.select()/selectNow()/select(timeoutMillis)发现
OP_ACCEPT
事件,处理:- SocketChannel socketChannel = serverSocketChannel.accept();
- selectionKey = javaChannel().registry(eventLoop().unwrappedSelector(),0,this);
- selectionKey.interestOps(OP_READ
-
创建连接的初始化和注册是通过pipeline.fireChannelRead在ServerBootstrapAccptor中完成的。
-
第一次Register并不是监听OP_READ,而是0:
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(),0,this);
-
最终监听OP_READ是通过“Register”完成后的fireChannelActive(io.netty.channel.AbstractChannel.AbstractUnsafe#register0中)来触发的
-
Worker’s NioEventLoop是通过Register操作执行来启动
-
接受连接的读操作,不会尝试读取更多次(16次)
接受数据
读数据技巧
1 自适应数据大小的分配器(AdaptiveRecvByteBufAllocator)
发放东西时,拿多大的通去装?小了不够,大了浪费,所以会自己根据实际装的情况猜下次情况,从而决定带多大的通。
2 连续读(defaultMaxMessagesPerRead)
发放东西时,假设拿的通装满了,这个时候,你会觉得可能还有东西发放,所以直接拿个新桶等着装,而不是回家,直到后面出现美哦与装上的情况,或者装了很多次需要给别人一点机会等原因才停止,回家。
主线
- 多路复用器(Selector)接收到OP_READ事件
- 处理OP_READ事件: NioSocketChannel.NioSocketChannelUnsafe.read() (默认是worker thread处理的)
- 分配一个初始1024字节的byte buffer 来接受数据
- 从channel接收数据到byte buffer
- 记录实际接收数据大小,调整下次分配byte buffer大小
- 触发pipeline.fireChannelRead(byteBuf)把读到的数据传播出去(pipeline上各种handler的执行过程)
- 判断接收byte buffer是否满载而归:是,尝试继续读取直到没有数据或满16次;否:结束本轮读取,等待下次OP_READ事件
知识点
-
读取数据的本质:sun.nio.ch.SocketChannelImpl#read(java.nio.ByteBuffer)
-
NioSocketChannel read()是读数据,NioServerSocketChannel read() 是创建连接
-
pipeline.fireChannelReadComplete();一次读事件处理完成
pipeline.fireChannelRead(byteBuf);一次读数据完成,一次读事件处理可能会包含多次读数据操作
-
为什么最多尝试读取16次?雨露均沾
-
AdapticeRecvByteBufAllocator对bytebuf的猜测:放大果断,缩小谨慎(需要连续两次判断)
业务处理
主线
多路复用器(Selector)接收到OP_READ事件处理OP_READ事件:NioSocketChannel.NioSocketChannelUnsafe.read()(默认是worker thread处理的)分配一个初始1024字节的byte buffer 来接受数据从channel接收数据到byte buffer记录实际接收数据大小,调整下次分配byte buffer大小触发pipeline.fireChannelRead(byteBuf)把读到的数据传播出去
判断接收byte buffer是否满载而归:是,尝试继续读取直到没有数据或满16次;否:结束本轮读取,等待下次OP_READ事件
如果是fireChannelRead方法,经过一串pipeline,每个pipeline都是以head开头tail结尾,handler都在context中,inbound是从head执行到tail,写数据是相反的过程,是tail到head
Handler执行资格:
- 实现了ChannelInboundHandler
- 实现方法channelRead 不能加注解@Skip
知识点
-
处理业务本质:数据在pipeline中所有的handler的channelRead()执行过程
handler要实现io.netty.channel.ChannelInboundHandler#channelRead(ChannelHandlerContext ctx,Object msg),并且不能加注解@Skip才能被执行到
中途可退出,不保证执行到Tail Handler
-
默认处理线程就是Channel绑定的NioEventLoop线程,也可以设置其他:
pipeline.addLast(new UnorderedThreadPoolEventExecutor(10),serverHandler)
发送数据
写数据的三种方式
快递场景(包裹) | Netty写数据(数据) |
---|---|
揽收到仓库 | write:写到一个buffer |
从仓库发货 | flush:把buffer里的数据发送出去 |
揽收到仓库并立马发货(加急件) | writeAndFlush:写到buffer,立马发送 |
揽收与发货之间有个缓冲的仓库 | write和flush之间有个ChannelOutboundBuffer |
写数据要点
1 对方仓库爆仓时,送不了的时候,会停止送,协商等电话通知什么时候好了,再送
Netty写数据,写不进去时,会停止写,然后注册一个OP_WRITE事件,来通知什么时候可以写进去了再写。
2 发送快递时,对方仓库都直接收下,这个时候再发送快递时,可以尝试发送更多的快递试试,这样效果更好。
Netty批量写数据时,如果想写的都写进去了,接下来的尝试写更多(调整maxBytesPerGatheringWrite)
3 发送快递时,发送某个地方的快递特别多,我们会连续发,但是快递车毕竟有限,也会考虑下其他地方。
Netty只要有数据要写,且能写的出去,则一直尝试,直到写不出去或者写满16次(writeSpinCount)
4 揽收太多,发送来不及时,爆仓,这个时候会出个告示牌:受不了了,最好过两天再来邮寄
Netty待写数据太多,超过一定的水位线(writeBufferWaterMark.high()),会将可写的标志位改成false,让应用端自己做决定要不要发送数据了
主线
-
Write - 写数据到buffer:
ChannelOutBoundBuffer#addMessage
-
Flush - 发送buffer里面的数据
AbstractChannel.AbstractUnsafe#flush
- 准备数据:ChannelOutboundBuffer#addFlush
- 发送:NioSocketChannel#doWrite
知识点
-
写的本质:
- Single write:sun.nio.ch.SocketChannelImpl#write(java.nio.ByteBuffer)
- gathering write:sun.nio.ch.SocketChannelImpl#write(java.nio.ByteBuffer[[],int,int])
-
写数据写不进去时,会停止写,注册一个
OP_WRITE
事件,来通知什么时候可以写进去了 -
OP_WRITE不是说有数据可写,而是说可以写进去
,所以正常情况,不能注册,否则一直触发 -
批量写数据时,如果尝试写的都写进去了,接下里会尝试写更多(
maxBytesPerGatheringWrite
) -
只要有数据要写,且能写,则一直尝试,直到16次(
writeSpinCount
),写16次还没有写完,就直接schedule一个task来继续写,而不是用注册写事件来触发,更简洁有力 -
待写数据太多,超过一定的水位线(
writeBufferWateMark.high()
),会将可写的标志位改成false,让应用端子机决定要不要继续写 -
channelHandlerContext.channel().write():从TailContext开始执行;
channelHandlerContext.write():从当前的Context开始。
断开连接
主线
- 多路复用器(Selector)接收到OP_READ事件
- 处理OP_READ事件:NioSocketChannel.NioSocketChannelUnsafe.read()
- 接收数据
- 判断接收的数据大小是否小于0,如果是,说明是关闭,开始执行关闭
- 关闭channel(包含cancel多路复用器的key)
- 清理消息:不接收新信息,fail掉所有queue中消息
- 触发fireChannelInactive和fireChannelUnregistered
知识点
- 关闭连接本质
- java.nio.channels.spi.AbstractInterruptibleChannel#close
- java.nio.channels.SelectionKey#cancel
- java.nio.channels.spi.AbstractInterruptibleChannel#close
- 要点
- 关闭连接,会触发OP_READ方法。读取字节数是-1代表关闭
- 数据读取进行时,强行关闭,触发IOException,进而执行关闭
- Channel的关闭包含了SelectionKey的cancel
关闭服务
主线
-
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
关闭所有Group中的NioEventLoop:
- 修改NioEventLoop的State标志位
- NioEventLoop判断State执行退出
知识点
- 关闭服务的本质:
- 关闭所有连接及Selector
- java.nio.channels.Selector#keys
- java.nio.channels.spi.AbstractInterruptibleChannel#close
- java.nio.channels.SelectionKey#cannel
- Selector.close()
- java.nio.channels.Selector#keys
- 关闭所有线程:退出循环体 for(;😉
- 关闭所有连接及Selector
- 关闭服务要点
- 优雅(DEFAULT_SHUTDONW_QUEIT_PERIOD)
- 可控(DEFAULT_SHUTDONW_TIMEOUT)
- 先不接活,后尽量干完手头的活(先关boss后关worker:不是100%保证)