Why not use JDK NIO directly?
Netty:
- supports the common application-layer protocols
- solves transport problems: the sticky-packet and half-packet phenomena
- supports traffic shaping
- provides complete handling of disconnects, idle connections, and other abnormal situations
Netty works around JDK NIO bugs:
- Spurious selector wakeups spinning the event loop and driving CPU to 100%:
  Netty counts the empty polls and, once the count exceeds a threshold, rebuilds the multiplexer (Selector).
- Setting the IP_TOS option (the IP packet priority/QoS field) throws an exception:
  java.lang.AssertionError: Option not found
Netty's API is friendlier and more powerful:
- Some JDK NIO APIs are unfriendly and weak in functionality, e.g. ByteBuffer -> Netty's ByteBuf (compared in the sketch below)
- Beyond NIO, Netty also enhances other JDK facilities: ThreadLocal -> Netty's FastThreadLocal
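As a small illustration of the API difference, a minimal sketch comparing the two buffer types (sizes and values are arbitrary):
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.ByteBuffer;

public class ByteBufVsByteBuffer {
    public static void main(String[] args) {
        // JDK ByteBuffer: one position shared by reads and writes,
        // so you must remember to flip() between writing and reading
        ByteBuffer jdk = ByteBuffer.allocate(16);
        jdk.put((byte) 1);
        jdk.flip(); // forget this and you read from the wrong position
        System.out.println(jdk.get());

        // Netty ByteBuf: independent readerIndex/writerIndex, no flip() needed
        ByteBuf netty = Unpooled.buffer(16);
        netty.writeByte(1);
        System.out.println(netty.readByte());
        netty.release(); // ByteBuf is reference-counted; release when done
    }
}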
Netty isolates change and hides detail:
- isolates changes in the JDK NIO implementation: nio -> nio2 (aio) -> ...
- hides the implementation details of JDK NIO
Switching among the three I/O modes in Netty
The three classic I/O modes
Real-life scenario: eating out
- canteen-queue mode: queue at the window and leave only once your food is dished up
- order-and-wait-to-be-called mode: wait for your number, then fetch the food yourself
- private-room mode: after ordering, the dishes are brought straight to your table
The analogy maps as:
- restaurant -> server
- dishes -> data
- dishes ready -> data ready
- fetching/serving the dishes -> reading the data
Dining mode | I/O mode | Introduced in |
---|---|---|
Queue at the canteen window | BIO (blocking I/O) | before JDK 1.4 |
Order and wait to be called | NIO (non-blocking I/O) | JDK 1.4 (2002, the java.nio package) |
Private room | AIO (asynchronous I/O) | JDK 1.7 (2011) |
- Blocking vs non-blocking
  - the dish isn't ready: do you wait at the counter? -> before the data is ready, do you wait?
  - blocking: a read blocks until data arrives; a write also blocks when the buffer is full
  - non-blocking: in both situations the call simply returns immediately (see the sketch after this list)
- Asynchronous vs synchronous
  - the dish is ready: who carries it to the table? -> once the data is ready, who performs the data operation?
  - having to read the data yourself once it is ready is synchronous; having the data read for you and handed back through a callback is asynchronous
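A minimal JDK NIO sketch of the blocking vs non-blocking distinction (address, port, and buffer size are placeholders):
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class NonBlockingReadDemo {
    public static void main(String[] args) throws Exception {
        SocketChannel ch = SocketChannel.open();
        ch.configureBlocking(false); // switch the channel to non-blocking mode
        ch.connect(new InetSocketAddress("127.0.0.1", 8080));
        while (!ch.finishConnect()) { /* spin, or do other useful work */ }

        ByteBuffer buf = ByteBuffer.allocate(1024);
        int n = ch.read(buf); // returns immediately: 0 if nothing arrived yet, -1 on EOF
        System.out.println("bytes read: " + n); // a blocking channel would park here instead
        ch.close();
    }
}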
Netty's support for the three I/O modes
BIO and AIO were both supported at one time.
Why does Netty now support only NIO?
- Why is blocking I/O (BIO/OIO) deprecated?
  With a high connection count, blocking wastes resources and is inefficient.
- Why was the already-finished AIO support removed?
  - the Windows implementation is mature, but Windows is rarely used for servers
  - Linux is commonly used for servers, but its AIO implementation is not mature enough
  - on Linux, AIO offers no obvious performance gain over NIO
Why does Netty have several NIO implementations?
The generic NIO implementation (Common) also uses epoll on Linux, so why maintain a separate native one?
- Netty exposes more tunable parameters, for example:
  - the JDK's default NIO implementation is level-triggered
  - Netty's native transport can switch between edge-triggered (the default) and level-triggered; a usage sketch follows below
- Netty's own implementation generates less garbage and performs better
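Assuming the netty-transport-native-epoll dependency and a Linux host, switching to the native transport only means swapping the group and channel classes, just like the Nio -> Oio switch shown later:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;

EventLoopGroup bossGroup = new EpollEventLoopGroup(1);
EventLoopGroup workerGroup = new EpollEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
 .channel(EpollServerSocketChannel.class);
// ... the rest of the bootstrap is identical to the NIO version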
Is NIO necessarily better than BIO?
- BIO code is simpler
- in specific scenarios (few connections, low concurrency), BIO performance is no worse than NIO's
How the I/O mode appears in code
Code path:
Netty source -> netty-example -> EchoServer
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
Changing Nio to Oio throughout the code completes the switch of I/O mode:
EventLoopGroup bossGroup = new OioEventLoopGroup(1);
EventLoopGroup workerGroup = new OioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(OioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
Netty supports three Reactor flavors
What Reactor is, and its three versions
Real-life scenario: a restaurant growing in scale
- one person does everything
- hire a few more waiters
- divide the work further
Mapping the analogy:
- waiter: thread
- greeting guests: accepting connections
- taking orders: requests
- cooking: business processing
- serving dishes: responses
- seeing guests off: disconnection
And the three scales:
- one person does everything -> single-threaded Reactor
- hire a few more waiters -> multi-threaded Reactor
- divide the work further -> master-slave (boss/worker) multi-threaded Reactor
BIO | NIO | AIO |
---|---|---|
Thread-Per-Connection | Reactor | Proactor |
Reactor is a development pattern. Its core flow:
register interest in events -> scan for whether the interesting events have occurred -> handle each event when it fires
Client/Server | SocketChannel/ServerSocketChannel | OP_ACCEPT | OP_CONNECT | OP_WRITE | OP_READ |
---|---|---|---|---|---|
client | SocketChannel | | Y | Y | Y |
server | ServerSocketChannel | Y | | | |
server | SocketChannel | | | Y | Y |
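A minimal sketch of that register -> scan -> handle loop against the raw JDK Selector API (port and buffer size are arbitrary):
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class MiniReactor {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(8080));
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT); // 1. register interest

        while (true) {
            selector.select();                             // 2. scan for ready events
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {                  // 3. handle the event
                    SocketChannel ch = server.accept();
                    ch.configureBlocking(false);
                    ch.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    ByteBuffer buf = ByteBuffer.allocate(1024);
                    ((SocketChannel) key.channel()).read(buf);
                    // ... decode / compute / encode / respond
                }
            }
        }
    }
}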
BIO
One thread handles read, decode, compute, encode, and send for its connection.
read and send are both blocking operations, so the more blocked connections there are, the more threads are tied up.
Single-threaded Reactor
One thread does everything.
Multi-threaded Reactor
decode, compute, and encode are handed off to a thread pool.
Master-slave multi-threaded Reactor
The accept events are registered with a separate Reactor of their own.
Using the Reactor patterns in Netty
Single-threaded Reactor | EventLoopGroup eventGroup = new NioEventLoopGroup(1); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(eventGroup); |
---|---|
Non-master-slave multi-threaded Reactor | EventLoopGroup eventGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(eventGroup); |
Master-slave multi-threaded Reactor | EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup); |
If the thread count is not set, Netty derives a default from the CPU core count (2 × cores).
TCP sticky packets and half packets
What are sticky packets and half packets?
Suppose the client sends two messages, ABC and DEF. The receiver will not necessarily see them framed that way: it might receive everything in one read (ABCDEF), or across several reads, e.g. AB, CD, EF, or A, BCDE, F.
Receiving multiple sends in one read is the sticky-packet phenomenon; needing multiple reads for one send is the half-packet phenomenon.
Main causes of sticky packets:
- each write by the sender is smaller than the socket buffer
- the receiver does not read the socket buffer promptly enough
Main causes of half packets:
- the sender writes more than the socket buffer holds
- the data exceeds the protocol's MTU (Maximum Transmission Unit) and must be fragmented
TCP/IP is a layered protocol, and every layer caps its packet size; that cap is the MTU.
Media for IP transport | Maximum transmission unit (bytes) |
---|---|
Internet IPv4 path MTU | At least 68, max of 64 KiB |
Internet IPv6 path MTU | At least 1280, max of 64 KiB, but up to 4 GiB with optional jumbogram |
Ethernet v2 | 1500 |
Whenever the data is large enough, it gets fragmented.
Looked at from another angle:
- Sending and receiving: one send may be received across several reads, and several sends may arrive in a single read.
- Transport: one send may occupy several transport packets, and several sends may share one transport packet.
Root cause: TCP is a stream protocol, and the stream has no message boundaries.
Note: UDP is like mailed parcels. Several may be transported together, but each parcel has its own "boundary" and is signed for one at a time, so UDP has no sticky- or half-packet phenomenon.
Solutions
The fundamental fix: find the message boundary.
Approach | Variant | How the boundary is found | Pros | Cons | Recommended |
---|---|---|---|---|---|
Short-lived TCP connection, one request per connection | | everything between connect and close is the message | simple | very inefficient | no |
Framing | fixed length (ABC, DEF, GHI as 3-byte frames) | read exactly the fixed length | simple | wastes space | no |
Framing | delimiter (ABC\nDEF\n) | everything between delimiters | no wasted space, fairly simple | a delimiter inside the payload must be escaped, so the content has to be scanned | yes |
Framing | fixed-size length field holding the content length (0x0005 "ABC") | parse the fixed-size length field first, then read that many bytes | locates the user data precisely; no escaping needed | the length is bounded in theory: the largest possible message must be known up front to size the length field | yes |
Other | | varies; e.g. JSON can check whether the braces are balanced | | weigh against the actual scenario; many exist to support existing protocols | |
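For a taste of how framing plugs into Netty (the decoder classes named here are covered in the next section; the 8192-byte frame cap is an arbitrary choice):
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;

// split frames on \n or \r\n; a frame longer than 8192 bytes without a delimiter fails fast
ChannelInitializer<SocketChannel> framing = new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) {
        ch.pipeline().addLast(
                new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()),
                new StringDecoder()); // bytes -> String for the handlers downstream
    }
};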
Netty's support for the three common framing approaches
Approach | Decoder | Encoder |
---|---|---|
Framing: fixed length | FixedLengthFrameDecoder | none built in (too simple to need one) |
Framing: delimiter | DelimiterBasedFrameDecoder | none built in (too simple to need one) |
Framing: length field | LengthFieldBasedFrameDecoder | LengthFieldPrepender |
ByteToMessageDecoder
The decoding entry point is the channelRead method:
@Override
// msg holds the incoming data
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
selfFiredChannelRead = true;
CodecOutputList out = CodecOutputList.newInstance();
try {
// cumulation is the accumulation buffer: data is accumulated across reads until it can be decoded
first = cumulation == null;
// append the incoming data to the accumulator
cumulation = cumulator.cumulate(ctx.alloc(),
first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
// invoke decode on the accumulated bytes
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
try {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes, so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
firedChannelRead |= out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
} finally {
out.recycle();
}
}
} else {
ctx.fireChannelRead(msg);
}
}
// in holds the accumulated data
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
final int outSize = out.size();
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
// Check if this handler was removed before continuing with decoding.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See:
// - https://github.com/netty/netty/issues/4635
if (ctx.isRemoved()) {
break;
}
}
int oldInputLength = in.readableBytes();
// while decode() runs, handler-removal cleanup must not execute;
// the cleanup happens after decode() completes
decodeRemovalReentryProtection(ctx, in, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
if (out.isEmpty()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
decodeState = STATE_CALLING_CHILD_DECODE;
try {
decode(ctx, in, out);
} finally {
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
decodeState = STATE_INIT;
if (removePending) {
fireChannelRead(ctx, out, out.size());
out.clear();
handlerRemoved(ctx);
}
}
}
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
FixedLengthFrameDecoder
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
protected Object decode(
@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
// not enough accumulated bytes for a whole frame yet?
if (in.readableBytes() < frameLength) {
return null;
} else {
// slice off exactly frameLength bytes; any surplus stays in the accumulator,
// so sticky packets are no concern here
return in.readRetainedSlice(frameLength);
}
}
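Using the decoder is then a one-liner in the pipeline; for example, frame length 3 matches the ABC/DEF example above:
// every message delivered downstream is now exactly 3 bytes long
ch.pipeline().addLast(new FixedLengthFrameDecoder(3));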
The Cumulator (accumulation buffer)
/**
* Cumulate {@link ByteBuf}s.
*/
public interface Cumulator {
/**
* Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes.
* The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so
* call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed.
*/
ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
}
/**
* Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies.
*/
// accumulates by memory copy
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
if (!cumulation.isReadable() && in.isContiguous()) {
// If cumulation is empty and input buffer is contiguous, use it directly
cumulation.release();
return in;
}
try {
final int required = in.readableBytes();
// expand (by reallocating) if there is not enough space
if (required > cumulation.maxWritableBytes() ||
required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1 ||
cumulation.isReadOnly()) {
// Expand cumulation (by replacing it) under the following conditions:
// - cumulation cannot be resized to accommodate the additional data
// - cumulation can be expanded with a reallocation operation to accommodate but the buffer is
// assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
return expandCumulation(alloc, cumulation, in);
}
// append the data
cumulation.writeBytes(in, in.readerIndex(), required);
in.readerIndex(in.writerIndex());
return cumulation;
} finally {
// We must release in all cases as otherwise it may produce a leak if writeBytes(...) throw
// for whatever release (for example because of OutOfMemoryError)
in.release();
}
}
};
/**
* Cumulate {@link ByteBuf}s by add them to a {@link CompositeByteBuf} and so do no memory copy whenever possible.
* Be aware that {@link CompositeByteBuf} use a more complex indexing implementation so depending on your use-case
* and the decoder implementation this may be slower than just use the {@link #MERGE_CUMULATOR}.
*/
// composite approach: avoids memory copies where possible
public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
if (!cumulation.isReadable()) {
cumulation.release();
return in;
}
CompositeByteBuf composite = null;
try {
if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) {
composite = (CompositeByteBuf) cumulation;
// Writer index must equal capacity if we are going to "write"
// new components to the end
if (composite.writerIndex() != composite.capacity()) {
composite.capacity(composite.writerIndex());
}
} else {
composite = alloc.compositeBuffer(Integer.MAX_VALUE).addFlattenedComponents(true, cumulation);
}
// avoids a memory copy
composite.addFlattenedComponents(true, in);
in = null;
return composite;
} finally {
if (in != null) {
// We must release if the ownership was not transferred as otherwise it may produce a leak
in.release();
// Also release any new buffer allocated if we're not returning it
if (composite != null && composite != cumulation) {
composite.release();
}
}
}
}
};
Core parameters
DelimiterBasedFrameDecoder
public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder {
private final ByteBuf[] delimiters; // multiple delimiters are supported
private final int maxFrameLength;
private final boolean stripDelimiter;
private final boolean failFast;
private boolean discardingTooLongFrame;
private int tooLongFrameLength;
/** Set only when decoding with "\n" and "\r\n" as the delimiter. */
private final LineBasedFrameDecoder lineBasedDecoder;
...
}
FixedLengthFrameDecoder
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
// the fixed frame length
private final int frameLength;
/**
* Creates a new instance.
*
* @param frameLength the length of the frame
*/
public FixedLengthFrameDecoder(int frameLength) {
checkPositive(frameLength, "frameLength");
this.frameLength = frameLength;
}
...
}
LengthFieldBasedFrameDecoder
public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {
...
}
The documentation describes four core control parameters:
- lengthFieldOffset: the offset of the length field
- lengthFieldLength: the number of bytes the length field occupies
- lengthAdjustment: the length correction, i.e. the distance between the length field and the message body
- initialBytesToStrip: how many leading bytes of each packet to skip; typically used to strip the protocol header and pass only the body downstream
Example 1: 2-byte length field at offset 0, nothing stripped

lengthFieldOffset   = 0
lengthFieldLength   = 2   // the length field occupies 2 bytes
lengthAdjustment    = 0
initialBytesToStrip = 0

BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
+--------+----------------+      +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 0x000C | "HELLO, WORLD" |      | 0x000C | "HELLO, WORLD" |
+--------+----------------+      +--------+----------------+

Example 2: header before the length field

lengthFieldOffset   = 2   // the length field starts at offset 2
lengthFieldLength   = 3   // the length field occupies 3 bytes
lengthAdjustment    = 0
initialBytesToStrip = 0

BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
+----------+----------+----------------+      +----------+----------+----------------+
| Header 1 |  Length  | Actual Content |----->| Header 1 |  Length  | Actual Content |
|  0xCAFE  | 0x00000C | "HELLO, WORLD" |      |  0xCAFE  | 0x00000C | "HELLO, WORLD" |
+----------+----------+----------------+      +----------+----------+----------------+

Example 3: header between the length field and the body

lengthFieldOffset   = 0   // the length field starts at offset 0
lengthFieldLength   = 3   // the length field occupies 3 bytes
lengthAdjustment    = 2   // Header 1 (2 bytes) sits between the length field and the body
initialBytesToStrip = 0

BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
+----------+----------+----------------+      +----------+----------+----------------+
|  Length  | Header 1 | Actual Content |----->|  Length  | Header 1 | Actual Content |
| 0x00000C |  0xCAFE  | "HELLO, WORLD" |      | 0x00000C |  0xCAFE  | "HELLO, WORLD" |
+----------+----------+----------------+      +----------+----------+----------------+

Example 4: strip the leading headers

lengthFieldOffset   = 1   // the length field starts at offset 1
lengthFieldLength   = 2   // the length field occupies 2 bytes
lengthAdjustment    = 1   // HDR2 (1 byte) sits between the length field and the body
initialBytesToStrip = 3   // strip the 3 leading header bytes

BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
+------+--------+------+----------------+      +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
| 0xCA | 0x000C | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+      +------+----------------+

Example 5: the length field counts the whole packet

lengthFieldOffset   = 1
lengthFieldLength   = 2
lengthAdjustment    = -3  // when Length covers the whole packet, lengthAdjustment goes negative: -(HDR1 + Length) = -3
initialBytesToStrip = 3   // strip the 3 leading header bytes

BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
+------+--------+------+----------------+      +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
| 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+      +------+----------------+
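As a usage sketch, the first layout above (a bare 2-byte length prefix, stripped before the body travels downstream) could be configured like this; the 65535 maximum frame length is an arbitrary safety cap:
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;

// decoder: 2-byte length field at offset 0, no adjustment, strip the 2 length bytes
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));
// encoder counterpart: prepend a 2-byte length field to every outbound message
ch.pipeline().addLast(new LengthFieldPrepender(2));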
Second-stage encoding and decoding
Why a second decode?
Call the three common decoders that solve the half-packet/sticky-packet problem the first-stage decoders:
Approach | Decoder | Encoder |
---|---|---|
Framing: fixed length | FixedLengthFrameDecoder | none built in, hardly ever needed |
Framing: delimiter | DelimiterBasedFrameDecoder | none built in, too simple |
Framing: length field | LengthFieldBasedFrameDecoder | LengthFieldPrepender |
Then, apart from optional compression/decompression, a project still needs one more decoding layer: the first decode produces bytes, and converting them into the objects the project works with makes them convenient to use. That layer can be called the second-stage decoder. Correspondingly, the matching encoder turns Java objects into a byte stream for storage or transmission.
- First-stage decoder: ByteToMessageDecoder
  - io.netty.buffer.ByteBuf (raw byte stream) -> io.netty.buffer.ByteBuf (user data)
- Second-stage decoder: MessageToMessageDecoder<I>
  - io.netty.buffer.ByteBuf (user data) -> Java Object
Could the two be merged, solving first-stage decoding (sticky/half packets) and second-stage decoding (usability) in one step?
Yes, but it is not recommended:
- no layering, so the design is less clear
- high coupling, so neither scheme is easy to swap out
Common second-stage codecs:
- Java serialization
- Marshalling
- XML
- JSON
- MessagePack
- Protobuf
- others
Points to weigh when choosing a codec:
- space: size after encoding
- time: encoding/decoding speed
- whether readability matters
- multi-language support
A quick look at Google Protobuf:
- Protobuf is a flexible, efficient protocol for serializing data
- compared with XML and JSON, Protobuf is smaller, faster, and more convenient
- Protobuf is cross-language and ships with a compiler, protoc; simply compiling the schema generates Java, Python, C++ and other code, with nothing more to write by hand
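Netty ships ready-made Protobuf handlers in netty-codec; a hedged pipeline sketch, where MyMessage stands in for any protoc-generated message type:
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

// first-stage decode: varint32 length-field framing handles sticky/half packets
ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
// second-stage decode: bytes -> protoc-generated object (MyMessage is hypothetical)
ch.pipeline().addLast(new ProtobufDecoder(MyMessage.getDefaultInstance()));
// the outbound mirror image of the two stages above
ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
ch.pipeline().addLast(new ProtobufEncoder());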
Netty's support for the common codecs
It lives in the netty-codec module.
Keepalive and Idle detection
Keepalive
Why keepalive is needed
Real-life scenario:
Suppose you run a restaurant and take orders by phone. The call connects, the caller rattles off a list of requirements, and then mid-sentence goes silent (they may have forgotten to hang up, stepped out, or the line failed).
Would you hold the receiver and wait forever? No. After a moment you confirm: "Are you still there?" If there is no reply, you hang up. That mechanism is keepalive.
Mapping to a server application:
Phone-order scenario | Server application |
---|---|
the phone line | the connection (TCP connection) |
the conversation | data |
the two parties on the call | the data sender and receiver |

Comparison | Scenario | Application |
---|---|---|
situations that need keepalive | the other party stepped away for a moment | the peer "crashed" abnormally |
| the other party is there but too busy, with no telling when they will be free | the peer is alive but cannot keep up |
| the phone line is broken | the peer is alive but unreachable |
consequence of no keepalive | the line stays occupied, holding up other orders | the connection is dead yet still wastes resources, and the next direct use fails immediately |
How to design a keepalive mechanism, with TCP keepalive as the example:
# core TCP keepalive parameters
sysctl -a | grep tcp_keepalive
# probe only after 7200 seconds without any data transfer
net.ipv4.tcp_keepalive_time = 7200
# interval between successive probe packets
net.ipv4.tcp_keepalive_intvl = 75
# number of probe packets to send
net.ipv4.tcp_keepalive_probes = 9
With keepalive enabled (it is off by default), TCP sends a keepalive probe once a connection has carried no data for 7200 seconds. When a probe gets no reply, it is retried every 75 seconds; after 9 unanswered probes the connection is declared dead.
Total elapsed time: about 2 hours 11 minutes (7200 s + 75 s × 9 = 7875 s).
Why an application-layer keepalive is still needed:
- Protocols are layered, and each layer has its own concern. The transport layer cares whether the path is "up"; the application layer cares whether the service can actually serve. In the ordering analogy: the call connecting does not mean anyone answers, and the server connection being up does not mean it can serve (it may be overloaded).
- TCP-layer keepalive is off by default, and keepalive packets may be dropped by routers and other middleboxes along the way.
- The TCP-layer keepalive interval is too long: more than 2 hours by default, and although it can be changed, it is a system-wide parameter, so changing it affects every application on the host.
Note: HTTP is an application-layer protocol, but the commonly heard term "HTTP Keep-Alive" refers to the choice between long-lived and short-lived connections:
- Connection: Keep-Alive — long-lived connection (the HTTP/1.1 default; the header need not be added)
- Connection: Close — short-lived connection
Idle detection
What is Idle detection?
Replay the scenario: you run a restaurant, someone phones in an order, and mid-call the caller goes silent.
Do you immediately blurt out "Are you still there?"
No. You normally wait a certain amount of time, watching whether the caller speaks again (Idle detection). If they still say nothing, you conclude something is wrong on their end (Idle), and only then ask "Are you still there?" (keepalive), or skip the question entirely and just hang up (close the connection).
Idle detection only diagnoses. What you do after the diagnosis determines its actual purpose:
- Send a keepalive: usually paired with keepalive to reduce keepalive traffic. The keepalive design evolves from V1 (keepalive messages on a fixed timer) to V2 (idle detection first, keepalive only when judged Idle):
  - V1: keepalive messages are completely unrelated to normal traffic and are sent on a schedule
  - V2: while other data is flowing, no keepalive is sent; only when nothing has been transmitted for a while is the connection judged Idle and a keepalive sent
- Close the connection directly:
  - quickly frees broken, malicious, or long-unused connections, keeping the system in its best state at all times
  - crude but simple; the client may need to reconnect
In practice the two are combined: send keepalives on demand to guarantee the connection does not sit idle, and if it goes idle anyway, close it.
Enabling TCP keepalive and Idle detection in Netty
Enabling keepalive:
enable TCP keepalive on the server side:
.childOption(ChannelOption.SO_KEEPALIVE,true)
.childOption(NioChannelOption.SO_KEEPALIVE,true)
Note: .option(ChannelOption.SO_KEEPALIVE,true) compiles but has no effect, because option() configures the listening server channel while SO_KEEPALIVE only matters on the accepted child channels.
Enabling the different Idle checks:
ch.pipeline().addLast("idleCheckHandler", new IdleStateHandler(0, 20, 0, TimeUnit.SECONDS));
io.netty.handler.timeout.IdleStateHandler#IdleStateHandler(long, long, long, java.util.concurrent.TimeUnit)
/**
* @see #IdleStateHandler(boolean, long, long, long, TimeUnit)
*/
public IdleStateHandler(
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}
Parameters:
- observeOutput: whether bytes actually being flushed should be taken into account when judging write idleness. Defaults to false.
- readerIdleTime: an IdleStateEvent with state IdleState.READER_IDLE is triggered when no read has been performed for the given period. Pass 0 to disable.
- writerIdleTime: an IdleStateEvent with state IdleState.WRITER_IDLE is triggered when no write has been performed for the given period. Pass 0 to disable.
- allIdleTime: an IdleStateEvent with state IdleState.ALL_IDLE is triggered when neither a read nor a write has been performed for the given period. Pass 0 to disable.
- unit: the TimeUnit of readerIdleTime, writerIdleTime, and allIdleTime
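IdleStateHandler only fires the IdleStateEvent; a handler further down the pipeline decides what to do with it. A minimal sketch, assuming we simply close the connection on read idle (writing a keepalive message instead would go in the same spot):
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

public class IdleCloseHandler extends ChannelDuplexHandler {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent
                && ((IdleStateEvent) evt).state() == IdleState.READER_IDLE) {
            // diagnosed as read-idle: act on it (here: close; a keepalive write is the other option)
            ctx.close();
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}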
The difference between the two ways of enabling keepalive:
.childOption(ChannelOption.SO_KEEPALIVE,true)
.childOption(NioChannelOption.SO_KEEPALIVE,true)
Here child refers to the SocketChannel that talks to the client.
io.netty.bootstrap.ServerBootstrap#childOption
/**
* Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they get created
* (after the acceptor accepted the {@link Channel}). Use a value of {@code null} to remove a previous set
* {@link ChannelOption}.
*/
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
ObjectUtil.checkNotNull(childOption, "childOption");
synchronized (childOptions) {
if (value == null) {
childOptions.remove(childOption);
} else {
childOptions.put(childOption, value);
}
}
return this;
}
io.netty.bootstrap.ServerBootstrap#init
@Override
void init(Channel channel) {
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, newAttributesArray());
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
// childOptions are snapshotted here into currentChildOptions
final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// ServerBootstrapAcceptor performs the follow-up processing once a connection has been accepted
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
// apply the child options (including keepalive) to the accepted channel
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
io.netty.bootstrap.AbstractBootstrap#setChannelOption
private static void setChannelOption(
Channel channel, ChannelOption<?> option, Object value, InternalLogger logger) {
try {
if (!channel.config().setOption((ChannelOption<Object>) option, value)) {
logger.warn("Unknown channel option '{}' for channel '{}'", option, channel);
}
} catch (Throwable t) {
logger.warn(
"Failed to set channel option '{}' with value '{}' for channel '{}'", option, value, channel, t);
}
}
io.netty.channel.ChannelConfig#setOption
<T> boolean setOption(ChannelOption<T> option, T value);
It has many implementations; since child is a SocketChannel, look at NioSocketChannel's version:
io.netty.channel.socket.nio.NioSocketChannel.NioSocketChannelConfig#setOption
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
// on JDK >= 7, a NioChannelOption is delegated straight to the underlying JDK channel
if (PlatformDependent.javaVersion() >= 7 && option instanceof NioChannelOption) {
return NioChannelOption.setOption(jdkChannel(), (NioChannelOption<T>) option, value);
}
// an ordinary ChannelOption goes through Netty's own config
return super.setOption(option, value);
}
io.netty.channel.socket.nio.NioChannelOption#setOption
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
static <T> boolean setOption(Channel jdkChannel, NioChannelOption<T> option, T value) {
java.nio.channels.NetworkChannel channel = (java.nio.channels.NetworkChannel) jdkChannel;
if (!channel.supportedOptions().contains(option.option)) {
return false;
}
// works around the JDK NIO IP_TOS bug mentioned earlier
if (channel instanceof ServerSocketChannel && option.option == java.net.StandardSocketOptions.IP_TOS) {
// Skip IP_TOS as a workaround for a JDK bug:
// See https://mail.openjdk.java.net/pipermail/nio-dev/2018-August/005365.html
return false;
}
try {
// plain JDK call
channel.setOption(option.option, value);
return true;
} catch (IOException e) {
throw new ChannelException(e);
}
}
io.netty.channel.socket.DefaultSocketChannelConfig#setOption
// the ordinary-ChannelOption path
@Override
public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value);
if (option == SO_RCVBUF) {
setReceiveBufferSize((Integer) value);
} else if (option == SO_SNDBUF) {
setSendBufferSize((Integer) value);
} else if (option == TCP_NODELAY) {
setTcpNoDelay((Boolean) value);
} else if (option == SO_KEEPALIVE) {
setKeepAlive((Boolean) value);
} else if (option == SO_REUSEADDR) {
setReuseAddress((Boolean) value);
} else if (option == SO_LINGER) {
setSoLinger((Integer) value);
} else if (option == IP_TOS) {
setTrafficClass((Integer) value);
} else if (option == ALLOW_HALF_CLOSURE) {
setAllowHalfClosure((Boolean) value);
} else {
return super.setOption(option, value);
}
return true;
}
Idle
The source sits in netty-handler, inside the handler's timeout package; Idle detection is one of Netty's extensions.
/**
* An {@link Enum} that represents the idle state of a {@link Channel}.
*/
public enum IdleState {
/**
* No data was received for a while.
*/
// read idle
READER_IDLE,
/**
* No data was sent for a while.
*/
// write idle
WRITER_IDLE,
/**
* No data was either received or sent for a while.
*/
// neither received nor sent anything
ALL_IDLE
}
public class IdleStateEvent {
public static final IdleStateEvent FIRST_READER_IDLE_STATE_EVENT =
new DefaultIdleStateEvent(IdleState.READER_IDLE, true);
public static final IdleStateEvent READER_IDLE_STATE_EVENT =
new DefaultIdleStateEvent(IdleState.READER_IDLE, false);
public static final IdleStateEvent FIRST_WRITER_IDLE_STATE_EVENT =
new DefaultIdleStateEvent(IdleState.WRITER_IDLE, true);
public static final IdleStateEvent WRITER_IDLE_STATE_EVENT =
new DefaultIdleStateEvent(IdleState.WRITER_IDLE, false);
public static final IdleStateEvent FIRST_ALL_IDLE_STATE_EVENT =
new DefaultIdleStateEvent(IdleState.ALL_IDLE, true);
public static final IdleStateEvent ALL_IDLE_STATE_EVENT =
new DefaultIdleStateEvent(IdleState.ALL_IDLE, false);
...
}
Here the three IdleState kinds are refined into six events by adding a "first" flag: idle can trigger repeatedly, and the first occurrence may deserve special handling.
io.netty.handler.timeout.IdleStateHandler.ReaderIdleTimeoutTask
// the read-idle timeout task
private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
}
@Override
protected void run(ChannelHandlerContext ctx) {
long nextDelay = readerIdleTimeNanos;
if (!reading) {
// compute how much time remains before the channel counts as idle
nextDelay -= ticksInNanos() - lastReadTime;
}
if (nextDelay <= 0) {
// the channel has gone idle
// Reader is idle - set a new timeout and notify the callback.
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstReaderIdleEvent;
// firstReaderIdleEvent: until the next read arrives, idle may fire repeatedly after the first time
firstReaderIdleEvent = false;
try {
// create an IdleStateEvent and hand it on
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// not idle: schedule a fresh check after the remaining delay (nextDelay)
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
io.netty.handler.timeout.IdleStateHandler.WriterIdleTimeoutTask
private final class WriterIdleTimeoutTask extends AbstractIdleTask {
WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
}
@Override
protected void run(ChannelHandlerContext ctx) {
long lastWriteTime = IdleStateHandler.this.lastWriteTime;
long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
if (nextDelay <= 0) {
// Writer is idle - set a new timeout and notify the callback.
writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
boolean first = firstWriterIdleEvent;
firstWriterIdleEvent = false;
try {
if (hasOutputChanged(ctx, first)) {
return;
}
IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Write occurred before the timeout - set a new timeout with shorter delay.
writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
The write-idle timeout task mirrors the read-idle one, with one extra check: hasOutputChanged.
/**
* Returns {@code true} if and only if the {@link IdleStateHandler} was constructed
* with {@link #observeOutput} enabled and there has been an observed change in the
* {@link ChannelOutboundBuffer} between two consecutive calls of this method.
*
* https://github.com/netty/netty/issues/6150
*/
private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
if (observeOutput) {
// Normally false: the "write" in write-idle means a successful write. In practice, though, several cases arise:
// (1) a write was attempted but the buffer is full and nothing goes out; (2) a large write is in progress and moving, but not finished.
// With observeOutput on, the check therefore looks for the intent/progress of writing rather than for completed writes.
// We can take this shortcut if the ChannelPromises that got passed into write()
// appear to complete. It indicates "change" on message level and we simply assume
// that there's change happening on byte level. If the user doesn't observe channel
// writability events then they'll eventually OOME and there's clearly a different
// problem and idleness is least of their concerns.
if (lastChangeCheckTimeStamp != lastWriteTime) {
lastChangeCheckTimeStamp = lastWriteTime;
// But this applies only if it's the non-first call.
if (!first) {
return true;
}
}
Channel channel = ctx.channel();
Unsafe unsafe = channel.unsafe();
ChannelOutboundBuffer buf = unsafe.outboundBuffer();
if (buf != null) {
int messageHashCode = System.identityHashCode(buf.current());
long pendingWriteBytes = buf.totalPendingWriteBytes();
// pendingWriteBytes != lastPendingWriteBytes: the bytes pending transmission differ from the previous check
if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
lastMessageHashCode = messageHashCode;
lastPendingWriteBytes = pendingWriteBytes;
if (!first) {
return true;
}
}
long flushProgress = buf.currentProgress();
// flush progress: the write may not be finished, but it keeps moving
if (flushProgress != lastFlushProgress) {
lastFlushProgress = flushProgress;
return !first;
}
}
}
return false;
}
Netty also ships handlers for reacting when Idle strikes. If a read idle should surface as an exception, use ReadTimeoutHandler:
@Override
protected final void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
assert evt.state() == IdleState.READER_IDLE;
readTimedOut(ctx);
}
/**
* Is called when a read timeout was detected.
*/
protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
if (!closed) {
ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
ctx.close();
closed = true;
}
}
WriteTimeoutHandler
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (timeoutNanos > 0) {
promise = promise.unvoid();
// when writing, schedule a task that later checks whether the write completed
scheduleTimeout(ctx, promise);
}
ctx.write(msg, promise);
}
private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise promise) {
// Schedule a timeout.
final WriteTimeoutTask task = new WriteTimeoutTask(ctx, promise);
task.scheduledFuture = ctx.executor().schedule(task, timeoutNanos, TimeUnit.NANOSECONDS);
if (!task.scheduledFuture.isDone()) {
addWriteTimeoutTask(task);
// Cancel the scheduled timeout if the flush promise is complete.
promise.addListener(task);
}
}
io.netty.handler.timeout.WriteTimeoutHandler.WriteTimeoutTask#run
@Override
public void run() {
// Was not written yet so issue a write timeout
// The promise itself will be failed with a ClosedChannelException once the close() was issued
// See https://github.com/netty/netty/issues/2159
// has the write itself completed?
if (!promise.isDone()) {
try {
// raise the write-timeout exception
writeTimedOut(ctx);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
}
removeWriteTimeoutTask(this);
}
Note the asymmetry: ReadTimeoutHandler judges whether reading has gone idle, whereas WriteTimeoutHandler judges whether an individual write completed in time.
Netty's locks
The three elements of a synchronization problem:
- atomicity
- visibility
- ordering
Classifying locks:
- by attitude toward contention: optimistic locks (the atomic classes in the JUC package) vs pessimistic locks (synchronized)
- by fairness toward waiting threads: fair locks (new ReentrantLock(true)) vs unfair locks (new ReentrantLock())
- by shareability: shared vs exclusive locks, e.g. ReadWriteLock, whose read lock is shared and whose write lock is exclusive
Key points of locking in Netty:
- Mind the locked object and its scope -> reduce the granularity.
  synchronized method -> synchronized block
- Mind the size of the lock object itself -> reduce the space it occupies.
  Example: counting the bytes waiting to be sent (io.netty.channel.ChannelOutboundBuffer):
  AtomicLong -> volatile long + AtomicLongFieldUpdater
private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
        AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");

@SuppressWarnings("UnusedDeclaration")
private volatile long totalPendingSize;

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
        setUnwritable(invokeLater);
    }
}
AtomicLong vs long:
the former is an object, carrying an object header (which stores the hashcode, lock bits, etc.) of 8 bytes on a 32-bit JVM and 16 bytes on a 64-bit JVM. So on a 64-bit JVM:
- volatile long = 8 bytes
- AtomicLong = 8 bytes (volatile long) + 16 bytes (object header) + 8 bytes (reference) = 32 bytes
i.e. at least 24 bytes saved per instance.
Pattern: Atomic* object -> volatile primitive + static Atomic*FieldUpdater
- Mind the lock's speed -> improve concurrency.
  Example: the LongCounter used for statistics such as allocated bytes:
  io.netty.util.internal.PlatformDependent#newLongCounter
  Under high concurrency, LongAdder outperforms the atomic classes.
/**
 * Creates a new fastest {@link LongCounter} implementation for the current platform.
 */
public static LongCounter newLongCounter() {
    if (javaVersion() >= 8) {
        return new LongAdderCounter();
    } else {
        return new AtomicLongCounter();
    }
}
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
final class LongAdderCounter extends LongAdder implements LongCounter {
    @Override
    public long value() {
        return longValue();
    }
}
- Choose different concurrency tools for different scenarios -> adapt to the need.
  Example: shutting down and awaiting the event executor:
  Object.wait/notify -> CountDownLatch
  io.netty.util.concurrent.SingleThreadEventExecutor#threadLock
  Example: the queue that stores tasks in the NioEventLoop:
  JDK's LinkedBlockingQueue (MPMC) -> JCTools' MPSC
  - MPMC: multi-producer, multi-consumer
  - MPSC: multi-producer, single-consumer
  A NioEventLoop is bound to a single thread, so there is only one consumer.
  io.netty.util.internal.PlatformDependent.Mpsc#newChunkedMpscQueue
static <T> Queue<T> newChunkedMpscQueue(final int chunkSize, final int capacity) {
    return USE_MPSC_CHUNKED_ARRAY_QUEUE ? new MpscChunkedArrayQueue<T>(chunkSize, capacity)
                                        : new MpscChunkedAtomicArrayQueue<T>(chunkSize, capacity);
}
- Weigh a lock's value -> avoid locks where possible.
  Real-life scenario: a restaurant with many private rooms can be staffed in two ways:
  - each waiter is fixed to a few rooms
  - all waiters serve all rooms
  On the surface the first looks less efficient, but in fact it avoids the communication between waiters (context switches) and keeps guests and waiters from running all over the place; management stays simple.
  Locally serial: the I/O requests on one Channel are processed serially through its Pipeline.
  Globally parallel: multiple serialized threads (NioEventLoops) run in parallel.
  For Netty's workload, locally serial + globally parallel beats one queue + many threads:
  - lower development difficulty, simpler logic, better processing performance
  - avoids the context switches and concurrency-protection overhead that locks bring
Avoiding locks altogether: use ThreadLocal to avoid resource contention, e.g. Netty's lightweight object pool:
io.netty.util.Recycler#threadLocal
private final FastThreadLocal<LocalPool<T>> threadLocal = new FastThreadLocal<LocalPool<T>>() {
    @Override
    protected LocalPool<T> initialValue() {
        return new LocalPool<T>(maxCapacityPerThread, interval, chunkSize);
    }

    @Override
    protected void onRemoval(LocalPool<T> value) throws Exception {
        super.onRemoval(value);
        MessagePassingQueue<DefaultHandle<T>> handles = value.pooledHandles;
        value.pooledHandles = null;
        handles.clear();
    }
};
Netty's memory usage
Goals of the memory-usage techniques:
- small memory footprint (space)
- a fast application (time)
For Java this largely means reducing the stop-the-world (STW) time of Full GC.
Reduce the size of objects themselves:
- use primitive types instead of boxed types where possible
- fields that should be class (static) variables must not be instance variables
- combining the two: AtomicLong -> volatile long + static AtomicLongFieldUpdater
Estimate memory allocations in advance:
for a HashMap whose size is known up front, avoid rehashing by computing the initial size in advance, or directly use
com.google.common.collect.Maps#newHashMapWithExpectedSize
/**
* Creates a {@code HashMap} instance, with a high enough "initial capacity" that it <i>should</i>
* hold {@code expectedSize} elements without growth. This behavior cannot be broadly guaranteed,
* but it is observed to be true for OpenJDK 1.7. It also can't be guaranteed that the method
* isn't inadvertently <i>oversizing</i> the returned map.
*
* @param expectedSize the number of entries you expect to add to the returned map
* @return a new, empty {@code HashMap} with enough capacity to hold {@code expectedSize} entries
* without resizing
* @throws IllegalArgumentException if {@code expectedSize} is negative
*/
public static <K, V> HashMap<K, V> newHashMapWithExpectedSize(int expectedSize) {
return new HashMap<>(capacity(expectedSize));
}
/**
* Returns a capacity that is sufficient to keep the map from being resized as long as it grows no
* larger than expectedSize and the load factor is ≥ its default (0.75).
*/
static int capacity(int expectedSize) {
if (expectedSize < 3) {
checkNonnegative(expectedSize, "expectedSize");
return expectedSize + 1;
}
if (expectedSize < Ints.MAX_POWER_OF_TWO) {
// This is the calculation used in JDK8 to resize when a putAll
// happens; it seems to be the most conservative calculation we
// can make. 0.75 is the default load factor.
return (int) ((float) expectedSize / 0.75F + 1.0F);
}
return Integer.MAX_VALUE; // any large value
}
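Usage is then a one-liner; for 100 expected entries the helper requests capacity 100 / 0.75 + 1 ≈ 134, which HashMap rounds up internally, so growing to 100 entries never triggers a resize:
import com.google.common.collect.Maps;
import java.util.HashMap;

HashMap<String, Integer> counts = Maps.newHashMapWithExpectedSize(100);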
Netty dynamically adjusts (guesses) the size of the next buffer to allocate based on the data actually received; see
io.netty.channel.AdaptiveRecvByteBufAllocator.HandleImpl#record
// the receive buffer is kept large enough to take the incoming data, yet small enough to avoid wasting space
private void record(int actualReadBytes) {
if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT)]) {
if (decreaseNow) {
index = max(index - INDEX_DECREMENT, minIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
} else {
decreaseNow = true;
}
} else if (actualReadBytes >= nextReceiveBufferSize) {
index = min(index + INDEX_INCREMENT, maxIndex);
nextReceiveBufferSize = SIZE_TABLE[index];
decreaseNow = false;
}
}
Zero-copy
1. Use logical composition instead of an actual copy.
Example: CompositeByteBuf
io.netty.handler.codec.ByteToMessageDecoder#COMPOSITE_CUMULATOR
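A small illustration of logical composition (string contents are arbitrary): the composite only indexes into its components, so no bytes are copied when header and body are viewed as one buffer:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;

ByteBuf header = Unpooled.copiedBuffer("HDR", CharsetUtil.US_ASCII);
ByteBuf body = Unpooled.copiedBuffer("BODY", CharsetUtil.US_ASCII);
CompositeByteBuf message = Unpooled.compositeBuffer();
message.addComponents(true, header, body); // true: advance the writer index
System.out.println(message.toString(CharsetUtil.US_ASCII)); // HDRBODY
message.release(); // releasing the composite releases the components it owns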
2. Use wrapping instead of an actual copy:
byte[] bytes = data.getBytes();
ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
3. Call the JDK's zero-copy API.
Netty's DefaultFileRegion wraps NIO's FileChannel.transferTo() to implement zero-copy file transfer:
@Override
public long transferTo(WritableByteChannel target, long position) throws IOException {
long count = this.count - position;
if (count < 0 || position < 0) {
throw new IllegalArgumentException(
"position out of range: " + position +
" (expected: 0 - " + (this.count - 1) + ')');
}
if (count == 0) {
return 0L;
}
if (refCnt() == 0) {
throw new IllegalReferenceCountException(0);
}
// Call open to make sure fc is initialized. This is a no-op if we called it before.
open();
long written = file.transferTo(this.position + position, count, target);
if (written > 0) {
transferred += written;
} else if (written == 0) {
// If the amount of written data is 0 we need to check if the requested count is bigger then the
// actual file itself as it may have been truncated on disk.
//
// See https://github.com/netty/netty/issues/8868
validate(this, position);
}
return written;
}
Off-heap memory
Real-life scenario:
in summer the barbecue joints around the neighborhood are packed with no seats left, so the owner usually sets out extra tables at the door.
- inside the shop -> inside the JVM -> heap + non-heap
- outside the shop -> outside the JVM -> off-heap
Pros:
- more "space", easing the pressure inside the shop -> breaks the heap size limit and reduces GC pressure
- fewer "redundant" steps (grilling outside for atmosphere: straight from grill to table vs carrying the food back through the shop) -> avoids copying
Cons:
- tables have to be carried out -> creation is somewhat slower
- watched by the city inspectors, higher risk -> off-heap memory is managed by the operating system
Memory pools
Real-life scenario:
the evolution of the order pad:
- a sheet of paper: one per table of guests
- an ordering tablet: reused over and over
Why introduce object pools:
- creating the objects is expensive
- the objects are created at high frequency and are reusable
- supports concurrency while protecting the system
- maintains and shares limited resources
How to implement an object pool:
- open-source implementations: Apache Commons Pool
- Netty's lightweight object pool: io.netty.util.Recycler
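A minimal sketch of pooling one's own objects with Recycler (the PooledPoint class and its fields are hypothetical; the Handle API shown is Netty 4.1's):
import io.netty.util.Recycler;

public final class PooledPoint {
    private static final Recycler<PooledPoint> RECYCLER = new Recycler<PooledPoint>() {
        @Override
        protected PooledPoint newObject(Handle<PooledPoint> handle) {
            return new PooledPoint(handle);
        }
    };

    private final Recycler.Handle<PooledPoint> handle;
    public int x, y;

    private PooledPoint(Recycler.Handle<PooledPoint> handle) {
        this.handle = handle;
    }

    public static PooledPoint newInstance(int x, int y) {
        PooledPoint p = RECYCLER.get(); // borrow from the pool, or create if empty
        p.x = x;
        p.y = y;
        return p;
    }

    public void recycle() {
        handle.recycle(this); // hand the object back to the pool
    }
}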
Netty's support for off-heap memory and memory pools
Default pooled/unpooled choice and how to switch:
// one way to switch to the unpooled allocator
bootstrap.childOption(ChannelOption.ALLOCATOR, UnpooledByteBufAllocator.DEFAULT);
// switch to the pooled allocator
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
Netty's default choice:
io.netty.channel.DefaultChannelConfig#allocator
// the default ByteBuf allocator
private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
io.netty.buffer.ByteBufUtil
static {
// io.netty.allocator.type takes precedence; if unset, Android gets unpooled and every other platform gets pooled
// i.e. the io.netty.allocator.type system property chooses whether pooling is used
String allocType = SystemPropertyUtil.get(
"io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
allocType = allocType.toLowerCase(Locale.US).trim();
ByteBufAllocator alloc;
if ("unpooled".equals(allocType)) {
alloc = UnpooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: {}", allocType);
} else if ("pooled".equals(allocType)) {
alloc = PooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: {}", allocType);
} else {
alloc = PooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);
}
DEFAULT_ALLOCATOR = alloc;
THREAD_LOCAL_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.threadLocalDirectBufferSize", 0);
logger.debug("-Dio.netty.threadLocalDirectBufferSize: {}", THREAD_LOCAL_BUFFER_SIZE);
MAX_CHAR_BUFFER_SIZE = SystemPropertyUtil.getInt("io.netty.maxThreadLocalCharBufferSize", 16 * 1024);
logger.debug("-Dio.netty.maxThreadLocalCharBufferSize: {}", MAX_CHAR_BUFFER_SIZE);
}
Borrowing an object from the pool
io.netty.buffer.PooledDirectByteBuf#newInstance
// borrow one from the "pool"
// RECYCLER is the lightweight object-pool implementation
static PooledDirectByteBuf newInstance(int maxCapacity) {
PooledDirectByteBuf buf = RECYCLER.get();
buf.reuse(maxCapacity);
return buf;
}
io.netty.util.Recycler#get
public final T get() {
if (maxCapacityPerThread == 0) {
// pooling is not enabled
return newObject((Handle<T>) NOOP_HANDLE);
}
LocalPool<T> localPool = threadLocal.get();
DefaultHandle<T> handle = localPool.claim();
T obj;
// try to take one from the "pool"; create a new one if none is available
if (handle == null) {
handle = localPool.newHandle();
if (handle != null) {
obj = newObject(handle);
handle.set(obj);
} else {
obj = newObject((Handle<T>) NOOP_HANDLE);
}
} else {
obj = handle.get();
}
return obj;
}
Returning an object to the pool
io.netty.buffer.PooledByteBuf#deallocate
// return the object to the "pool"; invoked via the pipeline's tail
@Override
protected final void deallocate() {
if (handle >= 0) {
final long handle = this.handle;
this.handle = -1;
memory = null;
chunk.decrementPinnedMemory(maxLength);
chunk.arena.free(chunk, tmpNioBuf, handle, maxLength, cache);
tmpNioBuf = null;
chunk = null;
recycle();
}
}
@Override
public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
// release the finished object back into the pool
localPool.release(this);
}
Heap memory vs off-heap (direct) memory
io.netty.buffer.PooledByteBufAllocator#DEFAULT
public static final PooledByteBufAllocator DEFAULT =
new PooledByteBufAllocator(PlatformDependent.directBufferPreferred());
io.netty.util.internal.PlatformDependent#directBufferPreferred
public static boolean directBufferPreferred() {
return DIRECT_BUFFER_PREFERRED;
}
// We should always prefer direct buffers by default if we can use a Cleaner to release direct buffers.
// two conditions for preferring direct buffers: 1. a Cleaner exists to release them; 2. io.netty.noPreferDirect is not true
// i.e. the io.netty.noPreferDirect system property can flip the preference
DIRECT_BUFFER_PREFERRED = CLEANER != NOOP
&& !SystemPropertyUtil.getBoolean("io.netty.noPreferDirect", false);
The PooledByteBufAllocator constructor can also take preferDirect directly, though this is not the recommended route:
@SuppressWarnings("deprecation")
public PooledByteBufAllocator(boolean preferDirect) {
this(preferDirect, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER);
}
So constructing the allocator yourself is another way to choose whether direct memory is used:
bootstrap.childOption(ChannelOption.ALLOCATOR,
new PooledByteBufAllocator(false));
What direct-memory allocation boils down to
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int smallCacheSize, int normalCacheSize,
boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
super(preferDirect);
threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
this.smallCacheSize = smallCacheSize;
this.normalCacheSize = normalCacheSize;
...
}
io.netty.buffer.AbstractByteBufAllocator#AbstractByteBufAllocator(boolean)
/**
* Create new instance
*
* @param preferDirect {@code true} if {@link #buffer(int)} should try to allocate a direct buffer rather than
* a heap buffer
*/
protected AbstractByteBufAllocator(boolean preferDirect) {
directByDefault = preferDirect && PlatformDependent.hasUnsafe();
emptyBuf = new EmptyByteBuf(this);
}
io.netty.buffer.AbstractByteBufAllocator#buffer()
@Override
public ByteBuf buffer() {
if (directByDefault) {
return directBuffer();
}
return heapBuffer();
}
io.netty.buffer.UnpooledByteBufAllocator#newDirectBuffer
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
final ByteBuf buf;
if (PlatformDependent.hasUnsafe()) {
buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}
io.netty.buffer.UnpooledDirectByteBuf#UnpooledDirectByteBuf(io.netty.buffer.ByteBufAllocator, int, int)
public UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(maxCapacity);
ObjectUtil.checkNotNull(alloc, "alloc");
checkPositiveOrZero(initialCapacity, "initialCapacity");
checkPositiveOrZero(maxCapacity, "maxCapacity");
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
}
this.alloc = alloc;
// allocateDirect performs the off-heap allocation
setByteBuffer(allocateDirect(initialCapacity), false);
}
/**
* Allocate a new direct {@link ByteBuffer} with the given initialCapacity.
*/
protected ByteBuffer allocateDirect(int initialCapacity) {
// delegate to the JDK's ByteBuffer.allocateDirect for the off-heap allocation
return ByteBuffer.allocateDirect(initialCapacity);
}