Netty(2)应用流程

young 656 2022-05-22

服务启动

主线

our thread

  • 创建selector
  • 创建server socket channel
  • 初始化 server socket channel
  • 给server socket channel 从 boss group中选择一个NioEventLoop

boss thread

  • 将server socket channel 注册到选择的NioEventLoop的selector
  • 绑定地址启动
  • 注册接受连接事件(OP_ACCEPT)到selector上

知识点

  • 启动服务的本质

    Selector selector = sun.nio.ch.SelectorProviderImpl.openSelector();

    ServerSocketChannel serverSocketChannel = provider.openServerSocketChannel();

    selectionKey=javaChannel().register(eventLoop().unwrappedSelector(),0,this);

    javaChannel().bind(localAddress,config.getBacklog());

    selectionKey.interesetOps(OP_ACCEPT);

  • Selector是在new NioEventLoopGroup()(创建一批NioEventLoop)时创建

  • 第一次Register并不是监听OP_ACCEPT,而是0

    javaChannel().register(eventLoop().unwrappedSelector(),0,this);

  • 最终监听OP_ACCEPT是通过bind完成后的fireChannelActive()来触发的

  • NioEventLoop是通过Register操作的执行来完成启动的

  • 类似ChannelInitializer,一些Hander可以设计成一次性的,用完就移除,例如授权。

构建连接

主线

boss thread:

  • NioEventLoop中的selector轮询创建连接事件(OP_ACCEPT)
  • 创建socket channel
  • 初始化socket channel 并从worker group中选择一个NioEventLoop

worker thread

  • 将socket channel注册到选择的NioEventLoop的selector
  • 注册读事件(OP_READ)到selector上

