Netty (1): Fundamentals

young · 2022-05-22

Why not use JDK NIO directly?

Netty:

  • Supports common application-layer protocols
  • Solves transport problems: sticky packets and half packets
  • Supports traffic shaping
  • Complete handling of disconnects, idle connections, and other exceptional conditions

Netty works around JDK NIO bugs

  • Spurious selector wakeups spinning in an empty loop and driving CPU to 100%

    Netty counts the empty polls; once the count exceeds a threshold, it rebuilds the Selector (multiplexer).

  • Setting the IP_TOS option (IP packet priority and QoS) throws an exception:

    java.lang.AssertionError: Option not found

Netty's API is friendlier and more powerful

  • Some JDK NIO APIs are unfriendly and underpowered, e.g. ByteBuffer -> Netty's ByteBuf
  • Beyond NIO, it also ships other enhancements, e.g. ThreadLocal -> Netty's FastThreadLocal

Netty isolates change and hides implementation details

  • Isolates changes in the JDK NIO implementation: nio -> nio2 (aio) -> ...
  • Hides the JDK NIO implementation details

Switching among the three I/O modes in Netty

The three classic I/O modes

A real-life analogy: eating at a restaurant

  • Cafeteria-queue mode: line up at the window and leave only once your food is served
  • Order-and-wait mode: wait for your number to be called, then fetch the food yourself
  • Private-room mode: after ordering, the dishes are brought straight to your table

Mapping the analogy:

  • Restaurant -> server
  • Dishes -> data
  • Dish ready -> data ready
  • Serving the dish -> reading the data

| Dining mode | I/O mode | Introduced |
| --- | --- | --- |
| Cafeteria queue | BIO (blocking I/O) | before JDK 1.4 |
| Order and wait to be called | NIO (non-blocking I/O) | JDK 1.4 (2002, java.nio package) |
| Private room | AIO (asynchronous I/O) | JDK 1.7 (2011) |

  • Blocking vs non-blocking (see the sketch after this list)
    • The food isn't ready: do you wait in place? -> do you wait until the data is ready?
    • Blocking: a read blocks until data arrives; a write blocks when the buffer is full.
    • Non-blocking: in both cases the call returns immediately.
  • Asynchronous vs synchronous
    • The food is ready: who carries it? -> once the data is ready, who performs the data operation?
    • Synchronous: once the data is ready, you read it yourself. Asynchronous: the data is read for you and handed back via a callback.
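
A minimal JDK-only sketch of the non-blocking half of this distinction (illustrative; the host and port are placeholders, not from the original notes):

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class NonBlockingReadSketch {
    public static void main(String[] args) throws Exception {
        SocketChannel ch = SocketChannel.open();
        ch.configureBlocking(false);                    // non-blocking mode
        ch.connect(new InetSocketAddress("example.com", 80));
        while (!ch.finishConnect()) {
            // free to do other work here instead of blocking on the connect
        }
        ByteBuffer buf = ByteBuffer.allocate(1024);
        int n = ch.read(buf);                           // returns immediately; 0 means "no data yet"
        System.out.println("read returned " + n);
        ch.close();
    }
}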

Netty's support for the three I/O modes

BIO and AIO were once supported:

| | BIO -> OIO (Deprecated) | NIO (common) | NIO (Linux) | NIO (macOS/BSD) | AIO (Removed) |
| --- | --- | --- | --- | --- | --- |
| EventLoopGroup | ThreadPerChannelEventLoopGroup | NioEventLoopGroup | EpollEventLoopGroup | KQueueEventLoopGroup | AioEventLoopGroup |
| EventLoop | ThreadPerChannelEventLoop | NioEventLoop | EpollEventLoop | KQueueEventLoop | AioEventLoop |
| ServerSocketChannel | OioServerSocketChannel | NioServerSocketChannel | EpollServerSocketChannel | KQueueServerSocketChannel | AioServerSocketChannel |
| SocketChannel | OioSocketChannel | NioSocketChannel | EpollSocketChannel | KQueueSocketChannel | AioSocketChannel |

Why Netty now supports only NIO

  • Why blocking I/O (BIO/OIO) was deprecated

    Under high connection counts, blocking wastes resources and performs poorly.

  • Why the already-finished AIO support was removed

    • The Windows implementation is mature, but Windows is rarely used for servers
    • Linux is the common server platform, but its AIO implementation is not mature
    • On Linux, AIO brings no notable performance gain over NIO

Why Netty has multiple NIO implementations

The generic NIO implementation (common) already uses epoll on Linux, so why maintain a separate one?

  • Netty exposes more tunable parameters (see the sketch after this list), e.g.
    • JDK NIO is hard-wired to level-triggered mode
    • Netty's epoll transport can switch between edge-triggered (the default) and level-triggered
  • Netty's implementation generates less garbage and performs better
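
A minimal sketch of opting into the native epoll transport and picking the trigger mode (Linux only; assumes the netty-transport-native-epoll artifact is on the classpath):

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollChannelOption;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollMode;
import io.netty.channel.epoll.EpollServerSocketChannel;

EventLoopGroup bossGroup = new EpollEventLoopGroup(1);
EventLoopGroup workerGroup = new EpollEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(EpollServerSocketChannel.class)
 // edge-triggered is the default; switch to level-triggered if desired
 .childOption(EpollChannelOption.EPOLL_MODE, EpollMode.LEVEL_TRIGGERED);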

Is NIO necessarily better than BIO?

  • BIO code is simpler
  • In specific scenarios (few connections, low concurrency), BIO performance is no worse than NIO's

Netty's I/O mode support in code

Code path:

Netty source -> netty-example -> EchoServer

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
     .channel(NioServerSocketChannel.class)
     .option(ChannelOption.SO_BACKLOG, 100)
     .handler(new LoggingHandler(LogLevel.INFO))
     .childHandler(new ChannelInitializer<SocketChannel>() {
         @Override
         public void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline p = ch.pipeline();
             if (sslCtx != null) {
                 p.addLast(sslCtx.newHandler(ch.alloc()));
             }
             //p.addLast(new LoggingHandler(LogLevel.INFO));
             p.addLast(serverHandler);
         }
     });

    // Start the server.
    ChannelFuture f = b.bind(PORT).sync();

    // Wait until the server socket is closed.
    f.channel().closeFuture().sync();
} finally {
    // Shut down all event loops to terminate all threads.
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
}

Changing Nio to Oio throughout the code is all it takes to switch I/O modes:

EventLoopGroup bossGroup = new OioEventLoopGroup(1);
EventLoopGroup workerGroup = new OioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
     .channel(OioServerSocketChannel.class)
     .option(ChannelOption.SO_BACKLOG, 100)
     .handler(new LoggingHandler(LogLevel.INFO))
     .childHandler(new ChannelInitializer<SocketChannel>() {
         @Override
         public void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline p = ch.pipeline();
             if (sslCtx != null) {
                 p.addLast(sslCtx.newHandler(ch.alloc()));
             }
             //p.addLast(new LoggingHandler(LogLevel.INFO));
             p.addLast(serverHandler);
         }
     });

    // Start the server.
    ChannelFuture f = b.bind(PORT).sync();

    // Wait until the server socket is closed.
    f.channel().closeFuture().sync();
} finally {
    // Shut down all event loops to terminate all threads.
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
}

Netty supports three Reactor flavors

What Reactor is, and its three versions

Real-life analogy: a restaurant growing in scale

  • One person does everything
  • Hire a few more waiters
  • Divide the work further

Mapping the analogy:

  • Waiter: thread
  • Greeting guests: accepting connections
  • Taking orders: requests
    • One person does everything -> single-threaded Reactor
    • Hire a few more waiters -> multi-threaded Reactor
    • Divide the work further -> master-slave (boss/worker) multi-threaded Reactor
  • Cooking: business processing
  • Serving dishes: responses
  • Seeing guests out: disconnecting

| BIO | NIO | AIO |
| --- | --- | --- |
| Thread-Per-Connection | Reactor | Proactor |

Reactor is a development pattern. Its core flow:

register interest in events -> poll for events of interest -> handle each event as it occurs

| Client/Server | Channel type | OP_ACCEPT | OP_CONNECT | OP_WRITE | OP_READ |
| --- | --- | --- | --- | --- | --- |
| client | SocketChannel | | Y | Y | Y |
| server | ServerSocketChannel | Y | | | |
| server | SocketChannel (accepted) | | | Y | Y |

BIO

One thread handles read, decode, compute, encode, and send.

read and send are both blocking operations: the more blocked connections, the more threads are tied up.

(figure: Thread-Per-Connection)

Reactor single-threaded mode

One thread does everything.

(figure: single-threaded Reactor)

Reactor multi-threaded mode

decode, compute, and encode are handed off to a thread pool.

(figure: multi-threaded Reactor)

Master-slave Reactor multi-threaded mode

Accept events are registered with a separate Reactor of their own.

(figure: master-slave multi-threaded Reactor)

Using the Reactor modes in Netty

Reactor single-threaded mode:

EventLoopGroup eventGroup = new NioEventLoopGroup(1);
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(eventGroup);

Non-master-slave Reactor multi-threaded mode:

EventLoopGroup eventGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(eventGroup);

Master-slave Reactor multi-threaded mode:

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);

If no thread count is given, Netty derives a default from the number of CPU cores (twice the core count).

TCP sticky packets and half packets

What sticky packets and half packets are

Suppose a client sends two messages, ABC and DEF. The receiver will not necessarily see that framing: it may get everything in one read (ABCDEF), or across several reads, e.g. AB / CD / EF, or A / BCDE / F.

Receiving multiple messages in one read is the sticky-packet case; needing multiple reads for one message is the half-packet case.

Main causes of sticky packets:

  • Each write from the sender is smaller than the socket buffer
  • The receiver does not drain the socket buffer promptly

Main causes of half packets:

  • A write from the sender is larger than the socket buffer
  • The data exceeds the protocol's MTU (Maximum Transmission Unit) and must be split

TCP/IP is layered, and each layer caps its packet size; that cap is the MTU.

| Media for IP transport | Maximum transmission unit (bytes) |
| --- | --- |
| Internet IPv4 path MTU | At least 68, max of 64 KiB |
| Internet IPv6 path MTU | At least 1280, max of 64 KiB, but up to 4 GiB with optional jumbogram |
| Ethernet v2 | 1500 |

Whenever the data is large enough, it gets split.

From another angle:

  • Send/receive: one send may be received in several reads; several sends may arrive in a single read

  • Transmission: one send may span several transport packets; several sends may share one transport packet

Root cause: TCP is a stream protocol, so messages have no boundaries.

Note: UDP is like parcels in the mail. Several may be carried together, but each parcel has its own boundary and is signed for one by one, so UDP has no sticky-packet or half-packet problem.

Solutions

The fundamental fix: find the message boundaries.

| Approach | How the boundary is found | Pros | Cons | Recommended |
| --- | --- | --- | --- | --- |
| Short-lived TCP connection per request | Everything between connect and close is one message | Simple | Inefficient | No |
| Framing: fixed length (e.g. 3-byte frames ABC, DEF, GHI) | Read exactly the fixed length | Simple | Wastes space | No |
| Framing: delimiter (e.g. ABC\nDEF\n) | Split between delimiters | No wasted space, fairly simple | Content containing the delimiter must be escaped, so the content has to be scanned | Yes |
| Framing: fixed-size length field (e.g. 0x0005 + "ABC") | Parse the fixed-size length field, then read that many bytes | Pinpoints the user data; no escaping needed | Length is bounded: the maximum must be known up front to size the field | Yes |
| Other | Varies; e.g. for JSON, check whether braces are balanced | Judged per scenario; often for compatibility with existing protocols | | |

Netty's support for the three common framing approaches (a pipeline sketch follows the table):

| Framing | Decoder | Encoder |
| --- | --- | --- |
| Fixed length | FixedLengthFrameDecoder | — |
| Delimiter | DelimiterBasedFrameDecoder | — |
| Fixed-size length field | LengthFieldBasedFrameDecoder | LengthFieldPrepender |
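
A minimal sketch of plugging these decoders into a pipeline (the numeric parameters are illustrative, not from the original notes):

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;

new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();
        // pick ONE inbound framing strategy:
        p.addLast(new FixedLengthFrameDecoder(3));                                      // 3-byte frames
        // p.addLast(new DelimiterBasedFrameDecoder(1024, Delimiters.lineDelimiter())); // split on \r\n or \n
        // p.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));              // 2-byte length prefix, stripped
        // outbound counterpart of the length-field approach:
        // p.addLast(new LengthFieldPrepender(2));
    }
};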

ByteToMessageDecoder

Decoding enters through the channelRead method:

@Override
// msg is the incoming data
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        selfFiredChannelRead = true;
        CodecOutputList out = CodecOutputList.newInstance();
        try {
          // cumulation is the data accumulator: bytes pile up here across decode calls
            first = cumulation == null;
          // append the incoming buffer to the accumulator
            cumulation = cumulator.cumulate(ctx.alloc(),
                    first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
          // run decode over the accumulated bytes
            callDecode(ctx, cumulation, out);
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            try {
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++numReads >= discardAfterReads) {
                    // We did enough reads already try to discard some bytes, so we not risk to see a OOME.
                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0;
                    discardSomeReadBytes();
                }

                int size = out.size();
                firedChannelRead |= out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);
            } finally {
                out.recycle();
            }
        }
    } else {
        ctx.fireChannelRead(msg);
    }
}
// in holds the accumulated bytes
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
        while (in.isReadable()) {
            final int outSize = out.size();

            if (outSize > 0) {
                fireChannelRead(ctx, out, outSize);
                out.clear();

                // Check if this handler was removed before continuing with decoding.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See:
                // - https://github.com/netty/netty/issues/4635
                if (ctx.isRemoved()) {
                    break;
                }
            }

            int oldInputLength = in.readableBytes();
          // while decode() is running, handler-removal cleanup must not execute;
          // the cleanup happens after decode completes
            decodeRemovalReentryProtection(ctx, in, out);

            // Check if this handler was removed before continuing the loop.
            // If it was removed, it is not safe to continue to operate on the buffer.
            //
            // See https://github.com/netty/netty/issues/1664
            if (ctx.isRemoved()) {
                break;
            }

            if (out.isEmpty()) {
                if (oldInputLength == in.readableBytes()) {
                    break;
                } else {
                    continue;
                }
            }

            if (oldInputLength == in.readableBytes()) {
                throw new DecoderException(
                        StringUtil.simpleClassName(getClass()) +
                                ".decode() did not read anything but decoded a message.");
            }

            if (isSingleDecode()) {
                break;
            }
        }
    } catch (DecoderException e) {
        throw e;
    } catch (Exception cause) {
        throw new DecoderException(cause);
    }
}
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
        throws Exception {
    decodeState = STATE_CALLING_CHILD_DECODE;
    try {
        decode(ctx, in, out);
    } finally {
        boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
        decodeState = STATE_INIT;
        if (removePending) {
            fireChannelRead(ctx, out, out.size());
            out.clear();
            handlerRemoved(ctx);
        }
    }
}
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
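
To illustrate what a subclass supplies, here is a hypothetical decoder that frames on a 4-byte big-endian length prefix (a sketch, not from the Netty sources):

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;

public class IntLengthPrefixDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < 4) {
            return;                                // not even a full length prefix yet: wait for more bytes
        }
        in.markReaderIndex();
        int length = in.readInt();                 // 4-byte big-endian length
        if (in.readableBytes() < length) {
            in.resetReaderIndex();                 // half packet: rewind and wait for the rest
            return;
        }
        out.add(in.readRetainedSlice(length));     // emit one complete frame
    }
}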

FixedLengthFrameDecoder

@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
    Object decoded = decode(ctx, in);
    if (decoded != null) {
        out.add(decoded);
    }
}
protected Object decode(
        @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
  // not enough accumulated bytes for one frame yet
    if (in.readableBytes() < frameLength) {
        return null;
    } else {
      // slice off exactly frameLength bytes; any surplus stays in the accumulator,
      // so sticky packets are no problem here
        return in.readRetainedSlice(frameLength);
    }
}

The cumulator (data accumulator)

/**
 * Cumulate {@link ByteBuf}s.
 */
public interface Cumulator {
    /**
     * Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.
     * The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so
     * call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed.
     */
    ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
}
/**
 * Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.
 */
// cumulates by copying memory
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
    @Override
    public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
        if (!cumulation.isReadable() && in.isContiguous()) {
            // If cumulation is empty and input buffer is contiguous, use it directly
            cumulation.release();
            return in;
        }
        try {
            final int required = in.readableBytes();
          // expand (by reallocating) if there is not enough writable space
            if (required > cumulation.maxWritableBytes() ||
                required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1 ||
                cumulation.isReadOnly()) {
                // Expand cumulation (by replacing it) under the following conditions:
                // - cumulation cannot be resized to accommodate the additional data
                // - cumulation can be expanded with a reallocation operation to accommodate but the buffer is
                //   assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
                return expandCumulation(alloc, cumulation, in);
            }
          // append the data
            cumulation.writeBytes(in, in.readerIndex(), required);
            in.readerIndex(in.writerIndex());
            return cumulation;
        } finally {
            // We must release in all cases as otherwise it may produce a leak if writeBytes(...) throw
            // for whatever release (for example because of OutOfMemoryError)
            in.release();
        }
    }
};
/**
 * Cumulate {@link ByteBuf}s by add them to a {@link CompositeByteBuf} and so do no memory copy whenever possible.
 * Be aware that {@link CompositeByteBuf} use a more complex indexing implementation so depending on your use-case
 * and the decoder implementation this may be slower than just use the {@link #MERGE_CUMULATOR}.
 */
// composite approach: avoids memory copies
public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
    @Override
    public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
        if (!cumulation.isReadable()) {
            cumulation.release();
            return in;
        }
        CompositeByteBuf composite = null;
        try {
            if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) {
                composite = (CompositeByteBuf) cumulation;
                // Writer index must equal capacity if we are going to "write"
                // new components to the end
                if (composite.writerIndex() != composite.capacity()) {
                    composite.capacity(composite.writerIndex());
                }
            } else {
                composite = alloc.compositeBuffer(Integer.MAX_VALUE).addFlattenedComponents(true, cumulation);
            }
          // add the component without copying memory
            composite.addFlattenedComponents(true, in);
            in = null;
            return composite;
        } finally {
            if (in != null) {
                // We must release if the ownership was not transferred as otherwise it may produce a leak
                in.release();
                // Also release any new buffer allocated if we're not returning it
                if (composite != null && composite != cumulation) {
                    composite.release();
                }
            }
        }
    }
};

Core parameters

DelimiterBasedFrameDecoder

public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder {

    private final ByteBuf[] delimiters; // multiple delimiters are supported
    private final int maxFrameLength;
    private final boolean stripDelimiter;
    private final boolean failFast;
    private boolean discardingTooLongFrame;
    private int tooLongFrameLength;
    /** Set only when decoding with "\n" and "\r\n" as the delimiter.  */
    private final LineBasedFrameDecoder lineBasedDecoder;
  
    ...
}

FixedLengthFrameDecoder

public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
    // the fixed frame length
    private final int frameLength;

    /**
     * Creates a new instance.
     *
     * @param frameLength the length of the frame
     */
  	public FixedLengthFrameDecoder(int frameLength) {
        checkPositive(frameLength, "frameLength");
        this.frameLength = frameLength;
    }
  
    ...
}

LengthFieldBasedFrameDecoder

public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {
  ...
}

The Javadoc documents four core control parameters:

  • lengthFieldOffset: offset of the length field

  • lengthFieldLength: size of the length field in bytes

  • lengthAdjustment: compensation value, i.e. the distance between the length field and the message body

  • initialBytesToStrip: how many leading bytes of each frame to skip; typically used to strip the protocol header and pass only the body downstream

      lengthFieldOffset   = 0
      lengthFieldLength   = 2 // the length field occupies 2 bytes
      lengthAdjustment    = 0
      initialBytesToStrip = 0

      +--------+----------------+      +--------+----------------+
      | Length | Actual Content |----->| Length | Actual Content |
      | 0x000C | "HELLO, WORLD" |      | 0x000C | "HELLO, WORLD" |
      +--------+----------------+      +--------+----------------+


      lengthFieldOffset   = 2 // the length field starts at offset 2
      lengthFieldLength   = 3 // the length field occupies 3 bytes
      lengthAdjustment    = 0
      initialBytesToStrip = 0

      BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
      +----------+----------+----------------+      +----------+----------+----------------+
      | Header 1 |  Length  | Actual Content |----->| Header 1 |  Length  | Actual Content |
      |  0xCAFE  | 0x00000C | "HELLO, WORLD" |      |  0xCAFE  | 0x00000C | "HELLO, WORLD" |
      +----------+----------+----------------+      +----------+----------+----------------+


      lengthFieldOffset   = 0 // the length field starts at offset 0
      lengthFieldLength   = 3 // the length field occupies 3 bytes
      lengthAdjustment    = 2 // the body sits 2 bytes after the length field (Header 1 is 2 bytes long)
      initialBytesToStrip = 0

      BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
      +----------+----------+----------------+      +----------+----------+----------------+
      |  Length  | Header 1 | Actual Content |----->|  Length  | Header 1 | Actual Content |
      | 0x00000C |  0xCAFE  | "HELLO, WORLD" |      | 0x00000C |  0xCAFE  | "HELLO, WORLD" |
      +----------+----------+----------------+      +----------+----------+----------------+


      lengthFieldOffset   = 1 // the length field starts at offset 1
      lengthFieldLength   = 2 // the length field occupies 2 bytes
      lengthAdjustment    = 1 // the body sits 1 byte after the length field
      initialBytesToStrip = 3 // strip the 3-byte prefix (HDR1 + Length)

      BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
      +------+--------+------+----------------+      +------+----------------+
      | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
      | 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
      +------+--------+------+----------------+      +------+----------------+


      // here the Length field holds the length of the WHOLE packet
      lengthFieldOffset   = 1 // the length field starts at offset 1
      lengthFieldLength   = 2 // the length field occupies 2 bytes
      lengthAdjustment    = -3 // when Length covers the whole packet, lengthAdjustment goes negative; 3 = HDR1 + Length
      initialBytesToStrip = 3 // strip the 3-byte prefix

      BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
      +------+--------+------+----------------+      +------+----------------+
      | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
      | 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
      +------+--------+------+----------------+      +------+----------------+

Second-stage codecs

Why a second decoding stage

Call the three common decoders that solve the sticky/half-packet problem the first-stage decoders.

| Framing | Decoder | Encoder |
| --- | --- | --- |
| Fixed length | FixedLengthFrameDecoder | not built in; almost nobody needs it |
| Delimiter | DelimiterBasedFrameDecoder | not built in; too simple |
| Fixed-size length field | LengthFieldBasedFrameDecoder | LengthFieldPrepender |

In a project, then, besides optional compression/decompression, we still need one more decoding layer: first-stage decoding yields bytes, and converting those bytes into the objects the project actually works with is the job of the second-stage decoder. Correspondingly, the matching encoder turns Java objects back into a byte stream for storage or transmission.

  • First-stage decoder: ByteToMessageDecoder
    • io.netty.buffer.ByteBuf (raw stream) -> io.netty.buffer.ByteBuf (user data)
  • Second-stage decoder: MessageToMessageDecoder<I> (a sketch follows the list below)
    • io.netty.buffer.ByteBuf (user data) -> Java Object

Could the two be merged into one step, handling both framing (sticky/half packets) and object conversion?

Yes, but it is not recommended:

  • no layering, so less clarity
  • high coupling, so neither part is easy to swap out
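
A minimal sketch of a second-stage decoder (a hypothetical UTF-8 string decoder; Netty's built-in StringDecoder plays this role in practice):

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class Utf8StringDecoder extends MessageToMessageDecoder<ByteBuf> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) {
        // msg is one complete frame produced by the first-stage decoder
        out.add(msg.toString(StandardCharsets.UTF_8)); // ByteBuf -> Java object (here a String)
    }
}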

Common second-stage codec options

  • Java serialization
  • Marshalling
  • XML
  • JSON
  • MessagePack
  • Protobuf
  • others

What to weigh when choosing a codec

  • Space: encoded size
  • Time: encode/decode speed
  • Whether readability matters
  • Multi-language support

A quick look at Google Protobuf

  • Protobuf is a flexible, efficient protocol for serializing data
  • Compared with XML and JSON, Protobuf is smaller, faster, and more convenient
  • Protobuf is cross-language and ships with a compiler, protoc; compiling a .proto file generates Java, Python, C++, etc. automatically, with no further code to write

Netty's support for the common codecs

See the netty-codec package; a Protobuf pipeline sketch follows.
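
A minimal sketch wiring Protobuf into a pipeline with Netty's built-in handlers (MyMessage stands in for a hypothetical protoc-generated class; ch is the channel being initialized):

import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

ChannelPipeline p = ch.pipeline();
// first stage: varint32 length-prefixed framing
p.addLast(new ProtobufVarint32FrameDecoder());
p.addLast(new ProtobufVarint32LengthFieldPrepender());
// second stage: bytes <-> Protobuf messages
p.addLast(new ProtobufDecoder(MyMessage.getDefaultInstance())); // MyMessage: hypothetical generated type
p.addLast(new ProtobufEncoder());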

Keepalive and idle detection

Keepalive

Why keepalive is needed

Real-life analogy:

Suppose you run a restaurant and take orders by phone. The call connects, the caller lists their order, and then mid-sentence goes silent (forgot to hang up, stepped away, line failure, and so on).

Would you hold the phone and wait forever? No: after a while you ask "are you still there?", and if there is no reply you hang up. That mechanism is keepalive.

Mapping to a server application:

| Phone-order scene | Server application |
| --- | --- |
| Phone line | Data connection (TCP connection) |
| The conversation | Data |
| The two callers | Data sender and receiver |

| Comparison | Scene | Application |
| --- | --- | --- |
| Scenarios that need keepalive | The caller stepped away | The peer "crashed" |
| | The caller is there but busy, with no telling when they'll be free | The peer is alive but cannot keep up |
| | The phone line failed | The peer is alive but unreachable |
| Cost of skipping keepalive | The line stays occupied, blocking other orders | The connection is dead yet still wastes resources, and the next use fails outright |

How to design keepalive

Take TCP keepalive as the example:

# core TCP keepalive parameters
sysctl -a | grep tcp_keepalive

# probe only after 7200 s without data
net.ipv4.tcp_keepalive_time = 7200
# interval between successive probes
net.ipv4.tcp_keepalive_intvl = 75
# number of probes to send
net.ipv4.tcp_keepalive_probes = 9

With keepalive enabled (it is off by default), TCP sends a keepalive probe once the connection has carried no data for 7200 seconds; unanswered probes are retried every 75 seconds, and after 9 unanswered probes the connection is declared dead.

Worst-case total: about 2 hours 11 minutes (7200 s + 75 s x 9).

Why application-level keepalive is still needed

  • Protocol layers have different concerns

    The transport layer cares whether the link is "up"; the application layer cares whether it can serve. In the phone-order analogy: the call connecting does not mean anyone answers, and the connection being alive does not mean the server can serve (it may be overloaded).

  • TCP keepalive is off by default, and keepalive packets passing through routers and other intermediaries may be dropped

  • TCP keepalive periods are too long

    The default exceeds 2 hours. It can be changed, but it is a system-wide parameter and affects every application on the host.

Note: HTTP is an application-layer protocol, yet the familiar term "HTTP Keep-Alive" refers to choosing between persistent and short connections:

  • Connection: Keep-Alive -> persistent connection (the HTTP/1.1 default; the header is not required)
  • Connection: Close -> short connection

Idle detection

What idle detection is

Back to the analogy:

You run the restaurant, the call connects, the caller lists their order, and then goes silent.

Do you instantly blurt out "are you still there?"

No. You usually wait a certain amount of time first, watching whether the caller speaks again (idle detection). If they still say nothing, you conclude something is wrong (idle) and either ask "are you still there?" (keepalive) or skip the question and simply hang up (close the connection).

Idle detection only diagnoses. What you do after the diagnosis determines its actual purpose:

  • Send a keepalive: usually paired with keepalive to cut down keepalive traffic

    Keepalive design evolution: V1: keepalive messages on a fixed timer -> V2: idle detection + keepalive only when judged idle

    • V1: keepalive messages are sent on schedule, completely unrelated to normal traffic
    • V2: while other data flows, no keepalive is sent; once nothing has flowed for a while, the connection is judged idle and a keepalive goes out
  • Close the connection outright

    • Quickly releases broken, malicious, or long-unused connections, keeping the system in its best state
    • Crude but simple; the client may need to reconnect

In practice the two are combined: keepalive on demand to keep the connection from going idle, and closing it if it idles anyway.

Enabling TCP keepalive and idle detection in Netty

Enabling keepalive:

Enable TCP keepalive on the server side (either form works):

.childOption(ChannelOption.SO_KEEPALIVE,true)
.childOption(NioChannelOption.SO_KEEPALIVE,true)

Note: .option(ChannelOption.SO_KEEPALIVE, true) compiles but has no effect here, because option() applies to the server channel rather than the accepted child channels.

Enabling the different idle checks:

ch.pipeline().addLast("idleCheckHandler", new IdleStateHandler(0, 20, 0, TimeUnit.SECONDS));

io.netty.handler.timeout.IdleStateHandler#IdleStateHandler(long, long, long, java.util.concurrent.TimeUnit)

/**
 * @see #IdleStateHandler(boolean, long, long, long, TimeUnit)
 */
public IdleStateHandler(
        long readerIdleTime, long writerIdleTime, long allIdleTime,
        TimeUnit unit) {
    this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}

Parameters:

observeOutput – whether consumed bytes should be taken into account when assessing write idleness. Defaults to false.

readerIdleTime – an IdleStateEvent with state IdleState.READER_IDLE is triggered when no read has been performed for the given period. Pass 0 to disable.

writerIdleTime – an IdleStateEvent with state IdleState.WRITER_IDLE is triggered when no write has been performed for the given period. Pass 0 to disable.

allIdleTime – an IdleStateEvent with state IdleState.ALL_IDLE is triggered when neither a read nor a write has been performed for the given period. Pass 0 to disable.

unit – the TimeUnit of readerIdleTime, writerIdleTime, and allIdleTime.
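
These events arrive in userEventTriggered; a minimal sketch of reacting to them (a hypothetical handler that closes on read idle):

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

public class IdleCloseHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent
                && ((IdleStateEvent) evt).state() == IdleState.READER_IDLE) {
            ctx.close(); // nothing read for too long: drop the connection
        } else {
            super.userEventTriggered(ctx, evt); // let other events flow downstream
        }
    }
}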

The difference between the two ways of enabling keepalive

.childOption(ChannelOption.SO_KEEPALIVE,true)
.childOption(NioChannelOption.SO_KEEPALIVE,true)

The "child" here is the SocketChannel connected to the client.

io.netty.bootstrap.ServerBootstrap#childOption

/**
 * Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they get created
 * (after the acceptor accepted the {@link Channel}). Use a value of {@code null} to remove a previous set
 * {@link ChannelOption}.
 */
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
    ObjectUtil.checkNotNull(childOption, "childOption");
    synchronized (childOptions) {
        if (value == null) {
            childOptions.remove(childOption);
        } else {
            childOptions.put(childOption, value);
        }
    }
    return this;
}

io.netty.bootstrap.ServerBootstrap#init

@Override
void init(Channel channel) {
    setChannelOptions(channel, newOptionsArray(), logger);
    setAttributes(channel, newAttributesArray());

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
  // childOptions are captured as currentChildOptions
    final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                  // ServerBootstrapAcceptor handles the follow-up once a connection is accepted
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);
    // apply the child options (including keepalive) to the accepted channel
    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);

    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

io.netty.bootstrap.AbstractBootstrap#setChannelOption

private static void setChannelOption(
        Channel channel, ChannelOption<?> option, Object value, InternalLogger logger) {
    try {
        if (!channel.config().setOption((ChannelOption<Object>) option, value)) {
            logger.warn("Unknown channel option '{}' for channel '{}'", option, channel);
        }
    } catch (Throwable t) {
        logger.warn(
                "Failed to set channel option '{}' with value '{}' for channel '{}'", option, value, channel, t);
    }
}

io.netty.channel.ChannelConfig#setOption

<T> boolean setOption(ChannelOption<T> option, T value);

setOption has many implementations; since the child above is a SocketChannel, look at the NioSocketChannel one.

io.netty.channel.socket.nio.NioSocketChannel.NioSocketChannelConfig#setOption

@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
    // JDK version >= 7 and the option is a NioChannelOption
    if (PlatformDependent.javaVersion() >= 7 && option instanceof NioChannelOption) {
        return NioChannelOption.setOption(jdkChannel(), (NioChannelOption<T>) option, value);
    }
  // an ordinary ChannelOption
    return super.setOption(option, value);
}

io.netty.channel.socket.nio.NioChannelOption#setOption

@SuppressJava6Requirement(reason = "Usage guarded by java version check")
static <T> boolean setOption(Channel jdkChannel, NioChannelOption<T> option, T value) {
    java.nio.channels.NetworkChannel channel = (java.nio.channels.NetworkChannel) jdkChannel;
    if (!channel.supportedOptions().contains(option.option)) {
        return false;
    }
    // works around a JDK NIO bug
    if (channel instanceof ServerSocketChannel && option.option == java.net.StandardSocketOptions.IP_TOS) {
        // Skip IP_TOS as a workaround for a JDK bug:
        // See https://mail.openjdk.java.net/pipermail/nio-dev/2018-August/005365.html
        return false;
    }
    try {
      // delegate to the JDK call
        channel.setOption(option.option, value);
        return true;
    } catch (IOException e) {
        throw new ChannelException(e);
    }
}

io.netty.channel.socket.DefaultSocketChannelConfig#setOption

// the ordinary ChannelOption path
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
    validate(option, value);

    if (option == SO_RCVBUF) {
        setReceiveBufferSize((Integer) value);
    } else if (option == SO_SNDBUF) {
        setSendBufferSize((Integer) value);
    } else if (option == TCP_NODELAY) {
        setTcpNoDelay((Boolean) value);
    } else if (option == SO_KEEPALIVE) {
        setKeepAlive((Boolean) value);
    } else if (option == SO_REUSEADDR) {
        setReuseAddress((Boolean) value);
    } else if (option == SO_LINGER) {
        setSoLinger((Integer) value);
    } else if (option == IP_TOS) {
        setTrafficClass((Integer) value);
    } else if (option == ALLOW_HALF_CLOSURE) {
        setAllowHalfClosure((Boolean) value);
    } else {
        return super.setOption(option, value);
    }

    return true;
}

Idle

The source lives under netty-handler (one of Netty's extension modules), in the handler's timeout package.

/**
 * An {@link Enum} that represents the idle state of a {@link Channel}.
 */
public enum IdleState {
    /**
     * No data was received for a while.
     */
  // read idle
    READER_IDLE,
    /**
     * No data was sent for a while.
     */
  // write idle
    WRITER_IDLE,
    /**
     * No data was either received or sent for a while.
     */
  // neither received nor sent
    ALL_IDLE
}
public class IdleStateEvent {
    public static final IdleStateEvent FIRST_READER_IDLE_STATE_EVENT =
            new DefaultIdleStateEvent(IdleState.READER_IDLE, true);
    public static final IdleStateEvent READER_IDLE_STATE_EVENT =
            new DefaultIdleStateEvent(IdleState.READER_IDLE, false);
    public static final IdleStateEvent FIRST_WRITER_IDLE_STATE_EVENT =
            new DefaultIdleStateEvent(IdleState.WRITER_IDLE, true);
    public static final IdleStateEvent WRITER_IDLE_STATE_EVENT =
            new DefaultIdleStateEvent(IdleState.WRITER_IDLE, false);
    public static final IdleStateEvent FIRST_ALL_IDLE_STATE_EVENT =
            new DefaultIdleStateEvent(IdleState.ALL_IDLE, true);
    public static final IdleStateEvent ALL_IDLE_STATE_EVENT =
            new DefaultIdleStateEvent(IdleState.ALL_IDLE, false);
  
  ...
}

The three IdleState values are subdivided into six events by adding a "first occurrence" flag: idle can fire repeatedly, and the first occurrence may deserve special handling.

io.netty.handler.timeout.IdleStateHandler.ReaderIdleTimeoutTask

// read-idle timeout task
private final class ReaderIdleTimeoutTask extends AbstractIdleTask {

    ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
        super(ctx);
    }

    @Override
    protected void run(ChannelHandlerContext ctx) {
        long nextDelay = readerIdleTimeNanos;
        if (!reading) {
          // compute the remaining delay to decide whether we are idle
            nextDelay -= ticksInNanos() - lastReadTime;
        }

        if (nextDelay <= 0) {
          // idle detected
            // Reader is idle - set a new timeout and notify the callback.
            readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

            boolean first = firstReaderIdleEvent;
            // firstReaderIdleEvent: until the next read arrives, idle may fire repeatedly after the first
            firstReaderIdleEvent = false;

            try {
              // create an IdleStateEvent and hand it to channelIdle
                IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                channelIdle(ctx, event);
            } catch (Throwable t) {
                ctx.fireExceptionCaught(t);
            }
        } else {
          // not idle: schedule a fresh check after nextDelay
            // Read occurred before the timeout - set a new timeout with shorter delay.
            readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
        }
    }
}

io.netty.handler.timeout.IdleStateHandler.WriterIdleTimeoutTask

private final class WriterIdleTimeoutTask extends AbstractIdleTask {

    WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
        super(ctx);
    }

    @Override
    protected void run(ChannelHandlerContext ctx) {

        long lastWriteTime = IdleStateHandler.this.lastWriteTime;
        long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
        if (nextDelay <= 0) {
            // Writer is idle - set a new timeout and notify the callback.
            writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);

            boolean first = firstWriterIdleEvent;
            firstWriterIdleEvent = false;

            try {
                if (hasOutputChanged(ctx, first)) {
                    return;
                }

                IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
                channelIdle(ctx, event);
            } catch (Throwable t) {
                ctx.fireExceptionCaught(t);
            }
        } else {
            // Write occurred before the timeout - set a new timeout with shorter delay.
            writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
        }
    }
}

The write-idle timeout task mirrors the read-idle one, with one extra check, hasOutputChanged:

/**
 * Returns {@code true} if and only if the {@link IdleStateHandler} was constructed
 * with {@link #observeOutput} enabled and there has been an observed change in the
 * {@link ChannelOutboundBuffer} between two consecutive calls of this method.
 *
 * https://github.com/netty/netty/issues/6150
 */
private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
    if (observeOutput) {
        // Normally false: the "write" in write-idle means a successful write. In practice,
        // (1) a write may be stuck because the buffer is full, or (2) a large write may be
        // progressing but not yet finished. With observeOutput enabled, this check looks for
        // write intent/progress rather than only completed writes.
        // We can take this shortcut if the ChannelPromises that got passed into write()
        // appear to complete. It indicates "change" on message level and we simply assume
        // that there's change happening on byte level. If the user doesn't observe channel
        // writability events then they'll eventually OOME and there's clearly a different
        // problem and idleness is least of their concerns.
        if (lastChangeCheckTimeStamp != lastWriteTime) {
            lastChangeCheckTimeStamp = lastWriteTime;

            // But this applies only if it's the non-first call.
            if (!first) {
                return true;
            }
        }

        Channel channel = ctx.channel();
        Unsafe unsafe = channel.unsafe();
        ChannelOutboundBuffer buf = unsafe.outboundBuffer();

        if (buf != null) {
            int messageHashCode = System.identityHashCode(buf.current());
            long pendingWriteBytes = buf.totalPendingWriteBytes();
            // pendingWriteBytes != lastPendingWriteBytes: the pending byte count changed since the last check
            if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
                lastMessageHashCode = messageHashCode;
                lastPendingWriteBytes = pendingWriteBytes;

                if (!first) {
                    return true;
                }
            }

            long flushProgress = buf.currentProgress();
            // flush progress: perhaps not finished, but still moving
            if (flushProgress != lastFlushProgress) {
                lastFlushProgress = flushProgress;
                return !first;
            }
        }
    }

    return false;
}

Netty ships handlers for the common reactions to idleness. To raise an exception on read idle, use ReadTimeoutHandler:

@Override
protected final void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
    assert evt.state() == IdleState.READER_IDLE;
    readTimedOut(ctx);
}

/**
 * Is called when a read timeout was detected.
 */
protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
    if (!closed) {
        ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
        ctx.close();
        closed = true;
    }
}

WriteTimeoutHandler

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    if (timeoutNanos > 0) {
        promise = promise.unvoid();
        // when writing, schedule a task that checks whether the write completes in time
        scheduleTimeout(ctx, promise);
    }
    ctx.write(msg, promise);
}
private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise promise) {
    // Schedule a timeout.
    final WriteTimeoutTask task = new WriteTimeoutTask(ctx, promise);
    task.scheduledFuture = ctx.executor().schedule(task, timeoutNanos, TimeUnit.NANOSECONDS);

    if (!task.scheduledFuture.isDone()) {
        addWriteTimeoutTask(task);

        // Cancel the scheduled timeout if the flush promise is complete.
        promise.addListener(task);
    }
}

io.netty.handler.timeout.WriteTimeoutHandler.WriteTimeoutTask#run

@Override
public void run() {
    // Was not written yet so issue a write timeout
    // The promise itself will be failed with a ClosedChannelException once the close() was issued
    // See https://github.com/netty/netty/issues/2159
  
    // has the write itself completed?
    if (!promise.isDone()) {
        try {
          // raise the write-timeout exception
            writeTimedOut(ctx);
        } catch (Throwable t) {
            ctx.fireExceptionCaught(t);
        }
    }
    removeWriteTimeoutTask(this);
}

Note the asymmetry: ReadTimeoutHandler checks whether reads have gone idle, whereas WriteTimeoutHandler checks whether a given write completes in time.

Locks in Netty

The three elements of a synchronization problem

  • Atomicity
  • Visibility
  • Ordering

Classifying locks

  • By attitude toward contention: optimistic (the atomic classes in java.util.concurrent) vs pessimistic (synchronized)
  • By fairness toward waiters: fair (new ReentrantLock(true)) vs unfair (new ReentrantLock())
  • By shareability: shared vs exclusive; in ReadWriteLock, the read lock is shared and the write lock exclusive

Key points about locking in Netty

  • Mind the lock's target and scope -> reduce the granularity

    synchronized method -> synchronized block

  • Mind the size of the lock object itself -> reduce the space it occupies

    Example: counting pending outbound bytes (io.netty.channel.ChannelOutboundBuffer)

    AtomicLong -> volatile long + AtomicLongFieldUpdater

    private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
            AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");
    
    @SuppressWarnings("UnusedDeclaration")
    private volatile long totalPendingSize;
    
    private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
        if (size == 0) {
            return;
        }
    
        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
        if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
            setUnwritable(invokeLater);
        }
    }
    

    AtomicLong vs long

    The former is an object, carrying an object header (for the hashcode, lock bits, etc.) of 8 bytes on a 32-bit JVM and 16 bytes on a 64-bit one. So on a 64-bit system:

    • volatile long = 8 bytes
    • AtomicLong = 8 bytes (volatile long) + 16 bytes (object header) + 8 bytes (reference) = 32 bytes

    That saves at least 24 bytes per instance.

    Pattern: Atomic* object -> volatile primitive + static Atomic*FieldUpdater

  • Mind the lock's speed -> improve concurrency

    Example: the LongCounter used for counting allocated bytes and similar statistics

    io.netty.util.internal.PlatformDependent#newLongCounter

    Under high concurrency, LongAdder outperforms the atomic classes:

    /**
     * Creates a new fastest {@link LongCounter} implementation for the current platform.
     */
    public static LongCounter newLongCounter() {
        if (javaVersion() >= 8) {
            return new LongAdderCounter();
        } else {
            return new AtomicLongCounter();
        }
    }
    
    @SuppressJava6Requirement(reason = "Usage guarded by java version check")
    final class LongAdderCounter extends LongAdder implements LongCounter {
    
        @Override
        public long value() {
            return longValue();
        }
    }
    
  • Pick the right concurrency tool for each scenario -> adapt as needed

    Example: shutting down and awaiting the event executor

    Object.wait/notify -> CountDownLatch

    io.netty.util.concurrent.SingleThreadEventExecutor#threadLock

    Example: the queue that stores tasks in NioEventLoop

    JDK's LinkedBlockingQueue (MPMC) -> JCTools' MPSC queue

    MPMC: multi-producer, multi-consumer

    MPSC: multi-producer, single-consumer

    A NioEventLoop is bound to a single thread, so there is exactly one consumer.

    io.netty.util.internal.PlatformDependent.Mpsc#newChunkedMpscQueue

    static <T> Queue<T> newChunkedMpscQueue(final int chunkSize, final int capacity) {
        return USE_MPSC_CHUNKED_ARRAY_QUEUE ? new MpscChunkedArrayQueue<T>(chunkSize, capacity)
                : new MpscChunkedAtomicArrayQueue<T>(chunkSize, capacity);
    }
    
  • Weigh what a lock buys you -> don't use one unless you must

    Real-life analogy:

    A restaurant with many private rooms can organize service two ways:

    • each waiter permanently serves a fixed set of rooms
    • every waiter serves every room

    On the surface the first is less efficient, but it avoids coordination among waiters (context switching) and keeps guests and waiters from criss-crossing; it is also easier to manage.

    Locally serial: a Channel's I/O requests are processed serially through its Pipeline

    Globally parallel: multiple serialized threads (NioEventLoops) run side by side
    (figure: NioEventLoops running in parallel)

    In Netty's scenario, locally serial + globally parallel beats one queue + many threads:

    • lower development difficulty, simpler logic, better performance
    • none of the context-switching or concurrency-guard overhead that locks bring

    Avoiding locks outright: use ThreadLocal to avoid contention, as in Netty's lightweight object pool

    io.netty.util.Recycler#threadLocal

    private final FastThreadLocal<LocalPool<T>> threadLocal = new FastThreadLocal<LocalPool<T>>() {
        @Override
        protected LocalPool<T> initialValue() {
            return new LocalPool<T>(maxCapacityPerThread, interval, chunkSize);
        }
    
        @Override
        protected void onRemoval(LocalPool<T> value) throws Exception {
            super.onRemoval(value);
            MessagePassingQueue<DefaultHandle<T>> handles = value.pooledHandles;
            value.pooledHandles = null;
            handles.clear();
        }
    };
    

Netty's memory usage

Goals of the memory techniques

  • Small memory footprint (space)
  • Fast execution (time)

For Java, additionally: reduce full-GC stop-the-world (STW) time.

Reduce the size of objects themselves

Use primitive types instead of boxed ones where possible.

Make a field a class (static) variable when it does not need to be per-instance.

Combining the two: AtomicLong -> volatile long + static AtomicLongFieldUpdater.

Estimate allocations up front

For a HashMap whose size is known ahead of time, avoid growth/rehashing:

compute the initial capacity yourself, or simply use

com.google.common.collect.Maps#newHashMapWithExpectedSize

/**
 * Creates a {@code HashMap} instance, with a high enough "initial capacity" that it <i>should</i>
 * hold {@code expectedSize} elements without growth. This behavior cannot be broadly guaranteed,
 * but it is observed to be true for OpenJDK 1.7. It also can't be guaranteed that the method
 * isn't inadvertently <i>oversizing</i> the returned map.
 *
 * @param expectedSize the number of entries you expect to add to the returned map
 * @return a new, empty {@code HashMap} with enough capacity to hold {@code expectedSize} entries
 *     without resizing
 * @throws IllegalArgumentException if {@code expectedSize} is negative
 */
public static <K, V> HashMap<K, V> newHashMapWithExpectedSize(int expectedSize) {
  return new HashMap<>(capacity(expectedSize));
}

/**
 * Returns a capacity that is sufficient to keep the map from being resized as long as it grows no
 * larger than expectedSize and the load factor is ≥ its default (0.75).
 */
static int capacity(int expectedSize) {
  if (expectedSize < 3) {
    checkNonnegative(expectedSize, "expectedSize");
    return expectedSize + 1;
  }
  if (expectedSize < Ints.MAX_POWER_OF_TWO) {
    // This is the calculation used in JDK8 to resize when a putAll
    // happens; it seems to be the most conservative calculation we
    // can make.  0.75 is the default load factor.
    return (int) ((float) expectedSize / 0.75F + 1.0F);
  }
  return Integer.MAX_VALUE; // any large value
}

Netty adapts (guesses) the size of the next receive buffer based on the data actually received; see

io.netty.channel.AdaptiveRecvByteBufAllocator.HandleImpl#record

// the receive buffer is kept large enough to hold the incoming data, yet small enough to avoid waste
private void record(int actualReadBytes) {
    if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {
        if (decreaseNow) {
            index = max(index - INDEX_DECREMENT, minIndex);
            nextReceiveBufferSize = SIZE_TABLE[index];
            decreaseNow = false;
        } else {
            decreaseNow = true;
        }
    } else if (actualReadBytes >= nextReceiveBufferSize) {
        index = min(index + INDEX_INCREMENT, maxIndex);
        nextReceiveBufferSize = SIZE_TABLE[index];
        decreaseNow = false;
    }
}

Zero-copy

1. Compose logically instead of physically copying

For example CompositeByteBuf (a sketch follows):

io.netty.handler.codec.ByteToMessageDecoder#COMPOSITE_CUMULATOR
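
A minimal sketch of composing two buffers without a copy (the buffer contents are illustrative):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;

ByteBuf header = Unpooled.copiedBuffer("HEADER", CharsetUtil.UTF_8);
ByteBuf body = Unpooled.copiedBuffer("BODY", CharsetUtil.UTF_8);
// true = advance the writer index past the added components
CompositeByteBuf frame = Unpooled.compositeBuffer().addComponents(true, header, body);
// frame now reads as HEADER followed by BODY, yet no bytes were copied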

2. Wrap instead of copying

byte[] bytes = data.getBytes();
ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);

3. Call the JDK's zero-copy API

Netty wraps NIO's FileChannel.transferTo() in DefaultFileRegion to implement zero-copy file transfer:

@Override
public long transferTo(WritableByteChannel target, long position) throws IOException {
    long count = this.count - position;
    if (count < 0 || position < 0) {
        throw new IllegalArgumentException(
                "position out of range: " + position +
                " (expected: 0 - " + (this.count - 1) + ')');
    }
    if (count == 0) {
        return 0L;
    }
    if (refCnt() == 0) {
        throw new IllegalReferenceCountException(0);
    }
    // Call open to make sure fc is initialized. This is a no-oop if we called it before.
    open();

    long written = file.transferTo(this.position + position, count, target);
    if (written > 0) {
        transferred += written;
    } else if (written == 0) {
        // If the amount of written data is 0 we need to check if the requested count is bigger then the
        // actual file itself as it may have been truncated on disk.
        //
        // See https://github.com/netty/netty/issues/8868
        validate(this, position);
    }
    return written;
}

Off-heap (direct) memory

Real-life analogy:

In summer the barbecue joints around the neighborhood overflow with customers, so the owner sets out extra tables by the door.

Inside the shop -> inside the JVM -> heap + non-heap

Outside the shop -> outside the JVM -> off-heap

Pros:

  • More "space", easing the pressure inside -> escapes heap size limits and reduces GC pressure
  • Fewer "redundant" steps (grilling outside for show: straight to the table vs carrying it back through the shop) -> avoids copies

Cons:

  • Tables must be carried out -> allocation is somewhat slower
  • Subject to outside regulation, higher risk -> off-heap memory is managed by the operating system

Memory pooling

Real-life analogy:

The evolution of the order pad:

  • Paper slips: one sheet per table
  • Ordering tablets: reused over and over

Why object pools:

  • Object creation is expensive
  • Objects are created at high frequency and are reusable
  • Supports concurrency while protecting the system
  • Maintains and shares limited resources

How to implement an object pool

  • Open-source: Apache Commons Pool
  • Netty's lightweight object pool: io.netty.util.Recycler

Netty's support for off-heap memory and pooling

Pooled vs unpooled: the default and how to switch

// one way to switch to unpooled
bootstrap.childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
// switching to pooled
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

Netty's default choice:

io.netty.channel.DefaultChannelConfig#allocator

// the default ByteBuf allocator
private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;

io.netty.buffer.ByteBufUtil

static {
  // the io.netty.allocator.type system property wins; without it, Android defaults to
  // unpooled and every other platform to pooled
    String allocType = SystemPropertyUtil.get(
            "io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
    allocType = allocType.toLowerCase(Locale.US).trim();

    ByteBufAllocator alloc;
    if ("unpooled".equals(allocType)) {
        alloc = UnpooledByteBufAllocator.DEFAULT;
        logger.debug("-Dio.netty.allocator.type: {}", allocType);
    } else if ("pooled".equals(allocType)) {
        alloc = PooledByteBufAllocator.DEFAULT;
        logger.debug("-Dio.netty.allocator.type: {}", allocType);
    } else {
        alloc = PooledByteBufAllocator.DEFAULT;
        logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);
    }

    DEFAULT_ALLOCATOR = alloc;

    THREAD_LOCAL_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 0);
    logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", THREAD_LOCAL_BUFFER_SIZE);

    MAX_CHAR_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.maxThreadLocalCharBufferSize", 16 * 1024);
    logger.debug("-Dio.netty.maxThreadLocalCharBufferSize: {}", MAX_CHAR_BUFFER_SIZE);
}

Taking an object from the pool

io.netty.buffer.PooledDirectByteBuf#newInstance

// borrow one from the "pool"
// RECYCLER --> Netty's lightweight object-pool implementation
static PooledDirectByteBuf newInstance(int maxCapacity) {
    PooledDirectByteBuf buf = RECYCLER.get();
    buf.reuse(maxCapacity);
    return buf;
}

io.netty.util.Recycler#get

public final T get() {
    if (maxCapacityPerThread == 0) {
      // pooling is disabled
        return newObject((Handle<T>) NOOP_HANDLE);
    }
    LocalPool<T> localPool = threadLocal.get();
    DefaultHandle<T> handle = localPool.claim();
    T obj;
    // try to take one from the "pool"; create a new one if none is available
    if (handle == null) {
        handle = localPool.newHandle();
        if (handle != null) {
            obj = newObject(handle);
            handle.set(obj);
        } else {
            obj = newObject((Handle<T>) NOOP_HANDLE);
        }
    } else {
        obj = handle.get();
    }

    return obj;
}

Returning an object to the pool

io.netty.buffer.PooledByteBuf#deallocate

// return the object to the "pool"; invoked from the pipeline's tail
@Override
protected final void deallocate() {
    if (handle >= 0) {
        final long handle = this.handle;
        this.handle = -1;
        memory = null;
        chunk.decrementPinnedMemory(maxLength);
        chunk.arena.free(chunk, tmpNioBuf, handle, maxLength, cache);
        tmpNioBuf = null;
        chunk = null;
        recycle();
    }
}
@Override
public void recycle(Object object) {
    if (object != value) {
        throw new IllegalArgumentException("object does not belong to handle");
    }
  // release the finished object back into the pool
    localPool.release(this);
}

Heap vs off-heap memory

io.netty.buffer.PooledByteBufAllocator#DEFAULT

public static final PooledByteBufAllocator DEFAULT =
        new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());

io.netty.util.internal.PlatformDependent#directBufferPreferred

public static boolean directBufferPreferred() {
    return DIRECT_BUFFER_PREFERRED;
}
// We should always prefer direct buffers by default if we can use a Cleaner to release direct buffers.
// two conditions for preferring off-heap: 1) a Cleaner exists to free direct buffers;
// 2) io.netty.noPreferDirect is not true (so the io.netty.noPreferDirect property is the switch)
DIRECT_BUFFER_PREFERRED = CLEANER != NOOP
                          && !SystemPropertyUtil.getBoolean("io.netty.noPreferDirect", false);

The PooledByteBufAllocator constructor can also take preferDirect, though this is not the recommended route:

@SuppressWarnings("deprecation")
public PooledByteBufAllocator(boolean preferDirect) {
    this(preferDirect, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER);
}

So instantiating the allocator directly is another way to toggle off-heap usage:

bootstrap.childOption(ChannelOption.ALLOCATOR,
               new PooledByteBufAllocator(false));

What off-heap allocation boils down to

public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
                              int smallCacheSize, int normalCacheSize,
                              boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
    super(preferDirect);
    threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
    this.smallCacheSize = smallCacheSize;
    this.normalCacheSize = normalCacheSize;
  ...
}

io.netty.buffer.AbstractByteBufAllocator#AbstractByteBufAllocator(boolean)

/**
 * Create new instance
 *
 * @param preferDirect {@code true} if {@link #buffer(int)} should try to allocate a direct buffer rather than
 *                     a heap buffer
 */
protected AbstractByteBufAllocator(boolean preferDirect) {
    directByDefault = preferDirect && PlatformDependent.hasUnsafe();
    emptyBuf = new EmptyByteBuf(this);
}

io.netty.buffer.AbstractByteBufAllocator#buffer()

@Override
public ByteBuf buffer() {
    if (directByDefault) {
        return directBuffer();
    }
    return heapBuffer();
}

io.netty.buffer.UnpooledByteBufAllocator#newDirectBuffer

@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    final ByteBuf buf;
    if (PlatformDependent.hasUnsafe()) {
        buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
                new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
    } else {
        buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }
    return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}

io.netty.buffer.UnpooledDirectByteBuf#UnpooledDirectByteBuf(io.netty.buffer.ByteBufAllocator, int, int)

public UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity);
    ObjectUtil.checkNotNull(alloc, "alloc");
    checkPositiveOrZero(initialCapacity, "initialCapacity");
    checkPositiveOrZero(maxCapacity, "maxCapacity");
    if (initialCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format(
                "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
    }

    this.alloc = alloc;
  // allocateDirect allocates the off-heap memory
    setByteBuffer(allocateDirect(initialCapacity), false);
}
/**
 * Allocate a new direct {@link ByteBuffer} with the given initialCapacity.
 */
protected ByteBuffer allocateDirect(int initialCapacity) {
  // delegates to the JDK's ByteBuffer.allocateDirect
    return ByteBuffer.allocateDirect(initialCapacity);
}