Netty (5): Tracing and Diagnostics

young 1,625 2022-05-22

Tracing and Diagnostics

Making the Application Easy to Diagnose

Meaningful Thread Names

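[Figure: Netty diagnostics 1]

nioEventLoopGroup-2-1: the bossGroup

nioEventLoopGroup-3-1: the workerGroup

These default names may change if Netty changes its implementation.

To name the threads yourself, pass a DefaultThreadFactory when constructing each NioEventLoopGroup: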

NioEventLoopGroup boss = new NioEventLoopGroup(0, new DefaultThreadFactory("boss"));
NioEventLoopGroup worker = new NioEventLoopGroup(0, new DefaultThreadFactory("worker"));
serverBootstrap.group(boss,worker);
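To see what a naming thread factory does without pulling in Netty, here is a minimal JDK-only sketch. NamedThreadFactory is a hypothetical class, not Netty's DefaultThreadFactory; its prefix-poolId-threadId pattern only imitates names such as worker-3-1 that appear in the logs later in this post.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only stand-in for a naming thread factory.
final class NamedThreadFactory implements ThreadFactory {
    // Shared across factories so every pool gets its own number.
    private static final AtomicInteger POOL_ID = new AtomicInteger();
    private final AtomicInteger nextId = new AtomicInteger();
    private final String prefix;

    NamedThreadFactory(String poolName) {
        this.prefix = poolName + '-' + POOL_ID.incrementAndGet() + '-';
    }

    @Override
    public Thread newThread(Runnable r) {
        // Produces names such as "worker-1-1", "worker-1-2", ...
        return new Thread(r, prefix + nextId.incrementAndGet());
    }
}

final class NamedThreadFactoryDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread t = new NamedThreadFactory("worker").newThread(
                () -> System.out.println("running on " + Thread.currentThread().getName()));
        t.start();
        t.join();
    }
}
```

With names like these, a thread dump or log line shows at a glance which pool a thread belongs to.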

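[Figure: NettyThreadRename]

Meaningful Handler Names

[Figure: Netty diagnostics 2]

A pipeline holds several handlers, and their default names often carry a #0 suffix, sometimes along with a $1. The $1 means the handler is an anonymous inner class; the #0 is an index that keeps names unique when several handlers of the same type are added to one pipeline.

Declare a name when adding the handler to the pipeline: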

pipeline.addLast("frameDecode",new OrderFrameDecoder());
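The sketch below illustrates where names like OrderFrameDecoder#0 or Server$1#0 come from. It is a simplified JDK-only model, and HandlerNames is a hypothetical class: it assumes the default name is the class name without its package plus a # index, which matches the names described above, while Netty's actual generation logic may differ in detail.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical model of how a pipeline could assign handler names.
final class HandlerNames {
    private final Set<String> used = new LinkedHashSet<>();

    // An explicit name wins; otherwise a default name is generated.
    String register(String explicitName, Object handler) {
        String name = explicitName != null ? explicitName : generate(handler.getClass());
        if (!used.add(name)) {
            throw new IllegalArgumentException("Duplicate handler name: " + name);
        }
        return name;
    }

    private String generate(Class<?> type) {
        String full = type.getName();
        // Strip the package; anonymous classes keep their "$1" part.
        String base = full.substring(full.lastIndexOf('.') + 1);
        for (int i = 0; ; i++) {
            String candidate = base + '#' + i;
            if (!used.contains(candidate)) {
                return candidate;
            }
        }
    }

    public static void main(String[] args) {
        HandlerNames names = new HandlerNames();
        System.out.println(names.register(null, new StringBuilder()));
        System.out.println(names.register(null, new Object() { }));
        System.out.println(names.register("frameDecode", new StringBuilder()));
    }
}
```

An explicit name such as frameDecode is easier to find in logs and stack traces than a generated one.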

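[Figure: HandlerRename]

How Netty Logging Works and How to Use It

How the Netty Logging Framework Works

Open the LoggingHandler class and look at its constructors: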

public LoggingHandler(LogLevel level) {
    this(level, ByteBufFormat.HEX_DUMP);
}
public LoggingHandler(LogLevel level, ByteBufFormat byteBufFormat) {
    this.level = ObjectUtil.checkNotNull(level, "level");
    this.byteBufFormat = ObjectUtil.checkNotNull(byteBufFormat, "byteBufFormat");
    logger = InternalLoggerFactory.getInstance(getClass());
    internalLevel = level.toInternalLevel();
}

InternalLoggerFactory.getInstance(getClass()) creates the logger instance. Look at the getInstance() method:

/**
 * Creates a new logger instance with the name of the specified class.
 */
public static InternalLogger getInstance(Class<?> clazz) {
    return getInstance(clazz.getName());
}

/**
 * Creates a new logger instance with the specified name.
 */
public static InternalLogger getInstance(String name) {
    return getDefaultFactory().newInstance(name);
}

The instance is created by a factory:

public static InternalLoggerFactory getDefaultFactory() {
    if (defaultFactory == null) {
        defaultFactory = newDefaultFactory(InternalLoggerFactory.class.getName());
    }
    return defaultFactory;
}
private static InternalLoggerFactory newDefaultFactory(String name) {
    InternalLoggerFactory f = useSlf4JLoggerFactory(name);
    if (f != null) {
        return f;
    }

    f = useLog4J2LoggerFactory(name);
    if (f != null) {
        return f;
    }

    f = useLog4JLoggerFactory(name);
    if (f != null) {
        return f;
    }

    return useJdkLoggerFactory(name);
}

So the LoggerFactory is looked up in the order slf4j -> log4j2 -> log4j -> jdk.
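The fallback order can be illustrated with a small JDK-only sketch. LoggerBackendProbe is a hypothetical class and only checks whether a well-known class of each backend is on the classpath; Netty's own use...LoggerFactory methods actually try to create the factory, so treat this as an approximation of the idea rather than Netty's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical probe that mirrors the slf4j -> log4j2 -> log4j -> jdk order.
final class LoggerBackendProbe {
    // Backend name -> a class whose presence signals that backend.
    private static final Map<String, String> CANDIDATES = new LinkedHashMap<>();
    static {
        CANDIDATES.put("slf4j", "org.slf4j.LoggerFactory");
        CANDIDATES.put("log4j2", "org.apache.logging.log4j.LogManager");
        CANDIDATES.put("log4j", "org.apache.log4j.Logger");
    }

    // Returns the first backend whose marker class is available, else "jdk".
    static String choose(Predicate<String> classAvailable) {
        for (Map.Entry<String, String> e : CANDIDATES.entrySet()) {
            if (classAvailable.test(e.getValue())) {
                return e.getKey();
            }
        }
        return "jdk";
    }

    static boolean onClasspath(String className) {
        try {
            Class.forName(className, false, LoggerBackendProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("backend: " + choose(LoggerBackendProbe::onClasspath));
    }
}
```

Running it in a project shows which backend would win there, which helps when log output does not appear where you expect it.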

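Changing the JDK Logger Level

The lib directory of the JRE contains a logging.properties file that sets the log level (from JDK 9 on, the file lives in the conf directory instead). JDK logging has no DEBUG level; use FINE instead.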
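The same effect can be achieved in code, which is handy for a quick experiment. The sketch below is JDK-only; JdkFineLogging is a hypothetical helper, and the logger name io.netty is an assumption based on the logger names visible in Netty's log output.

```java
import java.util.logging.ConsoleHandler;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper that turns on FINE (the JDK's closest match to DEBUG).
final class JdkFineLogging {
    static Logger enableFine(String loggerName) {
        Logger logger = Logger.getLogger(loggerName);
        logger.setLevel(Level.FINE);

        // A ConsoleHandler defaults to INFO, so it must be lowered as well.
        ConsoleHandler handler = new ConsoleHandler();
        handler.setLevel(Level.FINE);
        logger.addHandler(handler);

        // Avoid printing every record twice through the root logger's handler.
        logger.setUseParentHandlers(false);
        return logger;
    }

    public static void main(String[] args) {
        Logger logger = enableFine("io.netty");
        logger.fine("FINE messages are now visible");
    }
}
```

In logging.properties, the equivalent settings are .level=FINE and java.util.logging.ConsoleHandler.level=FINE; both the logger and the handler level have to allow FINE.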

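Using slf4j + log4j

Just add the relevant jars to the project.

For example, add the slf4j dependencies (this example uses logback as the backend):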

<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.9</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.36</version>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-core</artifactId>
    <version>1.2.9</version>
</dependency>

Choosing the Position and Level of the LoggingHandler

pipeline.addLast("frameDecode",new OrderFrameDecoder());
pipeline.addLast(new OrderProtocolDecoder());
pipeline.addLast(new OrderFrameEncoder());
pipeline.addLast(new OrderProtocolEncoder());
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
pipeline.addLast(new OrderServerProcessHandler());

Here the LoggingHandler receives data that has already been decoded into objects. To debug the raw bytes, add another LoggingHandler at the very top:

pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));
pipeline.addLast("frameDecode",new OrderFrameDecoder());
pipeline.addLast(new OrderProtocolDecoder());
pipeline.addLast(new OrderFrameEncoder());
pipeline.addLast(new OrderProtocolEncoder());
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
pipeline.addLast(new OrderServerProcessHandler());

The log now prints the raw data as well:

13:13:00.690 [worker-3-1] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@26a0879c
13:13:00.693 [worker-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xb504643d, L:/127.0.0.1:8090 - R:/127.0.0.1:59652] READ: 49B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 2f 00 00 00 01 00 00 00 00 00 00 00 01 00 00 |./..............|
|00000010| 00 03 7b 22 74 61 62 6c 65 49 64 22 3a 31 30 30 |..{"tableId":100|
|00000020| 31 2c 22 64 69 73 68 22 3a 22 74 75 64 6f 75 22 |1,"dish":"tudou"|
|00000030| 7d                                              |}               |
+--------+-------------------------------------------------+----------------+
13:13:00.713 [worker-3-1] INFO io.netty.handler.logging.LoggingHandler - [id: 0xb504643d, L:/127.0.0.1:8090 - R:/127.0.0.1:59652] READ: Message(messageHeader=MessageHeader(version=1, opCode=3, streamId=1), messageBody=OrderOperation(tableId=1001, dish=tudou))
13:13:00.723 [worker-3-1] INFO io.netty.handler.logging.LoggingHandler - [id: 0xb504643d, L:/127.0.0.1:8090 - R:/127.0.0.1:59652] WRITE: Message(messageHeader=MessageHeader(version=1, opCode=3, streamId=1), messageBody=OrderOperationResult(tableId=1001, dish=tudou, complete=true))
13:13:00.727 [worker-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xb504643d, L:/127.0.0.1:8090 - R:/127.0.0.1:59652] WRITE: 2B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 3f                                           |.?              |
+--------+-------------------------------------------------+----------------+
13:13:00.728 [worker-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xb504643d, L:/127.0.0.1:8090 - R:/127.0.0.1:59652] WRITE: 63B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 01 00 00 00 00 00 00 00 01 00 00 00 03 |................|
|00000010| 7b 22 74 61 62 6c 65 49 64 22 3a 31 30 30 31 2c |{"tableId":1001,|
|00000020| 22 64 69 73 68 22 3a 22 74 75 64 6f 75 22 2c 22 |"dish":"tudou","|
|00000030| 63 6f 6d 70 6c 65 74 65 22 3a 74 72 75 65 7d    |complete":true} |
+--------+-------------------------------------------------+----------------+
13:13:00.728 [worker-3-1] INFO io.netty.handler.logging.LoggingHandler - [id: 0xb504643d, L:/127.0.0.1:8090 - R:/127.0.0.1:59652] FLUSH
13:13:00.728 [worker-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xb504643d, L:/127.0.0.1:8090 - R:/127.0.0.1:59652] FLUSH

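Application Visualization

Netty visualization demo:

  • A small goal: count and display the current number of connections
    • Periodic output to the console log
    • Real-time display through JMX
    • ELKK, TIG, etc.

Code Example

Creating the Handler

Create a handler for monitoring, MetricsHandler. Because it tracks the current number of connections, it has to handle both connection open and connection close, so MetricsHandler extends ChannelDuplexHandler, which supports both inbound and outbound events, and overrides channelActive and channelInactive.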

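import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.atomic.AtomicLong;

// ChannelDuplexHandler supports both inbound and outbound events
public class MetricsHandler extends ChannelDuplexHandler {
    // Counter for the current number of connections
    private final AtomicLong totalConnectionNumber = new AtomicLong();

    // Called when a connection is established
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        totalConnectionNumber.incrementAndGet();
        super.channelActive(ctx);
    }

    // Called when a connection is closed
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        totalConnectionNumber.decrementAndGet();
        super.channelInactive(ctx);
    }
}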

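The connection count is now tracked; next it needs to be displayed.

Adding the metrics Dependency

This example uses dropwizard metrics.

Add the dependencies:

<!-- Metrics library: metrics-core includes the console reporter; exporting to JMX requires the metrics-jmx artifact -->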
<dependency>
    <groupId>io.dropwizard.metrics</groupId>
    <artifactId>metrics-core</artifactId>
    <version>4.1.12.1</version>
</dependency>
<dependency>
    <groupId>io.dropwizard.metrics</groupId>
    <artifactId>metrics-jmx</artifactId>
    <version>4.1.12.1</version>
</dependency>

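Register the connection count and display it:

import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.jmx.JmxReporter;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// ChannelDuplexHandler supports both inbound and outbound events.
// MetricsHandler can be shared across channels, so it is annotated with @ChannelHandler.Sharable
@ChannelHandler.Sharable
public class MetricsHandler extends ChannelDuplexHandler {
    private final AtomicLong totalConnectionNumber = new AtomicLong();
    {
        // MetricRegistry is the entry point
        MetricRegistry metricRegistry = new MetricRegistry();
        // Register totalConnectionNumber
        metricRegistry.register("totalConnectionNumber", new Gauge<Long>() {
            @Override
            public Long getValue() {
                return totalConnectionNumber.longValue();
            }
        });
        // Display on the console
        ConsoleReporter consoleReporter = ConsoleReporter.forRegistry(metricRegistry).build();
        // Report every 5 seconds
        consoleReporter.start(5, TimeUnit.SECONDS);

        // Display through JMX
        JmxReporter jmxReporter = JmxReporter.forRegistry(metricRegistry).build();
        // JMX is real-time; it only needs to be started
        jmxReporter.start();

    }
    // Called when a connection is established
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        totalConnectionNumber.incrementAndGet();
        super.channelActive(ctx);
    }
    // Called when a connection is closed
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        totalConnectionNumber.decrementAndGet();
        super.channelInactive(ctx);
    }
}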

Adding It to the Pipeline

Add the handler to the pipeline:

// The handler is shared, so it is declared outside the initializer
MetricsHandler metricsHandler = new MetricsHandler();
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // Mind the order
        pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));
        pipeline.addLast("frameDecode",new OrderFrameDecoder());
        pipeline.addLast(new OrderProtocolDecoder());
        pipeline.addLast(new OrderFrameEncoder());
        pipeline.addLast(new OrderProtocolEncoder());
        pipeline.addLast("metricsHandler", metricsHandler);
        pipeline.addLast(new LoggingHandler(LogLevel.INFO));
        pipeline.addLast(new OrderServerProcessHandler());
    }
});

Start the server and the output appears on the console. Start a client and the value changes:

-- Gauges ----------------------------------------------------------------------
totalConnectionNumber
             value = 0

Open jconsole, connect to the server, and the data is visible under the MBeans tab.
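For readers who want to see what ends up in jconsole without the metrics library, here is a JDK-only sketch that exposes a connection counter as a standard MBean. ConnectionStats, Counter and the object name demo.netty:type=ConnectionStats are hypothetical names chosen for this illustration; this is not how dropwizard's JmxReporter is implemented.

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical JDK-only counterpart of the connection gauge.
final class ConnectionStats {
    // Standard MBean contract: a public interface named <ImplClass>MBean.
    public interface CounterMBean {
        long getTotalConnections();
    }

    public static final class Counter implements CounterMBean {
        private final AtomicLong total = new AtomicLong();

        void connected() { total.incrementAndGet(); }      // call from channelActive
        void disconnected() { total.decrementAndGet(); }   // call from channelInactive

        @Override
        public long getTotalConnections() { return total.get(); }
    }

    // Registers the counter with the platform MBean server that jconsole reads.
    static ObjectName register(Counter counter, String name) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName objectName = new ObjectName(name);
        server.registerMBean(counter, objectName);
        return objectName;
    }

    public static void main(String[] args) throws Exception {
        Counter counter = new Counter();
        register(counter, "demo.netty:type=ConnectionStats");
        counter.connected();
        System.out.println("connections = " + counter.getTotalConnections());
    }
}
```

The getter getTotalConnections() shows up in jconsole as the attribute TotalConnections under the demo.netty domain.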

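Netty Data Worth Visualizing

External Data

| Metric | Source | Notes |
| --- | --- | --- |
| Connection statistics | channelActive/channelInactive | |
| Inbound data statistics | channelRead | |
| Outbound data statistics | write | write only puts the data into the buffer; ctx.write(msg).addListener() is more accurate |
| Exception statistics | exceptionCaught/ChannelFuture | ReadTimeoutException.INSTANCE |

Internal Data

| Metric | Source | Notes |
| --- | --- | --- |
| Thread count | Depends on the implementation | e.g. nioEventLoopGroup.executorCount() |
| Pending tasks | executor.pendingTasks() | e.g. the pending tasks of a NioEventLoop |
| Accumulated data | channelOutboundBuffer.totalPendingSize() | Per Channel (connection), not system-wide |
| Writability changes | channelWritabilityChanged | |
| Triggered events | userEventTriggered | IdleStateEvent |
| ByteBuf allocation details | Pooled/UnpooledByteBufAllocator.DEFAULT.metric() | |

[Figure: Netty diagnostics 3]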

Keeping the Application Free of Memory Leaks

What a Netty Memory Leak Is

Cause: forgetting to release

ByteBuf buffer = ctx.alloc().buffer();

~~buffer.release()/ReferenceCountUtil.release(buffer)~~

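Consequence: the resource is never released --> OOM

  • Off-heap memory: never freed (PlatformDependent.freeDirectBuffer(buffer))
  • Pooled memory: never returned to the pool (recyclerHandle.recycle(this))

The Core Idea Behind Netty's Leak Detection

Reference counting (buffer.refCnt()) + weak references (WeakReference)

Reference counting:

  • Think of judging whether a historical figure's merits outweigh their faults, or the reverse

    Merit counts +1, fault counts -1; when the total reaches 0, dust returns to dust and the resource should be released

    • When is the verdict reached? When "the coffin lid is closed", that is, after the object has been garbage-collected

Strong and weak references

  • String strongBodyguard = new String("I am the master"); this is a strong reference

  • WeakReference<String> weakBodyguardWhoWrites = new WeakReference<String>(new String("I am the master"));

    When only a weak bodyguard (weak reference) guards (references) the master: once the assassin (GC) strikes, the master (referent) is sure to die (be collected)

    But when the master dies (is collected), the weak bodyguard can still put its writing talent to use: it records itself in the "little notebook" (ReferenceQueue).

  • With the ReferenceQueue, Netty can tell whether an object has been garbage-collected

The core idea of Netty's leak detection:

ByteBuf buffer = ctx.alloc().buffer() -> reference count +1 -> a weak reference object, DefaultResourceLeak, is created and added to a list (#allLeaks)

buffer.release() -> reference count -1 -> when it reaches 0, the resource is released automatically and the weak reference object is removed from the list

Criterion: is the weak reference object still in the list? If it is, the reference count never reached 0, which means the release never happened

Timing: when the object a weak reference points to is collected, the weak reference is placed in the designated ReferenceQueue, so Netty polls the queue and checks every weak reference it finds there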
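The idea can be condensed into a JDK-only sketch. MiniLeakDetector and Tracker are hypothetical names that model the roles of ResourceLeakDetector and DefaultResourceLeak; sampling and stack trace recording are left out. Because garbage collection timing is not deterministic, main() calls enqueue() by hand as a stand-in for "the object was collected without being released".

```java
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical miniature of the weak-reference based leak check.
final class MiniLeakDetector {
    final ReferenceQueue<Object> refQueue = new ReferenceQueue<>();
    final Set<Tracker> allLeaks = ConcurrentHashMap.newKeySet();

    final class Tracker extends WeakReference<Object> {
        final String hint;

        Tracker(Object referent, String hint) {
            super(referent, refQueue);
            this.hint = hint;
            allLeaks.add(this); // every tracked object starts out as a suspect
        }

        // Called on a proper release: the suspect is cleared.
        boolean close() {
            if (allLeaks.remove(this)) {
                clear(); // a cleared reference is not enqueued by the GC
                return true;
            }
            return false;
        }
    }

    Tracker track(Object resource, String hint) {
        return new Tracker(resource, hint);
    }

    // Anything that reached the queue while still in allLeaks was never released.
    List<String> collectLeaks() {
        List<String> leaks = new ArrayList<>();
        for (;;) {
            Tracker ref = (Tracker) refQueue.poll();
            if (ref == null) {
                break;
            }
            if (allLeaks.remove(ref)) {
                leaks.add(ref.hint);
            }
        }
        return leaks;
    }

    public static void main(String[] args) {
        MiniLeakDetector detector = new MiniLeakDetector();
        Object buffer = new Object();
        Tracker tracker = detector.track(buffer, "allocated in main");
        tracker.enqueue(); // simulates collection without release
        System.out.println("leaks: " + detector.collectLeaks());
    }
}
```

In a real run the garbage collector does the enqueueing, which is why a leak can only be reported after a GC has happened.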

Source Code Walkthrough of Netty's Leak Detection

ResourceLeakDetector

/**
 * Represents the level of resource leak detection.
 */
// Level controls whether leak detection is enabled and how thorough it is
public enum Level {
    /**
     * Disables resource leak detection.
     */
    // Leak detection is off
    DISABLED,
    /**
     * Enables simplistic sampling resource leak detection which reports there is a leak or not,
     * at the cost of small overhead (default).
     */
    // Default level: reports whether a leak occurred, but not where
    SIMPLE,
    /**
     * Enables advanced sampling resource leak detection which reports where the leaked object was accessed
     * recently at the cost of high overhead.
     */
    // Records where the leaked object was likely accessed, so the overhead is higher
    ADVANCED,
    /**
     * Enables paranoid resource leak detection which reports where the leaked object was accessed recently,
     * at the cost of the highest possible overhead (for testing purposes only).
     */
    // The levels above track only a sample of objects; PARANOID tracks every object and also records likely leak locations
    PARANOID;

    /**
     * Returns level based on string value. Accepts also string that represents ordinal number of enum.
     *
     * @param levelStr - level string : DISABLED, SIMPLE, ADVANCED, PARANOID. Ignores case.
     * @return corresponding level or SIMPLE level in case of no match.
     */
    static Level parseLevel(String levelStr) {
        String trimmedLevelStr = levelStr.trim();
        for (Level l : values()) {
            if (trimmedLevelStr.equalsIgnoreCase(l.name()) || trimmedLevelStr.equals(String.valueOf(l.ordinal()))) {
                return l;
            }
        }
        return DEFAULT_LEVEL;
    }
}
/**
 * Creates a new {@link ResourceLeakTracker} which is expected to be closed via
 * {@link ResourceLeakTracker#close(Object)} when the related resource is deallocated.
 *
 * @return the {@link ResourceLeakTracker} or {@code null}
 */

// track() is the entry point; it is called when a new ByteBuf is created
@SuppressWarnings("unchecked")
public final ResourceLeakTracker<T> track(T obj) {
    return track0(obj);
}

@SuppressWarnings("unchecked")
private DefaultResourceLeak track0(T obj) {
    Level level = ResourceLeakDetector.level;
    // If the level is DISABLED, return immediately
    if (level == Level.DISABLED) {
        return null;
    }
    // Below PARANOID, objects are sampled at a fixed rate rather than tracked on every call
    if (level.ordinal() < Level.PARANOID.ordinal()) {
        // samplingInterval defaults to 128; draw a random number and track only when it is 0
        if ((PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) {
            reportLeak();
            // Create the weak reference:
            // private static final class DefaultResourceLeak<T> extends WeakReference<Object> implements ResourceLeakTracker<T>, ResourceLeak
            return new DefaultResourceLeak(obj, refQueue, allLeaks, getInitialHint(resourceType));
        }
        return null;
    }
    reportLeak();
    return new DefaultResourceLeak(obj, refQueue, allLeaks, getInitialHint(resourceType));
}
DefaultResourceLeak(
        Object referent,
        ReferenceQueue<Object> refQueue,
        Set<DefaultResourceLeak<?>> allLeaks,
        Object initialHint) {
    super(referent, refQueue);

    assert referent != null;

    // Store the hash of the tracked object to later assert it in the close(...) method.
    // It's important that we not store a reference to the referent as this would disallow it from
    // be collected via the WeakReference.
    trackedHash = System.identityHashCode(referent);
    // The weak reference adds itself to allLeaks when it is created
    allLeaks.add(this);
    // Create a new Record so we always have the creation stacktrace included.
    headUpdater.set(this, initialHint == null ?
            new TraceRecord(TraceRecord.BOTTOM) : new TraceRecord(TraceRecord.BOTTOM, initialHint));
    this.allLeaks = allLeaks;
}
@Override
public boolean close() {
    // close() removes this object from allLeaks
    if (allLeaks.remove(this)) {
        // Call clear so the reference is not even enqueued.
        clear();
        headUpdater.set(this, null);
        return true;
    }
    return false;
}
private void reportLeak() {
    // Check whether ERROR-level logging is enabled; if not, the user would never see a report, so detection is skipped
    if (!needReport()) {
        // Drain the ReferenceQueue
        clearRefQueue();
        return;
    }

    // Detect and report previous leaks.
    for (;;) {
        // Poll the ReferenceQueue
        DefaultResourceLeak ref = (DefaultResourceLeak) refQueue.poll();
        if (ref == null) {
            break;
        }
        // Check whether this object leaked;
        // if not, move on to the next one
        if (!ref.dispose()) {
            continue;
        }
        // getReportAndClearRecords() returns the stack trace information for the leak; TraceRecord extends Throwable
        String records = ref.getReportAndClearRecords();
        if (reportedLeaks.add(records)) {
            if (records.isEmpty()) {
                reportUntracedLeak(resourceType);
            } else {
                reportTracedLeak(resourceType, records);
            }
        }
    }
}
boolean dispose() {
    clear();
    // If this object is still in the set, close() was never called, so it leaked
    return allLeaks.remove(this);
}

Summary:

  • Full tracking or sampling: PlatformDependent.threadLocalRandom().nextInt(samplingInterval)
  • Recording access information: new TraceRecord(); TraceRecord extends Throwable
  • Level/switch: io.netty.util.ResourceLeakDetector.Level
  • Presenting the result: logger.error
  • When reporting is triggered: AbstractByteBufAllocator#buffer() -> io.netty.util.ResourceLeakDetector#track
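The sampling decision from the summary can be isolated in a few lines. The sketch below is JDK-only; SamplingSketch is a hypothetical class that copies only the level check and the nextInt(samplingInterval) == 0 test from track0, using java.util.Random in place of PlatformDependent.threadLocalRandom().

```java
import java.util.Random;

// Hypothetical extract of the "track or skip" decision.
final class SamplingSketch {
    enum Level { DISABLED, SIMPLE, ADVANCED, PARANOID }

    static boolean shouldTrack(Level level, int samplingInterval, Random random) {
        if (level == Level.DISABLED) {
            return false;
        }
        if (level.ordinal() < Level.PARANOID.ordinal()) {
            // On average one allocation in samplingInterval is tracked.
            return random.nextInt(samplingInterval) == 0;
        }
        return true; // PARANOID tracks everything
    }

    public static void main(String[] args) {
        Random random = new Random();
        int tracked = 0;
        int allocations = 128_000;
        for (int i = 0; i < allocations; i++) {
            if (shouldTrack(Level.SIMPLE, 128, random)) {
                tracked++;
            }
        }
        System.out.println("tracked " + tracked + " of " + allocations + " allocations");
    }
}
```

With the default interval of 128, fewer than 1% of allocations are tracked at the SIMPLE and ADVANCED levels, so a leak may need many allocations before it is reported.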

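Example: Detecting Leaks with Netty's Leak Detection Tool

Method: -Dio.netty.leakDetection.level=PARANOID

Notes:

  • The default level is SIMPLE, which does not check every allocation
  • A leak can only be detected after a GC has run
  • Mind the log level
  • Use the highest level before going live and the default level in production

In OrderServerProcessHandler, allocate a ByteBuf and never release it; this causes a memory leak: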

public class OrderServerProcessHandler extends SimpleChannelInboundHandler<RequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RequestMessage requestMessage) throws Exception {
        ByteBuf buffer = ctx.alloc().buffer();
        Operation operation = requestMessage.getMessageBody();
        OperationResult operationResult = operation.execute();

        ResponseMessage responseMessage = new ResponseMessage();
        responseMessage.setMessageHeader(requestMessage.getMessageHeader());
        responseMessage.setMessageBody(operationResult);

        ctx.writeAndFlush(responseMessage);
    }
}

Also set the log level in logback.xml to ERROR so that other log output does not get in the way:

<?xml version="1.0" encoding="GBK"?>
<configuration>
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-4relative [%thread][%highlight(%-5level)][%logger:%L] %msg%n</pattern>
            <charset>GBK</charset>
        </encoder>
    </appender>

    <logger name="root" level="ERROR">
        <appender-ref ref="console"/>
    </logger>
</configuration>

Have the client send the data 10000 times:

RequestMessage requestMessage = new RequestMessage(IdUtil.nextId(), new OrderOperation(1001, "tudou"));
for (int i = 0; i < 10000; i++) {
    ChannelFuture channelFuture1 = channelFuture.channel().writeAndFlush(requestMessage);
}
channelFuture.channel().closeFuture().get();

Add -Dio.netty.leakDetection.level=PARANOID to the server's startup arguments.

Start the server, then the client, and the leak report appears on the server console:

2022-05-20 17:58:19.706 5843 [worker-3-1][ERROR][io.netty.util.ResourceLeakDetector:319] LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
	io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:403)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
	io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:174)
	io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:108)
	org.example.server.codec.handler.OrderServerProcessHandler.channelRead0(OrderServerProcessHandler.java:14)
	org.example.server.codec.handler.OrderServerProcessHandler.channelRead0(OrderServerProcessHandler.java:11)
	io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:280)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:280)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	java.lang.Thread.run(Thread.java:750)

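Netty's Built-in Annotations

@Sharable

Marks a handler as shareable. A handler without this mark cannot be added to a pipeline more than once.

If a handler without the annotation is shared anyway, the first client to connect works normally, but the second client's connection is closed and the server throws an exception.

For example: remove the @ChannelHandler.Sharable annotation from the MetricsHandler above, start the server, and then start client and client1 one after the other.

client:

[Figure: sharable-client]

client1:

[Figure: sharable-client1]

server:

[Figure: sharable-server]

Look at the last entry of the stack trace, io.netty.channel.DefaultChannelPipeline.checkMultiplicity(DefaultChannelPipeline.java:600):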

private static void checkMultiplicity(ChannelHandler handler) {
    if (handler instanceof ChannelHandlerAdapter) {
        ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
        // Not sharable and already added; isSharable() checks for the @Sharable annotation
        if (!h.isSharable() && h.added) {
            throw new ChannelPipelineException(
                    h.getClass().getName() +
                    " is not a @Sharable handler, so can't be added or removed multiple times.");
        }
        h.added = true;
    }
}

io.netty.channel.ChannelInitializer.initChannel(ChannelInitializer.java:129)

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.add(ctx)) { // Guard against re-entrance.
        try {
            // Line 129
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
            // We do so to prevent multiple calls to initChannel(...).
            exceptionCaught(ctx, cause);
        } finally {
            if (!ctx.isRemoved()) {
                ctx.pipeline().remove(this);
            }
        }
        return true;
    }
    return false;
}

The call initChannel((C) ctx.channel()) threw the exception, which was caught and passed to exceptionCaught():

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    if (logger.isWarnEnabled()) {
        logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), cause);
    }
    ctx.close();
}

Here the channel is closed.

So the annotation is not just a marker but also a safeguard: it prevents a handler that must not be shared from being shared.
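The guard can be reproduced in isolation. The following JDK-only sketch uses hypothetical stand-ins (SharableSketch, its own @Sharable annotation and a bare Handler base class) to mirror the isSharable() plus added check from checkMultiplicity; it is not Netty code.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical miniature of the "added twice" guard.
final class SharableSketch {
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Sharable { }

    static class Handler {
        boolean added;

        boolean isSharable() {
            return getClass().isAnnotationPresent(Sharable.class);
        }
    }

    @Sharable
    static final class SharedHandler extends Handler { }

    static final class PlainHandler extends Handler { }

    // Same rule as above: a non-sharable handler may be added only once.
    static void checkMultiplicity(Handler h) {
        if (!h.isSharable() && h.added) {
            throw new IllegalStateException(
                    h.getClass().getName() + " is not a @Sharable handler");
        }
        h.added = true;
    }

    public static void main(String[] args) {
        Handler shared = new SharedHandler();
        checkMultiplicity(shared);
        checkMultiplicity(shared); // fine: annotated as sharable

        Handler plain = new PlainHandler();
        checkMultiplicity(plain);
        try {
            checkMultiplicity(plain); // second add is rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The second add of the unannotated handler corresponds to the second client connecting in the example above.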

@Skip

Skips execution of a handler method

Look at pipeline.addLast(). The lowest-level addLast overload creates an AbstractChannelHandlerContext through newContext():

@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);

        newCtx = newContext(group, filterName(name, handler), handler);

        addLast0(newCtx);

        // If the registered is false it means that the channel was not registered on an eventLoop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

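Follow newContext() into the AbstractChannelHandlerContext constructor; executionMask decides which events the handler is eligible to handle: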

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
                              String name, Class<? extends ChannelHandler> handlerClass) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    // Decides which events this handler is eligible to handle
    this.executionMask = mask(handlerClass);
    // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
    ordered = executor == null || executor instanceof OrderedEventExecutor;
}

Look at the mask method (mask0):

private static int mask0(Class<? extends ChannelHandler> handlerType) {
    int mask = MASK_EXCEPTION_CAUGHT;
    try {
        if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
            mask |= MASK_ALL_INBOUND;

            if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_REGISTERED;
            }
            if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_UNREGISTERED;
            }
            if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_ACTIVE;
            }
....
}

Deciding whether to skip:

// Look up the method; if it exists, check whether it carries the @Skip annotation
private static boolean isSkippable(
        final Class<?> handlerType, final String methodName, final Class<?>... paramTypes) throws Exception {
    return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
        @Override
        public Boolean run() throws Exception {
            Method m;
            try {
                m = handlerType.getMethod(methodName, paramTypes);
            } catch (NoSuchMethodException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug(
                        "Class {} missing method {}, assume we can not skip execution", handlerType, methodName, e);
                }
                return false;
            }
            return m.isAnnotationPresent(Skip.class);
        }
    });
}
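The interplay of the mask and @Skip can be tried out with a JDK-only sketch. SkipMaskSketch, its two mask constants, its own @Skip annotation and the Adapter and ReadOnlyHandler classes are all hypothetical; only the technique (clear a bit when the resolved method carries @Skip) follows the code above.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical miniature of the execution-mask computation.
final class SkipMaskSketch {
    static final int MASK_CHANNEL_ACTIVE = 1;
    static final int MASK_CHANNEL_READ = 1 << 1;

    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Skip { }

    // Adapter methods do nothing useful, so they are marked as skippable.
    public static class Adapter {
        @Skip public void channelActive() { }
        @Skip public void channelRead() { }
    }

    // Overriding a method drops the annotation, so it is no longer skipped.
    public static class ReadOnlyHandler extends Adapter {
        @Override public void channelRead() { }
    }

    static boolean isSkippable(Class<?> type, String methodName) {
        try {
            return type.getMethod(methodName).isAnnotationPresent(Skip.class);
        } catch (NoSuchMethodException e) {
            return false; // unknown method: assume it cannot be skipped
        }
    }

    static int mask(Class<?> type) {
        int mask = MASK_CHANNEL_ACTIVE | MASK_CHANNEL_READ;
        if (isSkippable(type, "channelActive")) {
            mask &= ~MASK_CHANNEL_ACTIVE;
        }
        if (isSkippable(type, "channelRead")) {
            mask &= ~MASK_CHANNEL_READ;
        }
        return mask;
    }

    public static void main(String[] args) {
        System.out.println("Adapter mask = " + mask(Adapter.class));
        System.out.println("ReadOnlyHandler mask = " + mask(ReadOnlyHandler.class));
    }
}
```

Method annotations are not inherited by overriding methods, which is what makes this scheme work: a handler only pays for the events it actually overrides.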

In Netty 4, @Skip is not open to users:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface Skip {
    // no value
}
final class ChannelHandlerMask {
	...
}

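It is only package-visible.

Netty 5 opens up the @Skip annotation

@UnstableApi

Warns that the marked element is unstable and should be used with caution

@SuppressJava6Requirement

Suppresses "Java 6 requirement" warnings

For the Netty jar to run on Java 6, every use of a feature newer than JDK 6 has to be found, because those features cannot be used. A plugin scans the code for APIs that rely on post-Java-6 features.

Some code checks the JDK version first, for example calling DnsResolveContextException(String message, boolean shared) only when the JDK version is 7 or higher. The plugin still flags that call during the scan, so the annotation marks the method as exempt from the warning.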

static final class DnsResolveContextException extends RuntimeException {

    private static final long serialVersionUID = 1209303419266433003L;

    private DnsResolveContextException(String message) {
        super(message);
    }

    @SuppressJava6Requirement(reason = "uses Java 7+ Exception.<init>(String, Throwable, boolean, boolean)" +
            " but is guarded by version checks")
    private DnsResolveContextException(String message, boolean shared) {
        super(message, null, false, true);
        assert shared;
    }

    // Override fillInStackTrace() so we not populate the backtrace via a native call and so leak the
    // Classloader.
    @Override
    public Throwable fillInStackTrace() {
        return this;
    }

    static DnsResolveContextException newStatic(String message, Class<?> clazz, String method) {
        final DnsResolveContextException exception;
        if (PlatformDependent.javaVersion() >= 7) {
            exception = new DnsResolveContextException(message, true);
        } else {
            exception = new DnsResolveContextException(message);
        }
        return ThrowableUtil.unknownStackTrace(exception, clazz, method);
    }
}

Plugin Usage

Plugin: https://github.com/mojohaus/animal-sniffer

<plugin>
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>animal-sniffer-maven-plugin</artifactId>
  <version>1.16</version>
  <configuration>
    <signature>
      <groupId>org.codehaus.mojo.signature</groupId>
      <!-- java16 here means JDK 6 (Java 1.6): http://www.mojohaus.org/signatures/ -->
      <artifactId>java16</artifactId>
      <version>1.1</version>
    </signature>
    <ignores>
      <ignore>java.nio.ByteBuffer</ignore>
    </ignores>
    <annotations>
      <!-- Annotation that marks code to exclude from the check -->
      <annotation>io.netty.util.internal.SuppressJava6Requirement</annotation>
    </annotations>
  </configuration>
  <executions>
    <execution>
      <!-- Run check in the process-classes phase, which follows compile -->
      <phase>process-classes</phase>
      <goals>
        <goal>check</goal>
      </goals>
    </execution>
  </executions>
</plugin>

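[Figure: SuppressJava6Requirement]

@SuppressForbidden

Suppresses "forbidden API" warnings

io.netty.util.NettyRuntime.AvailableProcessorsHolder#availableProcessors

The CPU core count normally comes from Runtime.getRuntime().availableProcessors(). On JDK versions below 10, inside a Docker container with a CPU limit, that value is wrong: it is the core count of the host. In that case the user can set the io.netty.availableProcessors property to specify the correct number of cores.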

    /**
     * Get the configured number of available processors. The default is {@link Runtime#availableProcessors()}.
     * This can be overridden by setting the system property "io.netty.availableProcessors" or by invoking
     * {@link #setAvailableProcessors(int)} before any calls to this method.
     *
     * @return the configured number of available processors
     */
    @SuppressForbidden(reason = "to obtain default number of available processors")
    synchronized int availableProcessors() {
        if (this.availableProcessors == 0) {
            final int availableProcessors =
                    SystemPropertyUtil.getInt(
                            "io.netty.availableProcessors",
                            Runtime.getRuntime().availableProcessors());
            setAvailableProcessors(availableProcessors);
        }
        return this.availableProcessors;
    }
}

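A developer who does not know that this availableProcessors() method exists may call Runtime.getRuntime().availableProcessors() directly, which can cause problems. A plugin scans for such calls to prevent the mistake, but it also flags the call inside availableProcessors() itself, so the @SuppressForbidden annotation is needed to exempt that method from the warning.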
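The "system property first, runtime value as fallback" lookup is easy to try out with the JDK alone. ProcessorsSketch and the property name demo.availableProcessors are hypothetical and used only here; with Netty the property is io.netty.availableProcessors, and NettyRuntime additionally caches the value, which this sketch does not.

```java
// Hypothetical JDK-only version of the override-with-fallback lookup.
final class ProcessorsSketch {
    // Property name invented for this sketch.
    static final String PROPERTY = "demo.availableProcessors";

    static int availableProcessors() {
        int detected = Runtime.getRuntime().availableProcessors();
        // Integer.getInteger returns null when the property is unset or not a number.
        Integer configured = Integer.getInteger(PROPERTY);
        return (configured != null && configured > 0) ? configured : detected;
    }

    public static void main(String[] args) {
        System.out.println("processors = " + availableProcessors());
    }
}
```

Running it with -Ddemo.availableProcessors=2 prints the configured value instead of the detected one, which is the behavior a CPU-limited container needs.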

Plugin Usage

Plugin: https://github.com/policeman-tools/forbidden-apis

<plugin>
  <groupId>de.thetaphi</groupId>
  <artifactId>forbiddenapis</artifactId>
  <version>2.2</version>
  <executions>
    <execution>
      <id>check-forbidden-apis</id>
      <configuration>
        <targetVersion>${maven.compiler.target}</targetVersion>
        <!-- allow undocumented classes like sun.misc.Unsafe: -->
        <internalRuntimeForbidden>false</internalRuntimeForbidden>
        <!-- if the used Java version is too new, don't fail, just do nothing: -->
        <failOnUnsupportedJava>false</failOnUnsupportedJava>
        <bundledSignatures>
          <!-- This will automatically choose the right signatures based on 'targetVersion': -->
          <!-- enabling these should be done in the future -->
          <!-- bundledSignature>jdk-unsafe</bundledSignature -->
          <!-- bundledSignature>jdk-deprecated</bundledSignature -->
          <!-- bundledSignature>jdk-system-out</bundledSignature -->
        </bundledSignatures>
        <signaturesFiles>
          <!-- The file lists calls that must not be used and what to call instead -->
          <signaturesFile>${netty.dev.tools.directory}/forbidden/signatures.txt</signaturesFile>
        </signaturesFiles>
        <suppressAnnotations><annotation>**.SuppressForbidden</annotation></suppressAnnotations>
      </configuration>
      <!-- Runs in the compile phase -->
      <phase>compile</phase>
      <goals>
        <goal>check</goal>
      </goals>
    </execution>
    <execution>
      <id>check-forbidden-test-apis</id>
      <configuration>
        <targetVersion>${maven.compiler.target}</targetVersion>
        <!-- allow undocumented classes like sun.misc.Unsafe: -->
        <internalRuntimeForbidden>true</internalRuntimeForbidden>
        <!-- if the used Java version is too new, don't fail, just do nothing: -->
        <failOnUnsupportedJava>false</failOnUnsupportedJava>
        <bundledSignatures>
          <!-- This will automatically choose the right signatures based on 'targetVersion': -->
          <!-- enabling these should be done in the future -->
          <!-- bundledSignature>jdk-unsafe</bundledSignature -->
          <!-- bundledSignature>jdk-deprecated</bundledSignature -->
        </bundledSignatures>
        <signaturesFiles>
          <signaturesFile>${netty.dev.tools.directory}/forbidden/signatures.txt</signaturesFile>
        </signaturesFiles>
        <!-- Methods marked with the given annotation are excluded from the warnings -->
        <suppressAnnotations><annotation>**.SuppressForbidden</annotation></suppressAnnotations>
      </configuration>
      <phase>test-compile</phase>
      <goals>
        <goal>testCheck</goal>
      </goals>
    </execution>
  </executions>
</plugin>

signatures.txt

/*
 * Copyright 2017 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
// Do not call the method on the left; call the one named after "use" instead
java.lang.Runtime#availableProcessors() @ use NettyRuntime#availableProcessors()

[Figure: SuppressForbidden]