Tracing and Diagnostics
Making the Application Easy to Diagnose
Improving thread names
nioEventLoopGroup-2-1: the bossGroup
nioEventLoopGroup-3-1: the workerGroup
If Netty changes its implementation, this naming may change as well.
Give the threads meaningful names by passing a DefaultThreadFactory when declaring the NioEventLoopGroup:
NioEventLoopGroup boss = new NioEventLoopGroup(0, new DefaultThreadFactory("boss"));
NioEventLoopGroup worker = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
serverBootstrap.group(boss,worker);
Improving handler names
A pipeline usually contains several handlers, and in the logs a handler name often carries a #0 suffix, sometimes together with a $1: $1 marks an anonymous inner class, while #0 is appended to disambiguate when more than one handler of the same class is added to a pipeline. Simply declare a name when adding the handler to the pipeline:
pipeline.addLast("frameDecode",new OrderFrameDecoder());
How Netty Logging Works and How to Use It
How the Netty logging framework works
Open the LoggingHandler class and look at its constructors:
public LoggingHandler(LogLevel level) {
this(level, ByteBufFormat.HEX_DUMP);
}
public LoggingHandler(LogLevel level, ByteBufFormat byteBufFormat) {
this.level = ObjectUtil.checkNotNull(level, "level");
this.byteBufFormat = ObjectUtil.checkNotNull(byteBufFormat, "byteBufFormat");
logger = InternalLoggerFactory.getInstance(getClass());
internalLevel = level.toInternalLevel();
}
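The second parameter shows that the hex dump itself is configurable. In recent Netty 4.1 releases, passing ByteBufFormat.SIMPLE keeps the READ/WRITE events in the log but skips the byte-by-byte dump; a minimal sketch, assuming a pipeline as in the earlier examples:
// log events at DEBUG but without the hex dump
pipeline.addLast(new LoggingHandler(LogLevel.DEBUG, ByteBufFormat.SIMPLE));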
InternalLoggerFactory.getInstance(getClass()) creates the logger instance; look at the getInstance() method:
/**
* Creates a new logger instance with the name of the specified class.
*/
public static InternalLogger getInstance(Class<?> clazz) {
return getInstance(clazz.getName());
}
/**
* Creates a new logger instance with the specified name.
*/
public static InternalLogger getInstance(String name) {
return getDefaultFactory().newInstance(name);
}
As you can see, the instance comes from a factory:
public static InternalLoggerFactory getDefaultFactory() {
if (defaultFactory == null) {
defaultFactory = newDefaultFactory(InternalLoggerFactory.class.getName());
}
return defaultFactory;
}
private static InternalLoggerFactory newDefaultFactory(String name) {
InternalLoggerFactory f = useSlf4JLoggerFactory(name);
if (f != null) {
return f;
}
f = useLog4J2LoggerFactory(name);
if (f != null) {
return f;
}
f = useLog4JLoggerFactory(name);
if (f != null) {
return f;
}
return useJdkLoggerFactory(name);
}
So the logger factory is resolved in the order SLF4J -> Log4j2 -> Log4j -> JDK logging.
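If you would rather pin the backend than rely on this detection order, the factory can also be set explicitly (do it early, before Netty creates any loggers); a minimal sketch:
// force Netty to log through SLF4J regardless of what else is on the classpath
InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE);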
Changing the JDK logger level
In the JRE's lib directory there is a logging.properties file where log levels are configured. JDK logging has no DEBUG level; use FINE instead.
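For example, the relevant entries in logging.properties might look like this (a minimal sketch using the standard java.util.logging settings):
handlers = java.util.logging.ConsoleHandler
.level = FINE
java.util.logging.ConsoleHandler.level = FINE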
Using SLF4J + Logback
Just add the relevant jars to the project.
For example, add the SLF4J and Logback dependencies:
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.9</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.36</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.9</version>
</dependency>
Choosing the LoggingHandler's position and level
pipeline.addLast("frameDecode",new OrderFrameDecoder());
pipeline.addLast(new OrderProtocolDecoder());
pipeline.addLast(new OrderFrameEncoder());
pipeline.addLast(new OrderProtocolEncoder());
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
pipeline.addLast(new OrderServerProcessHandler());
Here the data the LoggingHandler sees has already been decoded into objects. To log the raw bytes, move the LoggingHandler to the front of the pipeline:
pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));
pipeline.addLast("frameDecode",new OrderFrameDecoder());
pipeline.addLast(new OrderProtocolDecoder());
pipeline.addLast(new OrderFrameEncoder());
pipeline.addLast(new OrderProtocolEncoder());
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
pipeline.addLast(new OrderServerProcessHandler());
Now the raw bytes show up in the log:
13:13:00.690 [worker-3-1] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@26a0879c
13:13:00.693 [worker-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xb504643d, L:/127.0.0.1:8090 - R:/127.0.0.1:59652] READ: 49B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 2f 00 00 00 01 00 00 00 00 00 00 00 01 00 00 |./..............|
|00000010| 00 03 7b 22 74 61 62 6c 65 49 64 22 3a 31 30 30 |..{"tableId":100|
|00000020| 31 2c 22 64 69 73 68 22 3a 22 74 75 64 6f 75 22 |1,"dish":"tudou"|
|00000030| 7d |} |
+--------+-------------------------------------------------+----------------+
13:13:00.713 [worker-3-1] INFO io.netty.handler.logging.LoggingHandler - [id: 0xb504643d, L:/127.0.0.1:8090 - R:/127.0.0.1:59652] READ: Message(messageHeader=MessageHeader(version=1, opCode=3, streamId=1), messageBody=OrderOperation(tableId=1001, dish=tudou))
13:13:00.723 [worker-3-1] INFO io.netty.handler.logging.LoggingHandler - [id: 0xb504643d, L:/127.0.0.1:8090 - R:/127.0.0.1:59652] WRITE: Message(messageHeader=MessageHeader(version=1, opCode=3, streamId=1), messageBody=OrderOperationResult(tableId=1001, dish=tudou, complete=true))
13:13:00.727 [worker-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xb504643d, L:/127.0.0.1:8090 - R:/127.0.0.1:59652] WRITE: 2B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 3f |.? |
+--------+-------------------------------------------------+----------------+
13:13:00.728 [worker-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xb504643d, L:/127.0.0.1:8090 - R:/127.0.0.1:59652] WRITE: 63B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 01 00 00 00 00 00 00 00 01 00 00 00 03 |................|
|00000010| 7b 22 74 61 62 6c 65 49 64 22 3a 31 30 30 31 2c |{"tableId":1001,|
|00000020| 22 64 69 73 68 22 3a 22 74 75 64 6f 75 22 2c 22 |"dish":"tudou","|
|00000030| 63 6f 6d 70 6c 65 74 65 22 3a 74 72 75 65 7d |complete":true} |
+--------+-------------------------------------------------+----------------+
13:13:00.728 [worker-3-1] INFO io.netty.handler.logging.LoggingHandler - [id: 0xb504643d, L:/127.0.0.1:8090 - R:/127.0.0.1:59652] FLUSH
13:13:00.728 [worker-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xb504643d, L:/127.0.0.1:8090 - R:/127.0.0.1:59652] FLUSH
Application Visualization
Netty visualization demo:
- A small goal: count and display the current number of connections in the system
- Periodic output to the console log
- Real-time display via JMX
ELKK, TIG, etc.
Code example
Creating the handler
Create a handler dedicated to metrics, MetricsHandler. Since we are counting the system's current connections, both connection setup and teardown must be handled, so MetricsHandler extends ChannelDuplexHandler (which supports both inbound and outbound events) and overrides channelActive and channelInactive.
// ChannelDuplexHandler supports both inbound and outbound events
public class MetricsHandler extends ChannelDuplexHandler {
// counter used to track connections
private AtomicLong totalConnectionNumber = new AtomicLong();
// invoked when a connection is established
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
totalConnectionNumber.incrementAndGet();
super.channelActive(ctx);
}
// invoked when a connection is closed
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
totalConnectionNumber.decrementAndGet();
super.channelInactive(ctx);
}
}
The connection counting is now in place; next, the collected data needs to be exposed.
Adding the metrics dependency
We use Dropwizard Metrics here.
Add the dependencies:
<!-- Metrics library: metrics-core includes the console reporter; the JMX reporter needs the metrics-jmx module -->
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>4.1.12.1</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-jmx</artifactId>
<version>4.1.12.1</version>
</dependency>
Register the connection counter and expose it:
// ChannelDuplexHandler supports both inbound and outbound events
// MetricsHandler can be shared, so it is annotated with @ChannelHandler.Sharable
@ChannelHandler.Sharable
public class MetricsHandler extends ChannelDuplexHandler {
private AtomicLong totalConnectionNumber = new AtomicLong();
{
// MetricRegistry is the entry point
MetricRegistry metricRegistry = new MetricRegistry();
// register totalConnectionNumber
metricRegistry.register("totalConnectionNumber", new Gauge<Long>() {
@Override
public Long getValue() {
return totalConnectionNumber.longValue();
}
});
// report to the console
ConsoleReporter consoleReporter = ConsoleReporter.forRegistry(metricRegistry).build();
// every 5 seconds
consoleReporter.start(5, TimeUnit.SECONDS);
// report via JMX
JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).build();
// real-time; starting it is enough
jmxReporter.start();
}
// invoked when a connection is established
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
totalConnectionNumber.incrementAndGet();
super.channelActive(ctx);
}
// invoked when a connection is closed
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
totalConnectionNumber.decrementAndGet();
super.channelInactive(ctx);
}
}
Adding the handler to the pipeline
Add the handler to the pipeline:
// the handler is shared, so declare it outside the initializer
MetricsHandler metricsHandler = new MetricsHandler();
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// mind the order
pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));
pipeline.addLast("frameDecode",new OrderFrameDecoder());
pipeline.addLast(new OrderProtocolDecoder());
pipeline.addLast(new OrderFrameEncoder());
pipeline.addLast(new OrderProtocolEncoder());
pipeline.addLast("metricsHandler", metricsHandler);
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
pipeline.addLast(new OrderServerProcessHandler());
}
});
Start the server and the metrics output appears on the console; start a client and you can watch the value change.
-- Gauges ----------------------------------------------------------------------
totalConnectionNumber
value = 0
Open jconsole, connect to the server process, and the data is visible under MBeans.
Netty data worth visualizing
External data
Metric | Source | Notes |
---|---|---|
Connection statistics | channelActive/channelInactive | |
Inbound data statistics | channelRead | |
Outbound data statistics | write | write only puts data into the buffer; ctx.write(msg).addListener() is more accurate |
Exception statistics | exceptionCaught/ChannelFuture | ReadTimeoutException.INSTANCE |
Internal data
Metric | Source | Notes |
---|---|---|
Thread count | computed per implementation | e.g. nioEventLoopGroup.executorCount() |
Pending tasks | executor.pendingTasks() | e.g. a NioEventLoop's pending tasks (see the sketch below) |
Accumulated (queued) data | channelOutboundBuffer.totalPendingSize() | per Channel (connection), not system-wide |
Writability changes | channelWritabilityChanged | |
Triggered user events | userEventTriggered | IdleStateEvent |
ByteBuf allocation details | Pooled/UnpooledByteBufAllocator.DEFAULT.metric() |
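As a minimal sketch of the first two rows of this table, assuming the metricRegistry from the MetricsHandler example and a worker NioEventLoopGroup are in scope (the gauge names here are made up):
metricRegistry.register("workerThreads", (Gauge<Integer>) worker::executorCount);
metricRegistry.register("workerPendingTasks", (Gauge<Long>) () -> {
    long pending = 0;
    // an EventLoopGroup is iterable; each NioEventLoop is a SingleThreadEventExecutor
    for (EventExecutor executor : worker) {
        if (executor instanceof SingleThreadEventExecutor) {
            pending += ((SingleThreadEventExecutor) executor).pendingTasks();
        }
    }
    return pending;
});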
Keeping the Application Free of Memory Leaks
What a Netty memory leak means
Cause: forgetting to release (the usual release pattern is sketched after this list)
ByteBuf buffer = ctx.alloc().buffer();
~~buffer.release()/ReferenceCountUtil.release(buffer)~~ (the release call is missing)
Consequence: resources are never freed --> OOM
- Off-heap: memory is never freed (PlatformDependent.freeDirectBuffer(buffer))
- Pooled: the buffer is never returned to the pool (recyclerHandler.recycle(this))
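The usual defensive pattern, sketched minimally (ReferenceCountUtil.release is safe to call even for messages that are not reference-counted):
ByteBuf buffer = ctx.alloc().buffer();
try {
    buffer.writeInt(1001);               // ... use the buffer ...
} finally {
    ReferenceCountUtil.release(buffer);  // refCnt drops back to 0 and the memory is freed/returned
}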
The core idea behind Netty's leak detection
Reference counting (buffer.refCnt()) + weak references (WeakReference)
Reference counting:
- Think of judging whether a historical figure did more good than harm: merit +1, fault -1; when the total reaches 0, dust to dust -- the resource should be released.
- When is the verdict reached? When "the coffin is closed" --> after the object has been GC'd.
Strong references vs. weak references
- String heroicStrongBodyguard = new String("I am the master"); -- this is a strong reference.
- WeakReference<String> bookishWeakBodyguard = new WeakReference<String>(new String("I am the master"));
When only the bookish bodyguard (a weak reference) guards (references) the master: if the assassin (GC) strikes, the master (the referent) is doomed (gets collected).
But when the master dies (is GC'd), the bodyguard can still use its writing talent: it records itself in the "notebook" (the ReferenceQueue).
- With the ReferenceQueue, Netty can therefore tell whether an object has been GC'd.
Netty's leak detection, step by step:
ByteBuf buffer = ctx.alloc().buffer() --> reference count +1 --> a weak-reference object (DefaultResourceLeak) is created and added to a set (#allLeaks)
buffer.release() --> reference count -1 --> when it reaches 0, the resource is released automatically and the weak-reference object is removed from the set
Leak criterion: is the weak-reference object still in the set? If it is, the reference count never reached 0 --> release was never called.
When to check: when the referent of a weak reference is collected, the weak reference is put into the designated ReferenceQueue, so Netty drains the queue and inspects every weak reference it finds there.
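A minimal, Netty-independent sketch of the WeakReference + ReferenceQueue mechanism described above (the class name is made up, and System.gc() is only a request, so the short sleep gives the collector a chance to run):
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;

public class WeakRefQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        ReferenceQueue<Object> queue = new ReferenceQueue<>();
        // no strong reference to the referent is kept, so it is eligible for GC
        WeakReference<Object> ref = new WeakReference<>(new Object(), queue);
        System.gc();
        Thread.sleep(100);
        // once the referent has been collected, the weak reference shows up in the queue --
        // this is exactly how Netty discovers that a tracked ByteBuf was GC'd
        System.out.println(queue.poll() == ref);
    }
}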
Source analysis of Netty's leak detection
ResourceLeakDetector
/**
* Represents the level of resource leak detection.
*/
// Level controls whether (and how aggressively) leak detection is enabled
public enum Level {
/**
* Disables resource leak detection.
*/
// detection disabled
DISABLED,
/**
* Enables simplistic sampling resource leak detection which reports there is a leak or not,
* at the cost of small overhead (default).
*/
// default level: reports whether a leak occurred, but not where
SIMPLE,
/**
* Enables advanced sampling resource leak detection which reports where the leaked object was accessed
* recently at the cost of high overhead.
*/
// records where the leak probably happened, so the overhead is higher
ADVANCED,
/**
* Enables paranoid resource leak detection which reports where the leaked object was accessed recently,
* at the cost of the highest possible overhead (for testing purposes only).
*/
// the previous levels only track a sample of the objects; PARANOID tracks every object and also records the likely leak locations
PARANOID;
/**
* Returns level based on string value. Accepts also string that represents ordinal number of enum.
*
* @param levelStr - level string : DISABLED, SIMPLE, ADVANCED, PARANOID. Ignores case.
* @return corresponding level or SIMPLE level in case of no match.
*/
static Level parseLevel(String levelStr) {
String trimmedLevelStr = levelStr.trim();
for (Level l : values()) {
if (trimmedLevelStr.equalsIgnoreCase(l.name()) || trimmedLevelStr.equals(String.valueOf(l.ordinal()))) {
return l;
}
}
return DEFAULT_LEVEL;
}
}
/**
* Creates a new {@link ResourceLeakTracker} which is expected to be closed via
* {@link ResourceLeakTracker#close(Object)} when the related resource is deallocated.
*
* @return the {@link ResourceLeakTracker} or {@code null}
*/
// track() is the entry point: it is called whenever a new ByteBuf is created
@SuppressWarnings("unchecked")
public final ResourceLeakTracker<T> track(T obj) {
return track0(obj);
}
@SuppressWarnings("unchecked")
private DefaultResourceLeak track0(T obj) {
Level level = ResourceLeakDetector.level;
// check the level; if DISABLED, return immediately
if (level == Level.DISABLED) {
return null;
}
// below PARANOID, not every buffer is tracked -- only a sampled fraction
if (level.ordinal() < Level.PARANOID.ordinal()) {
// samplingInterval defaults to 128; draw a random number and only track when it is 0
if ((PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) {
reportLeak();
// create the weak reference
// private static final class DefaultResourceLeak<T> extends WeakReference<Object> implements ResourceLeakTracker<T>, ResourceLeak
return new DefaultResourceLeak(obj, refQueue, allLeaks, getInitialHint(resourceType));
}
return null;
}
reportLeak();
return new DefaultResourceLeak(obj, refQueue, allLeaks, getInitialHint(resourceType));
}
DefaultResourceLeak(
Object referent,
ReferenceQueue<Object> refQueue,
Set<DefaultResourceLeak<?>> allLeaks,
Object initialHint) {
super(referent, refQueue);
assert referent != null;
// Store the hash of the tracked object to later assert it in the close(...) method.
// It's important that we not store a reference to the referent as this would disallow it from
// be collected via the WeakReference.
trackedHash = System.identityHashCode(referent);
// when the weak-reference object is created, it adds itself to allLeaks
allLeaks.add(this);
// Create a new Record so we always have the creation stacktrace included.
headUpdater.set(this, initialHint == null ?
new TraceRecord(TraceRecord.BOTTOM) : new TraceRecord(TraceRecord.BOTTOM, initialHint));
this.allLeaks = allLeaks;
}
@Override
public boolean close() {
// close() removes this object from allLeaks
if (allLeaks.remove(this)) {
// Call clear so the reference is not even enqueued.
clear();
headUpdater.set(this, null);
return true;
}
return false;
}
private void reportLeak() {
// if the logger's error level is not enabled, a detected leak would never be seen by the user, so skip detection
if (!needReport()) {
// drain the ReferenceQueue
clearRefQueue();
return;
}
// Detect and report previous leaks.
for (;;) {
// poll the ReferenceQueue
DefaultResourceLeak ref = (DefaultResourceLeak) refQueue.poll();
if (ref == null) {
break;
}
// check whether this reference actually leaked;
// if it did not leak, just continue
if (!ref.dispose()) {
continue;
}
// getReportAndClearRecords() collects the recorded access stack traces; TraceRecord extends Throwable
String records = ref.getReportAndClearRecords();
if (reportedLeaks.add(records)) {
if (records.isEmpty()) {
reportUntracedLeak(resourceType);
} else {
reportTracedLeak(resourceType, records);
}
}
}
}
boolean dispose() {
clear();
// if the object was still in allLeaks (remove returns true), close() was never called -- it leaked
return allLeaks.remove(this);
}
Summary:
- Full tracking or sampling: PlatformDependent.threadLocalRandom().nextInt(samplingInterval)
- Recording access information: new TraceRecord(); TraceRecord extends Throwable
- Level/switch: io.netty.util.ResourceLeakDetector.Level
- Reporting: logger.error
- When reporting is triggered: AbstractByteBufAllocator#buffer() -> io.netty.util.ResourceLeakDetector#track
Example: Detecting Leaks with Netty's Leak Detection Tooling
How: -Dio.netty.leakDetection.level=PARANOID (it can also be set in code; see the sketch after the notes)
Notes:
- The default level is SIMPLE, which does not check every allocation
- A leak can only be detected after a GC
- Mind the log level
- Use the highest level before going live; use the default in production
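Besides the -D system property, the level can also be set in code, as long as it runs before the first buffer is allocated; a minimal sketch:
// equivalent to -Dio.netty.leakDetection.level=PARANOID
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);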
In OrderServerProcessHandler, allocate a ByteBuf but never release it; this produces a memory leak:
public class OrderServerProcessHandler extends SimpleChannelInboundHandler<RequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RequestMessage requestMessage) throws Exception {
ByteBuf buffer = ctx.alloc().buffer(); // allocated but never released -- the leak the detector should catch
Operation operation = requestMessage.getMessageBody();
OperationResult operationResult = operation.execute();
ResponseMessage responseMessage = new ResponseMessage();
responseMessage.setMessageHeader(requestMessage.getMessageHeader());
responseMessage.setMessageBody(operationResult);
ctx.writeAndFlush(responseMessage);
}
}
Also set the log level in logback.xml to ERROR so other log output does not get in the way:
<?xml version="1.0" encoding="GBK"?>
<configuration>
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-4relative [%thread][%highlight(%-5level)][%logger:%L] %msg%n</pattern>
<charset>GBK</charset>
</encoder>
</appender>
<logger name="root" level="ERROR">
<appender-ref ref="console"/>
</logger>
</configuration>
Have the client send data 10000 times:
RequestMessage requestMessage = new RequestMessage(IdUtil.nextId(), new OrderOperation(1001, "tudou"));
for (int i = 0; i < 10000; i++) {
ChannelFuture channelFuture1 = channelFuture.channel().writeAndFlush(requestMessage);
}
channelFuture.channel().closeFuture().get();
Add -Dio.netty.leakDetection.level=PARANOID to the server's startup parameters.
Then start the server and the client, and the leak report shows up on the server console:
2022-05-20 17:58:19.706 5843 [worker-3-1][ERROR][io.netty.util.ResourceLeakDetector:319] LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:403)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:174)
io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:108)
org.example.server.codec.handler.OrderServerProcessHandler.channelRead0(OrderServerProcessHandler.java:14)
org.example.server.codec.handler.OrderServerProcessHandler.channelRead0(OrderServerProcessHandler.java:11)
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:280)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:280)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.lang.Thread.run(Thread.java:750)
Netty's Built-in Annotations
@Sharable
Marks a handler as shareable; a handler without this annotation must not be added to more than one pipeline.
If a handler without the annotation is shared anyway, the first client connects normally, but the second client gets disconnected and the server throws an exception.
For example, remove the @ChannelHandler.Sharable annotation from the earlier MetricsHandler, start the server, and then start client and client1 in turn.
(Console screenshots of client, client1, and the server are omitted: client works, client1 is disconnected, and the server logs an exception stack trace, analyzed below.)
Look at the last frame of the stack trace, io.netty.channel.DefaultChannelPipeline.checkMultiplicity(DefaultChannelPipeline.java:600):
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
// not sharable and already added once; isSharable() checks for the @Sharable annotation
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}
Next, look at the frame io.netty.channel.ChannelInitializer.initChannel(ChannelInitializer.java:129):
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
// line 129
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
if (!ctx.isRemoved()) {
ctx.pipeline().remove(this);
}
}
return true;
}
return false;
}
initChannel((C) ctx.channel()) throws the exception, which is caught and handed to exceptionCaught():
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (logger.isWarnEnabled()) {
logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), cause);
}
ctx.close();
}
As you can see, the channel is closed at this point.
So the annotation is not just a marker; it is also a safeguard that prevents handlers that must not be shared from being shared.
@Skip
Skips the execution of a handler method.
Look at pipeline.addLast(): the lowest-level addLast creates an AbstractChannelHandlerContext via newContext():
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
Follow newContext() into the AbstractChannelHandlerContext constructor; executionMask determines which events this handler is actually eligible to handle:
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
String name, Class<? extends ChannelHandler> handlerClass) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
// compute which callbacks this handler participates in
this.executionMask = mask(handlerClass);
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
Look at the mask computation:
private static int mask0(Class<? extends ChannelHandler> handlerType) {
int mask = MASK_EXCEPTION_CAUGHT;
try {
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_INBOUND;
if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_REGISTERED;
}
if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_UNREGISTERED;
}
if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_ACTIVE;
}
....
}
Deciding whether a method can be skipped:
// check whether the handler declares the method; if it does, check whether it is annotated with @Skip
private static boolean isSkippable(
final Class<?> handlerType, final String methodName, final Class<?>... paramTypes) throws Exception {
return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
@Override
public Boolean run() throws Exception {
Method m;
try {
m = handlerType.getMethod(methodName, paramTypes);
} catch (NoSuchMethodException e) {
if (logger.isDebugEnabled()) {
logger.debug(
"Class {} missing method {}, assume we can not skip execution", handlerType, methodName, e);
}
return false;
}
return m.isAnnotationPresent(Skip.class);
}
});
}
In Netty 4, @Skip is not exposed to users:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface Skip {
// no value
}
final class ChannelHandlerMask {
...
}
It is only package-private.
Netty 5 opens up the @Skip annotation.
@UnstableApi
Warns that the annotated element is unstable and should be used with caution.
@SuppressJava6Requirement
Suppresses the "requires Java 6+" warning.
For example, if the jar built from Netty is supposed to run on Java 6, every use of APIs newer than JDK 6 has to be found and avoided, so a plugin is used to scan for APIs that require a newer Java version.
Sometimes the code already guards on the JDK version -- for example, DnsResolveContextException(String message, boolean shared) is only called when the JDK version is >= 7 -- but the plugin would still flag the call, so the annotation marks the method as not needing a warning:
static final class DnsResolveContextException extends RuntimeException {
private static final long serialVersionUID = 1209303419266433003L;
private DnsResolveContextException(String message) {
super(message);
}
@SuppressJava6Requirement(reason = "uses Java 7+ Exception.<init>(String, Throwable, boolean, boolean)" +
" but is guarded by version checks")
private DnsResolveContextException(String message, boolean shared) {
super(message, null, false, true);
assert shared;
}
// Override fillInStackTrace() so we not populate the backtrace via a native call and so leak the
// Classloader.
@Override
public Throwable fillInStackTrace() {
return this;
}
static DnsResolveContextException newStatic(String message, Class<?> clazz, String method) {
final DnsResolveContextException exception;
if (PlatformDependent.javaVersion() >= 7) {
exception = new DnsResolveContextException(message, true);
} else {
exception = new DnsResolveContextException(message);
}
return ThrowableUtil.unknownStackTrace(exception, clazz, method);
}
}
Using the plugin
Plugin: https://github.com/mojohaus/animal-sniffer
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>animal-sniffer-maven-plugin</artifactId>
<version>1.16</version>
<configuration>
<signature>
<groupId>org.codehaus.mojo.signature</groupId>
<!-- java16 here means JDK 1.6; see http://www.mojohaus.org/signatures/ -->
<artifactId>java16</artifactId>
<version>1.1</version>
</signature>
<ignores>
<ignore>java.nio.ByteBuffer</ignore>
</ignores>
<annotations>
<!-- annotation that suppresses the warning -->
<annotation>io.netty.util.internal.SuppressJava6Requirement</annotation>
</annotations>
</configuration>
<executions>
<execution>
<!-- bound to the process-classes phase (right after compile); runs the check goal -->
<phase>process-classes</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
@SuppressForbidden
Suppresses the "forbidden API" warning.
io.netty.util.NettyRuntime.AvailableProcessorsHolder#availableProcessors
To get the number of CPU cores, the normal call is Runtime.getRuntime().availableProcessors(). On JDKs before 10, when running in Docker with a CPU limit, that value is wrong: it reports the host's core count. In that case the user can set the io.netty.availableProcessors system property to specify the correct number of cores.
/**
* Get the configured number of available processors. The default is {@link Runtime#availableProcessors()}.
* This can be overridden by setting the system property "io.netty.availableProcessors" or by invoking
* {@link #setAvailableProcessors(int)} before any calls to this method.
*
* @return the configured number of available processors
*/
@SuppressForbidden(reason = "to obtain default number of available processors")
synchronized int availableProcessors() {
if (this.availableProcessors == 0) {
final int availableProcessors =
SystemPropertyUtil.getInt(
"io.netty.availableProcessors",
Runtime.getRuntime().availableProcessors());
setAvailableProcessors(availableProcessors);
}
return this.availableProcessors;
}
}
If a user does not know about this availableProcessors() method and calls Runtime.getRuntime().availableProcessors() directly, the problem above can occur. A plugin is therefore used to scan for and forbid such calls; it would, however, also flag Netty's own availableProcessors() method, so @SuppressForbidden is applied so that this method is not reported.
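In application code this means preferring NettyRuntime over Runtime when the process may be CPU-limited; a minimal sketch (the value 4 is just an example, and setAvailableProcessors must run before anything reads the value):
// either pass -Dio.netty.availableProcessors=4 on the command line, or set it programmatically:
NettyRuntime.setAvailableProcessors(4);
int processors = NettyRuntime.availableProcessors(); // 4 instead of the host's core count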
Using the plugin
Plugin: https://github.com/policeman-tools/forbidden-apis
<plugin>
<groupId>de.thetaphi</groupId>
<artifactId>forbiddenapis</artifactId>
<version>2.2</version>
<executions>
<execution>
<id>check-forbidden-apis</id>
<configuration>
<targetVersion>${maven.compiler.target}</targetVersion>
<!-- allow undocumented classes like sun.misc.Unsafe: -->
<internalRuntimeForbidden>false</internalRuntimeForbidden>
<!-- if the used Java version is too new, don't fail, just do nothing: -->
<failOnUnsupportedJava>false</failOnUnsupportedJava>
<bundledSignatures>
<!-- This will automatically choose the right signatures based on 'targetVersion': -->
<!-- enabling these should be done in the future -->
<!-- bundledSignature>jdk-unsafe</bundledSignature -->
<!-- bundledSignature>jdk-deprecated</bundledSignature -->
<!-- bundledSignature>jdk-system-out</bundledSignature -->
</bundledSignatures>
<signaturesFiles>
<!-- the file lists which calls are forbidden and what to call instead -->
<signaturesFile>${netty.dev.tools.directory}/forbidden/signatures.txt</signaturesFile>
</signaturesFiles>
<suppressAnnotations><annotation>**.SuppressForbidden</annotation></suppressAnnotations>
</configuration>
<!-- bound to the compile phase -->
<phase>compile</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
<execution>
<id>check-forbidden-test-apis</id>
<configuration>
<targetVersion>${maven.compiler.target}</targetVersion>
<!-- allow undocumented classes like sun.misc.Unsafe: -->
<internalRuntimeForbidden>true</internalRuntimeForbidden>
<!-- if the used Java version is too new, don't fail, just do nothing: -->
<failOnUnsupportedJava>false</failOnUnsupportedJava>
<bundledSignatures>
<!-- This will automatically choose the right signatures based on 'targetVersion': -->
<!-- enabling these should be done in the future -->
<!-- bundledSignature>jdk-unsafe</bundledSignature -->
<!-- bundledSignature>jdk-deprecated</bundledSignature -->
</bundledSignatures>
<signaturesFiles>
<signaturesFile>${netty.dev.tools.directory}/forbidden/signatures.txt</signaturesFile>
</signaturesFiles>
<!-- methods marked with the given annotation are excluded from the warnings -->
<suppressAnnotations><annotation>**.SuppressForbidden</annotation></suppressAnnotations>
</configuration>
<phase>test-compile</phase>
<goals>
<goal>testCheck</goal>
</goals>
</execution>
</executions>
</plugin>
signatures.txt
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
// do not call the method on the left; use the one after 'use' instead
java.lang.Runtime#availableProcessors() @ use NettyRuntime#availableProcessors()