知识点

  • 接受连接的本质:selector.select()/selectNow()/select(timeoutMillis)发现OP_ACCEPT事件,处理:

    • SocketChannel socketChannel = serverSocketChannel.accept();
    • selectionKey = javaChannel().registry(eventLoop().unwrappedSelector(),0,this);
    • selectionKey.interestOps(OP_READ
  • 创建连接的初始化和注册是通过pipeline.fireChannelRead在ServerBootstrapAccptor中完成的。

  • 第一次Register并不是监听OP_READ,而是0:

    selectionKey = javaChannel().register(eventLoop().unwrappedSelector(),0,this);

  • 最终监听OP_READ是通过“Register”完成后的fireChannelActive(io.netty.channel.AbstractChannel.AbstractUnsafe#register0中)来触发的

  • Worker’s NioEventLoop是通过Register操作执行来启动

  • 接受连接的读操作,不会尝试读取更多次(16次)

接受数据

读数据技巧

1 自适应数据大小的分配器(AdaptiveRecvByteBufAllocator)

发放东西时,拿多大的通去装?小了不够,大了浪费,所以会自己根据实际装的情况猜下次情况,从而决定带多大的通。

2 连续读(defaultMaxMessagesPerRead)

发放东西时,假设拿的通装满了,这个时候,你会觉得可能还有东西发放,所以直接拿个新桶等着装,而不是回家,直到后面出现美哦与装上的情况,或者装了很多次需要给别人一点机会等原因才停止,回家。

主线

  • 多路复用器(Selector)接收到OP_READ事件
  • 处理OP_READ事件: NioSocketChannel.NioSocketChannelUnsafe.read() (默认是worker thread处理的)
    • 分配一个初始1024字节的byte buffer 来接受数据
    • 从channel接收数据到byte buffer
    • 记录实际接收数据大小,调整下次分配byte buffer大小
    • 触发pipeline.fireChannelRead(byteBuf)把读到的数据传播出去(pipeline上各种handler的执行过程)
    • 判断接收byte buffer是否满载而归:是,尝试继续读取直到没有数据或满16次;否:结束本轮读取,等待下次OP_READ事件

知识点

  • 读取数据的本质:sun.nio.ch.SocketChannelImpl#read(java.nio.ByteBuffer)

  • NioSocketChannel read()是读数据,NioServerSocketChannel read() 是创建连接

  • pipeline.fireChannelReadComplete();一次读事件处理完成

    pipeline.fireChannelRead(byteBuf);一次读数据完成,一次读事件处理可能会包含多次读数据操作

  • 为什么最多尝试读取16次?雨露均沾

  • AdapticeRecvByteBufAllocator对bytebuf的猜测:放大果断,缩小谨慎(需要连续两次判断)

业务处理

主线

  • 多路复用器(Selector)接收到OP_READ事件
  • 处理OP_READ事件:NioSocketChannel.NioSocketChannelUnsafe.read()(默认是worker thread处理的)
    • 分配一个初始1024字节的byte buffer 来接受数据
    • 从channel接收数据到byte buffer
    • 记录实际接收数据大小,调整下次分配byte buffer大小
    • 触发pipeline.fireChannelRead(byteBuf)把读到的数据传播出去
    • 判断接收byte buffer是否满载而归:是,尝试继续读取直到没有数据或满16次;否:结束本轮读取,等待下次OP_READ事件

Netty业务处理主线

如果是fireChannelRead方法,经过一串pipeline,每个pipeline都是以head开头tail结尾,handler都在context中,inbound是从head执行到tail,写数据是相反的过程,是tail到head

Handler执行资格:

  • 实现了ChannelInboundHandler
  • 实现方法channelRead 不能加注解@Skip

知识点

  • 处理业务本质:数据在pipeline中所有的handler的channelRead()执行过程

    handler要实现io.netty.channel.ChannelInboundHandler#channelRead(ChannelHandlerContext ctx,Object msg),并且不能加注解@Skip才能被执行到

    中途可退出,不保证执行到Tail Handler

  • 默认处理线程就是Channel绑定的NioEventLoop线程,也可以设置其他:

    pipeline.addLast(new UnorderedThreadPoolEventExecutor(10),serverHandler)

发送数据

写数据的三种方式

快递场景(包裹) Netty写数据(数据)
揽收到仓库 write:写到一个buffer
从仓库发货 flush:把buffer里的数据发送出去
揽收到仓库并立马发货(加急件) writeAndFlush:写到buffer,立马发送
揽收与发货之间有个缓冲的仓库 write和flush之间有个ChannelOutboundBuffer

写数据要点

1 对方仓库爆仓时,送不了的时候,会停止送,协商等电话通知什么时候好了,再送

​ Netty写数据,写不进去时,会停止写,然后注册一个OP_WRITE事件,来通知什么时候可以写进去了再写。

2 发送快递时,对方仓库都直接收下,这个时候再发送快递时,可以尝试发送更多的快递试试,这样效果更好。

​ Netty批量写数据时,如果想写的都写进去了,接下来的尝试写更多(调整maxBytesPerGatheringWrite)

3 发送快递时,发送某个地方的快递特别多,我们会连续发,但是快递车毕竟有限,也会考虑下其他地方。

​ Netty只要有数据要写,且能写的出去,则一直尝试,直到写不出去或者写满16次(writeSpinCount)

4 揽收太多,发送来不及时,爆仓,这个时候会出个告示牌:受不了了,最好过两天再来邮寄

​ Netty待写数据太多,超过一定的水位线(writeBufferWaterMark.high()),会将可写的标志位改成false,让应用端自己做决定要不要发送数据了

主线

发送数据主线

  • Write - 写数据到buffer:

    ChannelOutBoundBuffer#addMessage

  • Flush - 发送buffer里面的数据

    AbstractChannel.AbstractUnsafe#flush

    • 准备数据:ChannelOutboundBuffer#addFlush
    • 发送:NioSocketChannel#doWrite

知识点

  • 写的本质:

    • Single write:sun.nio.ch.SocketChannelImpl#write(java.nio.ByteBuffer)
    • gathering write:sun.nio.ch.SocketChannelImpl#write(java.nio.ByteBuffer[[],int,int])
  • 写数据写不进去时,会停止写,注册一个OP_WRITE事件,来通知什么时候可以写进去了

  • OP_WRITE不是说有数据可写,而是说可以写进去,所以正常情况,不能注册,否则一直触发

  • 批量写数据时,如果尝试写的都写进去了,接下里会尝试写更多(maxBytesPerGatheringWrite

  • 只要有数据要写,且能写,则一直尝试,直到16次(writeSpinCount),写16次还没有写完,就直接schedule一个task来继续写,而不是用注册写事件来触发,更简洁有力

  • 待写数据太多,超过一定的水位线(writeBufferWateMark.high()),会将可写的标志位改成false,让应用端子机决定要不要继续写

  • channelHandlerContext.channel().write():从TailContext开始执行;

    channelHandlerContext.write():从当前的Context开始。

断开连接

主线

  • 多路复用器(Selector)接收到OP_READ事件
  • 处理OP_READ事件:NioSocketChannel.NioSocketChannelUnsafe.read()
    • 接收数据
    • 判断接收的数据大小是否小于0,如果是,说明是关闭,开始执行关闭
      • 关闭channel(包含cancel多路复用器的key)
      • 清理消息:不接收新信息,fail掉所有queue中消息
      • 触发fireChannelInactive和fireChannelUnregistered

知识点

  • 关闭连接本质
    • java.nio.channels.spi.AbstractInterruptibleChannel#close
      • java.nio.channels.SelectionKey#cancel
  • 要点
    • 关闭连接,会触发OP_READ方法。读取字节数是-1代表关闭
    • 数据读取进行时,强行关闭,触发IOException,进而执行关闭
    • Channel的关闭包含了SelectionKey的cancel

关闭服务

主线

关闭服务主线

  • bossGroup.shutdownGracefully();

    workerGroup.shutdownGracefully();

关闭所有Group中的NioEventLoop:

  • 修改NioEventLoop的State标志位
  • NioEventLoop判断State执行退出

知识点

  • 关闭服务的本质:
    • 关闭所有连接及Selector
      • java.nio.channels.Selector#keys
        • java.nio.channels.spi.AbstractInterruptibleChannel#close
        • java.nio.channels.SelectionKey#cannel
      • Selector.close()
    • 关闭所有线程:退出循环体 for(;😉
  • 关闭服务要点
    • 优雅(DEFAULT_SHUTDONW_QUEIT_PERIOD)
    • 可控(DEFAULT_SHUTDONW_TIMEOUT)
    • 先不接活,后尽量干完手头的活(先关boss后关worker:不是100%保证)