Netty (6): Optimizations in Practice

young 1,002 2022-05-22

Optimizations in practice

Reworking the threading model

  • Two common kinds of business workload
    • CPU-bound: computation-heavy
    • IO-bound: waiting-heavy

CPU-bound

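  • Keep the current threading model (reuse the thread bound to the NioEventLoop)

    Thread count calculation:

    • Runtime.getRuntime().availableProcessors() * 2: the machine's CPU count * 2
    • io.netty.availableProcessors * 2: for Docker, specify the CPU count yourself, then * 2
    • io.netty.eventLoopThreads: set the thread count directly in some tuning scenarios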
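The sketch below shows how the default thread count could be resolved. It assumes the order described above:

- io.netty.eventLoopThreads wins if it is set.
- Otherwise the count is io.netty.availableProcessors (or the JVM's CPU count) times 2, and at least 1.

`EventLoopThreads` is a hypothetical helper built on plain `Integer.getInteger`. Netty reads these properties once in static initializers, whereas this sketch re-reads them on every call.

```java
// Hypothetical helper: models how the default event-loop thread count is derived.
final class EventLoopThreads {
    private EventLoopThreads() {}

    // CPU count: io.netty.availableProcessors overrides what the JVM reports
    // (useful in Docker, where the JVM may see the host's CPUs).
    static int availableProcessors() {
        int jvmCpus = Runtime.getRuntime().availableProcessors();
        return Integer.getInteger("io.netty.availableProcessors", jvmCpus);
    }

    // io.netty.eventLoopThreads overrides everything; otherwise CPUs * 2, never below 1.
    static int defaultEventLoopThreads() {
        return Math.max(1, Integer.getInteger("io.netty.eventLoopThreads", availableProcessors() * 2));
    }

    public static void main(String[] args) {
        System.out.println("default event loop threads = " + defaultEventLoopThreads());
    }
}
```

For example, running with `-Dio.netty.availableProcessors=3` yields 6 threads, while `-Dio.netty.eventLoopThreads=16` forces 16 regardless of the CPU count.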

IO-bound

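  • Rework the threading model: move business processing into a separate "thread pool" instead of sharing the NioEventLoop

    With IO-bound work, business processing often waits a long time. If it is not moved out, it holds up the handling of IO events, and when IO events are not handled promptly, efficiency suffers badly.

    • Use JDK Executors inside the handler

    • Or specify an EventExecutorGroup when adding the handler:

      EventExecutorGroup eventExecutorGroup = new UnorderedThreadPoolEventExecutor(10);

      pipeline.addLast(eventExecutorGroup, serverHandler);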
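The first option, offloading inside the handler with JDK Executors, can be sketched without Netty. A single "io" thread only dispatches each request to a fixed "business" pool, so a slow call never blocks it.

`OffloadDemo` and its thread names are hypothetical. In a real handler the dispatch would happen in channelRead0, and the business thread would then call ctx.writeAndFlush(), which Netty hands back to the channel's event loop.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class OffloadDemo {
    // Names threads "<prefix>-<n>", loosely like Netty's DefaultThreadFactory.
    static ThreadFactory named(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return r -> {
            Thread t = new Thread(r, prefix + "-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        };
    }

    // The "io" thread only dispatches; every slow job runs on the "business" pool.
    // Returns the names of the threads that ran the slow jobs.
    static List<String> run(int requests, long blockMillis) {
        ExecutorService io = Executors.newSingleThreadExecutor(named("io"));
        ExecutorService business = Executors.newFixedThreadPool(10, named("business"));
        List<String> workers = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(requests);
        for (int i = 0; i < requests; i++) {
            io.execute(() -> business.execute(() -> {
                try {
                    Thread.sleep(blockMillis); // stands in for the slow business call
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                workers.add(Thread.currentThread().getName());
                done.countDown();
            }));
        }
        try {
            done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            io.shutdown();
            business.shutdown();
        }
        return workers;
    }

    public static void main(String[] args) {
        System.out.println(run(20, 100));
    }
}
```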

Code example

Modify the execute method of OrderOperation to simulate a long-running operation

@Override
public OperationResult execute() {
    log.info("order's executing startup with orderRequest: {}" , this.toString());
    // sleep for 3 seconds
    Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
    log.info("order's executing complete");
    OrderOperationResult result = new OrderOperationResult(tableId,dish,true);
    return result;
}

Modify the server code: create an UnorderedThreadPoolEventExecutor named business outside the childHandler method, and add it to the pipeline

MetricsHandler metricsHandler = new MetricsHandler();
// created outside childHandler so that every OrderServerProcessHandler created in childHandler shares this thread pool
UnorderedThreadPoolEventExecutor business = new UnorderedThreadPoolEventExecutor(10,
        new DefaultThreadFactory("business"));
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // order matters
        pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));
        pipeline.addLast("frameDecode", new OrderFrameDecoder());
        pipeline.addLast(new OrderProtocolDecoder());
        pipeline.addLast(new OrderFrameEncoder());
        pipeline.addLast(new OrderProtocolEncoder());
        pipeline.addLast("metricsHandler", metricsHandler);
        pipeline.addLast(new LoggingHandler(LogLevel.INFO));
        // add to the pipeline, bound to the business executor
        pipeline.addLast(business, new OrderServerProcessHandler());
    }
});

As a result, every OrderServerProcessHandler invocation takes 3 seconds, and all of them run on the thread pool named business.

Next, modify the Client class to send the request multiple times

for (int i = 0; i < 20; i++) {
    channelFuture.channel().writeAndFlush(requestMessage);
}

Start the Server, then the Client, and check the Server's log

[Screenshot: Server log when using UnorderedThreadPoolEventExecutor]

Why not use NioEventLoopGroup?

Replace the UnorderedThreadPoolEventExecutor in the Server code with a NioEventLoopGroup

NioEventLoopGroup eventExecutors = new NioEventLoopGroup(0, new DefaultThreadFactory("business"));
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // order matters
        pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));
        pipeline.addLast("frameDecode", new OrderFrameDecoder());
        pipeline.addLast(new OrderProtocolDecoder());
        pipeline.addLast(new OrderFrameEncoder());
        pipeline.addLast(new OrderProtocolEncoder());
        pipeline.addLast("metricsHandler", metricsHandler);
        pipeline.addLast(new LoggingHandler(LogLevel.INFO));
        pipeline.addLast(eventExecutors, new OrderServerProcessHandler());
    }
});

Start the Server and the Client again and check the Server's log

[Screenshot: Server log when using NioEventLoopGroup]

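As the log shows, only one thread is used, so processing is slow. All 20 requests arrive on the same channel, and that channel's handler is pinned to a single EventLoop, so they run one after another, 3 seconds each.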

Look at the newContext() method called from pipeline.addLast()

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
private EventExecutor childExecutor(EventExecutorGroup group) {
    if (group == null) {
        return null;
    }
    // SINGLE_EVENTEXECUTOR_PER_GROUP: when a handler is added with an EventExecutorGroup, decides whether the handler uses one fixed EventExecutor from that group (determined by the next() implementation); defaults to true
    Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
    if (pinEventExecutor != null && !pinEventExecutor) {
        return group.next();
    }
    Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
    if (childExecutors == null) {
        // Use size of 4 as most people only use one extra EventExecutor.      	
        childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
    }
    // Pin one of the child executors once and remember it so that the same child executor
    // is used to fire events for the same channel.
    // the map's key is the group; the value is what next() returned
    EventExecutor childExecutor = childExecutors.get(group);
    if (childExecutor == null) {
        // with SINGLE_EVENTEXECUTOR_PER_GROUP = true, the single executor returned by next() is shared
        // for NioEventLoopGroup, that is one of its child event loops
        // UnorderedThreadPoolEventExecutor.next() returns this, not one particular thread of the pool
        childExecutor = group.next();
        childExecutors.put(group, childExecutor);
    }
    return childExecutor;
}
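The effect of this pinning can be reproduced with plain JDK executors. The sketch assumes a pinned NioEventLoop behaves like a single-thread executor, and UnorderedThreadPoolEventExecutor behaves like a fixed pool, since its next() returns the pool itself.

`PinningDemo` is illustrative only. With 20 tasks, the pinned executor uses 1 thread and the 10-thread pool uses all 10, which is why the NioEventLoopGroup run above is serial. The price of the pool is that per-channel ordering is no longer guaranteed, as the "Unordered" in the class name suggests.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class PinningDemo {
    // Runs `tasks` jobs (each sleeping `sleepMillis`) on `executor` and returns
    // the distinct names of the threads that executed them.
    static Set<String> threadsUsed(ExecutorService executor, int tasks, long sleepMillis) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                names.add(Thread.currentThread().getName());
                try {
                    Thread.sleep(sleepMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                done.countDown();
            });
        }
        try {
            done.await(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return names;
    }

    public static void main(String[] args) {
        // Pinned: like the one child NioEventLoop chosen once per channel (a single thread).
        System.out.println("pinned: " + threadsUsed(Executors.newSingleThreadExecutor(), 20, 20).size());
        // Pooled: like UnorderedThreadPoolEventExecutor(10), whose next() returns itself.
        System.out.println("pool:   " + threadsUsed(Executors.newFixedThreadPool(10), 20, 20).size());
    }
}
```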

Enhancing writes: trading off latency against throughput

The problem with writes

public class OrderServerProcessHandler extends SimpleChannelInboundHandler<RequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RequestMessage requestMessage) throws Exception {
        Operation operation = requestMessage.getMessageBody();
        OperationResult operationResult = operation.execute();
        ResponseMessage responseMessage = new ResponseMessage();
        responseMessage.setMessageHeader(requestMessage.getMessageHeader());
        responseMessage.setMessageBody(operationResult);

        ctx.writeAndFlush(responseMessage);
    }
}

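Using ctx.writeAndFlush() makes every response an "express" delivery.

Not every program needs this. Flushing on every write means one socket write per message, so throughput may not be high.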

Improvement 1: channelReadComplete

@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

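In the official example, writing is split into two steps: ctx.write is called in channelRead, and ctx.flush is called in channelReadComplete.

A single read event can trigger several reads, up to 16 by default. If, say, 3 reads happen, the previous approach flushes 3 times, whereas the split approach flushes only once.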
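A toy model makes the flush count concrete. `ToyChannel` is hypothetical:

- write() only queues.
- flush() sends everything queued and counts as one flush, standing in for one socket write.

For a read event that delivers 3 messages, flushing per message costs 3 flushes. Writing in channelRead and flushing in channelReadComplete costs 1.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy outbound buffer: write() queues, flush() sends all queued messages.
final class ToyChannel {
    private final List<String> pending = new ArrayList<>();
    int flushes; // flushes that actually sent data
    int sent;    // messages sent in total

    void write(String msg) {
        pending.add(msg);
    }

    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushes++; // one socket write per real flush in this model
        sent += pending.size();
        pending.clear();
    }

    void writeAndFlush(String msg) {
        write(msg);
        flush();
    }

    // One read event delivering `reads` messages, echoed with writeAndFlush in channelRead.
    static ToyChannel flushPerMessage(int reads) {
        ToyChannel ch = new ToyChannel();
        for (int i = 0; i < reads; i++) {
            ch.writeAndFlush("msg" + i); // channelRead
        }
        return ch;
    }

    // Same read event: write in channelRead, flush once in channelReadComplete.
    static ToyChannel flushOnReadComplete(int reads) {
        ToyChannel ch = new ToyChannel();
        for (int i = 0; i < reads; i++) {
            ch.write("msg" + i); // channelRead
        }
        ch.flush(); // channelReadComplete
        return ch;
    }
}
```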

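Drawbacks
  • Not suitable when business logic runs on an asynchronous thread pool (not reusing the NIO event loop), such as the separate pool used for IO-bound work above

    The write of the business result produced in channelRead will very likely happen after channelReadComplete

  • Not suitable for finer-grained control: for example, if a read event triggers 16 reads in a row, the flush happens only after the 16th; you cannot keep 16 reads per event while flushing after every 3 reads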

Improvement 2: FlushConsolidationHandler

Source code analysis

io.netty.handler.flush.FlushConsolidationHandler#flush

//
// Sync:   read -> writeAndFlush -> readComplete
// Async:  read -> readComplete  -> writeAndFlush
// readInProgress (sync):   000011111111111111111111110000000000000
// readInProgress (async):  000001111000000000000000000000000000000

@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
    // two cases, depending on whether business logic reuses the IO thread
    // reuse case (synchronous)
    if (readInProgress) { // a read is in progress
        // If there is still a read in progress we are sure we will see a channelReadComplete(...) call. Thus
        // we only need to flush if we reach the explicitFlushAfterFlushes limit.
        // flush once per explicitFlushAfterFlushes "batched" writes (flush calls)
        // leftovers below the threshold? channelReadComplete flushes them
        if (++flushPendingCount == explicitFlushAfterFlushes) {
            flushNow(ctx);
        }
        // below: the non-reuse (asynchronous) case
    } else if (consolidateWhenNoReadInProgress) {
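        // (async business) with consolidateWhenNoReadInProgress enabled, flushes are consolidated
        // (e.g. no more read requests arrive, but the business side is still busy working through its backlog, so responses keep being written)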
        // Flush immediately if we reach the threshold, otherwise schedule
        if (++flushPendingCount == explicitFlushAfterFlushes) {
            flushNow(ctx);
        } else {
            scheduleFlush(ctx);
        }
    } else {
        // (async business) without consolidateWhenNoReadInProgress, flush directly
        // Always flush directly
        flushNow(ctx);
    }
}

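In the reuse (synchronous) case, execution first enters the readInProgress branch, meaning a read is in progress. The read may deliver data 3 times or 10 times.

Suppose 10 messages arrive and we want one flush per 5 writes: that threshold is the explicitFlushAfterFlushes parameter. If it is set to 5 but 6 writes happen, the first 5 are flushed when the counter reaches 5. In the synchronous case readComplete eventually fires, i.e. io.netty.handler.flush.FlushConsolidationHandler#channelReadComplete is called, which flushes the remaining write.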

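When readInProgress is false, i.e. no read is in progress, this is usually the asynchronous case. By default the asynchronous case is not consolidated: every flush goes out immediately.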

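To reduce flushes in the asynchronous case too, Netty provides the consolidateWhenNoReadInProgress parameter. When it is true, the asynchronous case is consolidated as well. A flush happens immediately once the count reaches explicitFlushAfterFlushes; below that count, scheduleFlush is called instead.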
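The branch logic of flush() can be checked with a single-threaded model. `FlushPolicyModel` is a simplified stand-in, not Netty's class:

- scheduleFlush() is reduced to setting a flag.
- runScheduledFlush() plays the role of the event loop running the flush task.

With explicitFlushAfterFlushes = 5 and 6 writes:

- The synchronous case ends with 2 real flushes: one at the 5th write, one in channelReadComplete.
- The asynchronous case makes 6 flushes without consolidateWhenNoReadInProgress, and 2 with it.

```java
// Hypothetical, simplified model of FlushConsolidationHandler's flush decision.
// Scheduling is a boolean flag here instead of a task on the event loop.
final class FlushPolicyModel {
    private final int explicitFlushAfterFlushes;
    private final boolean consolidateWhenNoReadInProgress;
    private boolean readInProgress;
    private boolean flushScheduled;
    private int flushPendingCount;
    int realFlushes; // times data actually went to the socket

    FlushPolicyModel(int explicitFlushAfterFlushes, boolean consolidateWhenNoReadInProgress) {
        this.explicitFlushAfterFlushes = explicitFlushAfterFlushes;
        this.consolidateWhenNoReadInProgress = consolidateWhenNoReadInProgress;
    }

    void channelRead() {
        readInProgress = true;
    }

    void flush() {
        if (readInProgress) {
            // channelReadComplete will flush leftovers, so only flush at the threshold
            if (++flushPendingCount == explicitFlushAfterFlushes) flushNow();
        } else if (consolidateWhenNoReadInProgress) {
            if (++flushPendingCount == explicitFlushAfterFlushes) flushNow();
            else flushScheduled = true; // stands in for scheduleFlush(ctx)
        } else {
            flushNow(); // async without consolidation: flush every time
        }
    }

    void channelReadComplete() {
        readInProgress = false;
        if (flushPendingCount > 0) flushNow(); // flush leftovers below the threshold
    }

    // Plays the role of the event loop running the scheduled flush task.
    void runScheduledFlush() {
        if (flushScheduled && flushPendingCount > 0 && !readInProgress) flushNow();
        flushScheduled = false;
    }

    private void flushNow() {
        flushScheduled = false; // a real flush cancels any pending scheduled flush
        flushPendingCount = 0;
        realFlushes++;
    }

    public static void main(String[] args) {
        FlushPolicyModel sync = new FlushPolicyModel(5, false);
        sync.channelRead();
        for (int i = 0; i < 6; i++) sync.flush();
        sync.channelReadComplete();
        System.out.println("sync, 6 writes: " + sync.realFlushes + " flushes");
    }
}
```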

// As soon as possible, but leave a chance for consolidation
private void scheduleFlush(final ChannelHandlerContext ctx) {
    if (nextScheduledFlush == null) {
        // Run as soon as possible, but still yield to give a chance for additional writes to enqueue.
        nextScheduledFlush = ctx.channel().eventLoop().submit(flushTask);
    }
}

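Scheduling turns the flush into a task that may run slightly later. While it waits, more writes can accumulate, which is another chance to reduce the number of flushes.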
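The "submit at most one pending flush task" idea behind scheduleFlush can be sketched with a single-thread executor standing in for the event loop. `CoalescingFlusher` is hypothetical.

Every call runs on that one thread, so the pending-task field needs no locking. Five flush requests made within one task collapse into a single real flush.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: at most one flush task is pending at any time.
// All instance methods must run on the single "event loop" thread.
final class CoalescingFlusher {
    private final ExecutorService eventLoop;
    private Future<?> nextScheduledFlush; // non-null while a flush task is queued
    private int pendingWrites;
    final AtomicInteger realFlushes = new AtomicInteger();

    CoalescingFlusher(ExecutorService eventLoop) {
        this.eventLoop = eventLoop;
    }

    void writeAndRequestFlush() {
        pendingWrites++;
        if (nextScheduledFlush == null) {
            // runs as soon as possible, but after the current task, so more writes can pile up
            nextScheduledFlush = eventLoop.submit(this::flushTask);
        }
    }

    private void flushTask() {
        nextScheduledFlush = null;
        if (pendingWrites > 0) {
            pendingWrites = 0;
            realFlushes.incrementAndGet();
        }
    }

    // Issues `requests` flush requests inside one event-loop task; returns the real flush count.
    static int demo(int requests) {
        ExecutorService loop = Executors.newSingleThreadExecutor();
        try {
            CoalescingFlusher f = new CoalescingFlusher(loop);
            loop.submit(() -> {
                for (int i = 0; i < requests; i++) f.writeAndRequestFlush();
            }).get();
            loop.submit(() -> { }).get(); // queued after the flush task, so that task has run
            return f.realFlushes.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("real flushes: " + demo(5));
    }
}
```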

Usage

Add a FlushConsolidationHandler directly to the Server's pipeline

pipeline.addLast("flushEnhance", new FlushConsolidationHandler(5,true));
// explicitFlushAfterFlushes: after how many flush calls a real flush happens
// consolidateWhenNoReadInProgress: whether to consolidate flushes in the async case
public FlushConsolidationHandler(int explicitFlushAfterFlushes, boolean consolidateWhenNoReadInProgress) {
  ...
}
public class FlushConsolidationHandler extends ChannelDuplexHandler {
  ...
}

The FlushConsolidationHandler class is not annotated with @Sharable, so an instance cannot be shared; create a new one for each channel.

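Traffic shaping

Uses of traffic shaping

  • Download speed limits on cloud drives (intentional)
  • Visitor limits at tourist attractions (unavoidable)

Netty's three built-in traffic shaping handlers

Global level: GlobalTrafficShapingHandler

Channel level: ChannelTrafficShapingHandler

Global and Channel each take two parameters, one limiting reads and one limiting writes

Global+Channel: GlobalChannelTrafficShapingHandler

Global+Channel takes four parameters in total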

Netty traffic shaping: source code analysis and summary

GlobalTrafficShapingHandler

Read throttling
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
    // if the TS handler is placed in the wrong position and receives something other than a byte buffer, shaping is skipped
    long size = calculateSize(msg);
    long now = TrafficCounter.milliSecondFromNano();
    // when msg is not a byte buffer, size is computed as -1, so shaping is bypassed; that is why the handler's position matters
    if (size > 0) {
        // compute the number of ms to wait before reopening the channel
        // compute how long to wait. readLimit: the read limit; maxTime: the maximum wait time; now: the current time
        long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
        // check the wait time: if it exceeds the configured maximum, the configured maximum is returned
        wait = checkWaitReadTime(ctx, wait, now);
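        // compare with the minimal wait time: a wait that is too short is pointless
        // throttling only proceeds when the wait is at least the minimal wait time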
        if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
            // time in order to try to limit the traffic
            // Only AutoRead AND HandlerActive True means Context Active
            Channel channel = ctx.channel();
            ChannelConfig config = channel.config();
            if (logger.isDebugEnabled()) {
                logger.debug("Read suspend: " + wait + ':' + config.isAutoRead() + ':'
                        + isHandlerActive(ctx));
            }
            if (config.isAutoRead() && isHandlerActive(ctx)) {
                // set autoRead to false, which removes the read interest from the Selector: reading is paused
                config.setAutoRead(false);
                channel.attr(READ_SUSPENDED).set(true);
                // Create a Runnable to reactive the read if needed. If one was create before it will just be
                // reused to limit object creation
                Attribute<Runnable> attr = channel.attr(REOPEN_TASK);
                Runnable reopenTask = attr.get();
                if (reopenTask == null) {
                    // ReopenReadTimerTask: the task that re-enables reading
                    // its core job is to set autoRead back to true and call channel.read()
                    reopenTask = new ReopenReadTimerTask(ctx);
                    attr.set(reopenTask);
                }
                // re-enable reading after wait milliseconds
                ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
                if (logger.isDebugEnabled()) {
                    logger.debug("Suspend final status => " + config.isAutoRead() + ':'
                            + isHandlerActive(ctx) + " will reopened at: " + wait);
                }
            }
        }
    }
    informReadOperation(ctx, now);
    ctx.fireChannelRead(msg);
}
protected long calculateSize(Object msg) {
    if (msg instanceof ByteBuf) {
        return ((ByteBuf) msg).readableBytes();
    }
    if (msg instanceof ByteBufHolder) {
        return ((ByteBufHolder) msg).content().readableBytes();
    }
    if (msg instanceof FileRegion) {
        return ((FileRegion) msg).count();
    }
    return -1;
}
@Override
long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
    Integer key = ctx.channel().hashCode();
    PerChannel perChannel = channelQueues.get(key);
    if (perChannel != null) {
        if (wait > maxTime && now + wait - perChannel.lastReadTimestamp > maxTime) {
            wait = maxTime;
        }
    }
    return wait;
}
// io.netty.handler.traffic.AbstractTrafficShapingHandler.ReopenReadTimerTask#run
@Override
public void run() {
    Channel channel = ctx.channel();
    ChannelConfig config = channel.config();
    if (!config.isAutoRead() && isHandlerActive(ctx)) {
        // If AutoRead is False and Active is True, user make a direct setAutoRead(false)
        // Then Just reset the status
        if (logger.isDebugEnabled()) {
            logger.debug("Not unsuspend: " + config.isAutoRead() + ':' +
                    isHandlerActive(ctx));
        }
        channel.attr(READ_SUSPENDED).set(false);
    } else {
        // Anything else allows the handler to reset the AutoRead
        if (logger.isDebugEnabled()) {
            if (config.isAutoRead() && !isHandlerActive(ctx)) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Unsuspend: " + config.isAutoRead() + ':' +
                            isHandlerActive(ctx));
                }
            } else {
                if (logger.isDebugEnabled()) {
                    logger.debug("Normal unsuspend: " + config.isAutoRead() + ':'
                            + isHandlerActive(ctx));
                }
            }
        }
        channel.attr(READ_SUSPENDED).set(false);
        // set autoRead to true, then trigger a read
        config.setAutoRead(true);
        channel.read();
    }
    if (logger.isDebugEnabled()) {
        logger.debug("Unsuspend final status => " + config.isAutoRead() + ':'
                + isHandlerActive(ctx));
    }
}
Write throttling
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
        throws Exception {
    // compute the size
    long size = calculateSize(msg);
    long now = TrafficCounter.milliSecondFromNano();
    // size > 0: the write may need to be delayed
    if (size > 0) {
        // compute the number of ms to wait before continue with the channel
        long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
        if (wait >= MINIMAL_WAIT) {
            if (logger.isDebugEnabled()) {
                logger.debug("Write suspend: " + wait + ':' + ctx.channel().config().isAutoRead() + ':'
                        + isHandlerActive(ctx));
            }
            submitWrite(ctx, msg, size, wait, now, promise);
            return;
        }
    }
    // to maintain order of write
    // note that the delay here is 0, i.e. no waiting
    submitWrite(ctx, msg, size, 0, now, promise);
}
@Override
void submitWrite(final ChannelHandlerContext ctx, final Object msg,
        final long size, final long writedelay, final long now,
        final ChannelPromise promise) {
    // Step 1: look up this channel's delay queue by its key; create it if absent
    Channel channel = ctx.channel();
    Integer key = channel.hashCode();
    PerChannel perChannel = channelQueues.get(key);
    if (perChannel == null) {
        // in case write occurs before handlerAdded is raised for this handler
        // imply a synchronized only if needed
        // it holds an ArrayDeque that stores the data of delayed writes
        perChannel = getOrSetPerChannel(ctx);
    }
    final ToSend newToSend;
    long delay = writedelay;
    boolean globalSizeExceeded = false;
    // write operations need synchronization
    synchronized (perChannel) {
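        // Step 2: decide whether to delay; if no delay is needed and the queue is empty, send directly

        // if the queue already holds data, the message must be queued even without a delay, to preserve order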
        if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) {
            trafficCounter.bytesRealWriteFlowControl(size);
            ctx.write(msg, promise);
            perChannel.lastWriteTimestamp = now;
            return;
        }
        // Step 3: if the expected delay is too long, wait at most 15 seconds (the value of maxTime)
        if (delay > maxTime && now + delay - perChannel.lastWriteTimestamp > maxTime) {
            delay = maxTime;
        }
        // Step 4: put the data into the queue
        newToSend = new ToSend(delay + now, msg, size, promise);
        // the data is always queued, so OOM is possible; that is why writability is adjusted below based on the queue size
        perChannel.messagesQueue.addLast(newToSend);
        perChannel.queueSize += size;
        // the two sizes differ: queueSize (no s) above is the per-channel queue; queuesSize below is global
        queuesSize.addAndGet(size);
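        // Step 5: check whether the queue holds too much data; if so, mark the channel as not writable

        // if the channel's queue size is over the limit or the wait is too long, set writable to false to tell upstream handlers to stop writing
        checkWriteSuspend(ctx, delay, perChannel.queueSize);
        // check whether the global queues (all queues combined) are over the limit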
        if (queuesSize.get() > maxGlobalWriteSize) {
            globalSizeExceeded = true;
        }
    }
    // if over the limit, clear the writability bit at userDefinedWritabilityIndex, i.e. mark the channel as not writable
    if (globalSizeExceeded) {
        setUserDefinedWritability(ctx, false);
    }
    // Step 6: schedule a task that sends the data after the delay
    final long futureNow = newToSend.relativeTimeAction;
    final PerChannel forSchedule = perChannel;
    ctx.executor().schedule(new Runnable() {
        @Override
        public void run() {
            sendAllValid(ctx, forSchedule, futureNow);
        }
    }, delay, TimeUnit.MILLISECONDS);
}
void checkWriteSuspend(ChannelHandlerContext ctx, long delay, long queueSize) {
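    // check whether the queued data exceeds the maximum write size, or the wait exceeds the maximum write delay
    // either OOM is looming or the wait is too long, so further writes are discouraged
    // like a tourist attraction that, seeing long queues or overcrowding, posts a notice advising people not to come in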
    if (queueSize > maxWriteSize || delay > maxWriteDelay) {
        setUserDefinedWritability(ctx, false);
    }
}
void setUserDefinedWritability(ChannelHandlerContext ctx, boolean writable) {
    ChannelOutboundBuffer cob = ctx.channel().unsafe().outboundBuffer();
    if (cob != null) {
        cob.setUserDefinedWritability(userDefinedWritabilityIndex, writable);
    }
}
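The sketch below models the writability flag. It assumes, as in Netty's ChannelOutboundBuffer, that each user-defined index in 1..31 owns one bit of an int, and that the channel is writable only when no bit is set.

`WritabilityModel` is hypothetical. Its thresholds are the defaults listed in this article's summary: 4MB per channel, 4s delay, 400MB global.

```java
// Hypothetical model of user-defined writability bits plus the write-suspend thresholds.
final class WritabilityModel {
    static final long MAX_WRITE_SIZE = 4L * 1024 * 1024;          // maxWriteSize: 4MB
    static final long MAX_WRITE_DELAY_MS = 4000;                   // maxWriteDelay: 4s
    static final long MAX_GLOBAL_WRITE_SIZE = 400L * 1024 * 1024;  // maxGlobalWriteSize: 400MB

    private int unwritable; // bit i set => index i currently says "not writable"

    void setUserDefinedWritability(int index, boolean writable) {
        if (index < 1 || index > 31) {
            throw new IllegalArgumentException("index: " + index + " (expected: 1~31)");
        }
        int mask = 1 << index;
        if (writable) {
            unwritable &= ~mask;
        } else {
            unwritable |= mask;
        }
    }

    boolean isWritable() {
        return unwritable == 0;
    }

    // Combines checkWriteSuspend (per channel) with the global check in submitWrite.
    void onQueued(int index, long delayMs, long channelQueueSize, long globalQueuesSize) {
        if (channelQueueSize > MAX_WRITE_SIZE || delayMs > MAX_WRITE_DELAY_MS
                || globalQueuesSize > MAX_GLOBAL_WRITE_SIZE) {
            setUserDefinedWritability(index, false);
        }
    }
}
```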
TrafficCounter
void createGlobalTrafficCounter(ScheduledExecutorService executor) {
    TrafficCounter tc = new TrafficCounter(this,
            ObjectUtil.checkNotNull(executor, "executor"),
            "GlobalTC",
            checkInterval);

    setTrafficCounter(tc);
    // start immediately after creation
    tc.start();
}
public synchronized void start() {
    if (monitorActive) {
        return;
    }
    lastTime.set(milliSecondFromNano());
    long localCheckInterval = checkInterval.get();
    // if executor is null, it means it is piloted by a GlobalChannelTrafficCounter, so no executor
    if (localCheckInterval > 0 && executor != null) {
        monitorActive = true;
        monitor = new TrafficMonitoringTask();
        // start() schedules monitor, i.e. a TrafficMonitoringTask
        // it runs every checkInterval (1 second by default), so throttling is computed per period, i.e. in windows
        scheduledFuture =
            executor.scheduleAtFixedRate(monitor, 0, localCheckInterval, TimeUnit.MILLISECONDS);
    }
}
private final class TrafficMonitoringTask implements Runnable {
    @Override
    public void run() {
        if (!monitorActive) {
            return;
        }
        // reset the accounting
        resetAccounting(milliSecondFromNano());
        if (trafficShapingHandler != null) {
            trafficShapingHandler.doAccounting(TrafficCounter.this);
        }
    }
}
synchronized void resetAccounting(long newLastTime) {
    long interval = newLastTime - lastTime.getAndSet(newLastTime);
    if (interval == 0) {
        // nothing to do
        return;
    }
    if (logger.isDebugEnabled() && interval > checkInterval() << 1) {
        logger.debug("Acct schedule not ok: " + interval + " > 2*" + checkInterval() + " from " + name);
    }
    lastReadBytes = currentReadBytes.getAndSet(0);
    lastWrittenBytes = currentWrittenBytes.getAndSet(0);
    lastReadThroughput = lastReadBytes * 1000 / interval;
    // nb byte / checkInterval in ms * 1000 (1s)
    lastWriteThroughput = lastWrittenBytes * 1000 / interval;
    // nb byte / checkInterval in ms * 1000 (1s)
    realWriteThroughput = realWrittenBytes.getAndSet(0) * 1000 / interval;
    lastWritingTime = Math.max(lastWritingTime, writingTime);
    lastReadingTime = Math.max(lastReadingTime, readingTime);
}
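The throughput math in resetAccounting() reduces to: bytes counted during the window × 1000 / window length in ms.

`WindowCounter` is a simplified, hypothetical version that takes the clock as a parameter so it can be tested. In the handler, TrafficMonitoringTask triggers the equivalent call every checkInterval.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified per-window read counter modeled on resetAccounting().
final class WindowCounter {
    private final AtomicLong currentReadBytes = new AtomicLong();
    private long lastTimeMs;
    private long lastReadThroughput; // bytes per second over the last closed window

    WindowCounter(long startMs) {
        this.lastTimeMs = startMs;
    }

    // Called from the IO path for every message read.
    void bytesRead(long n) {
        currentReadBytes.addAndGet(n);
    }

    // Called once per checkInterval: close the window and convert bytes to bytes/second.
    synchronized void resetAccounting(long nowMs) {
        long interval = nowMs - lastTimeMs;
        lastTimeMs = nowMs;
        if (interval == 0) {
            return; // nothing to do
        }
        long bytes = currentReadBytes.getAndSet(0);
        lastReadThroughput = bytes * 1000 / interval;
    }

    synchronized long lastReadThroughput() {
        return lastReadThroughput;
    }
}
```

For example, 500 bytes read during a 1000 ms window gives 500 bytes/s, and 3000 bytes during a 500 ms window gives 6000 bytes/s.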

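Summary:

  • Read/write throttling decisions are based on statistics collected per checkInterval (1s by default). Setting writeLimit/readLimit to 0 disables write/read shaping
  • The wait time is bounded between 10ms (MINIMAL_WAIT) and 15s (maxTime)
  • Read throttling: stop listening for read events. The local receive buffer fills up, then the peer's send buffer fills up, so the peer can no longer write and either drops data or slows down sending
  • Write throttling: pending data goes into a queue. When the wait exceeds 4s (maxWriteDelay) || a single channel buffers more than 4MB (maxWriteSize) || all buffered data exceeds 400MB (maxGlobalWriteSize), the channel is marked not writable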
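A simplified way to see the 10ms to 15s bounds:

1. Estimate how long the bytes already seen in this window should have taken at the configured limit.
2. Subtract the elapsed time.
3. Drop waits below MINIMAL_WAIT and cap the rest at maxTime.

This is an approximation, not Netty's exact readTimeToWait/writeTimeToWait formula, and `ShapingWait` is a hypothetical name.

```java
// Hypothetical approximation of the shaping wait; NOT Netty's exact formula.
final class ShapingWait {
    static final long MINIMAL_WAIT_MS = 10;   // shorter pauses are not worth it
    static final long MAX_TIME_MS = 15_000;   // default maxTime

    // Returns 0 when no pause is needed (limit disabled, nothing read, or wait too small).
    static long timeToWait(long bytesInWindow, long limitBytesPerSec, long elapsedMs) {
        if (limitBytesPerSec == 0 || bytesInWindow == 0) {
            return 0; // a limit of 0 means shaping is disabled
        }
        // time these bytes "should" have taken at the limit, minus time already spent
        long wait = bytesInWindow * 1000 / limitBytesPerSec - elapsedMs;
        if (wait < MINIMAL_WAIT_MS) {
            return 0;
        }
        return Math.min(wait, MAX_TIME_MS);
    }
}
```

For example, 2000 bytes against a 1000 bytes/s limit after 500 ms gives a 1500 ms pause, while a huge burst is capped at 15000 ms.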

Example: using traffic shaping

  • ChannelTrafficShapingHandler
  • GlobalTrafficShapingHandler: shareable
  • GlobalChannelTrafficShapingHandler: shareable
// executor: used to periodically compute the throttling statistics
// checkInterval: the period length
public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor,
        long writeGlobalLimit, long readGlobalLimit,
        long writeChannelLimit, long readChannelLimit,
        long checkInterval, long maxTime) {
  ...
  ...
}
GlobalTrafficShapingHandler globalTrafficShapingHandler =
        new GlobalTrafficShapingHandler(new NioEventLoopGroup(), 100 * 1024 * 1024, 100 * 1024 * 1024);

serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // order matters
        pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));

        pipeline.addLast("TShandler", globalTrafficShapingHandler);
      
        // ...
    }
});

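Enabling native transports for different platforms

How to enable a native transport

Code changes

  • NioServerSocketChannel -> [Prefix]ServerSocketChannel
  • NioEventLoopGroup -> [Prefix]EventLoopGroup

Prepare the native library: build it yourself, or use the ones bundled in the Netty jars

For example, replace every Nio prefix in the Server code with Epoll and re-import the packages

Starting the Server now throws an exception because the native library fails to load: Epoll only runs on Linux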
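In practice, code usually falls back to NIO when the native transport is unavailable. With Netty that is typically `Epoll.isAvailable() ? new EpollEventLoopGroup() : new NioEventLoopGroup()`, with the channel class chosen the same way.

The JDK-only sketch below mimics just two preconditions that the source analysis of Epoll and Native shows: the io.netty.transport.noNative switch and the Linux-only check. `TransportChooser` is hypothetical and, unlike Epoll.isAvailable(), does not verify that the library actually loads.

```java
import java.util.Locale;

// Hypothetical helper: picks a transport name from the two preconditions only.
final class TransportChooser {
    static String choose(String osName, boolean noNative) {
        if (noNative) {
            return "nio"; // -Dio.netty.transport.noNative=true disables native transports
        }
        boolean linux = osName.toLowerCase(Locale.ROOT).startsWith("linux");
        return linux ? "epoll" : "nio"; // epoll is Linux-only
    }

    static String chooseForThisJvm() {
        return choose(System.getProperty("os.name", ""),
                Boolean.getBoolean("io.netty.transport.noNative"));
    }

    public static void main(String[] args) {
        System.out.println("transport: " + chooseForThisJvm());
    }
}
```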

[Screenshot: error thrown after replacing Nio with Epoll]

Source code analysis: native library loading logic

io.netty.channel.epoll.Epoll

static {
    Throwable cause = null;
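    // io.netty.transport.noNative decides whether native transport is disabled
    // if it is disabled, an UnsupportedOperationException is recorded as the unavailability cause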
    if (SystemPropertyUtil.getBoolean("io.netty.transport.noNative", false)) {
        cause = new UnsupportedOperationException(
                "Native transport was explicit disabled with -Dio.netty.transport.noNative=true");
    } else {
        FileDescriptor epollFd = null;
        FileDescriptor eventFd = null;
        try {
            // triggers loading of the native library
            epollFd = Native.newEpollCreate();
            eventFd = Native.newEventFd();
        } catch (Throwable t) {
            cause = t;
        } finally {
            if (epollFd != null) {
                try {
                    epollFd.close();
                } catch (Exception ignore) {
                    // ignore
                }
            }
            if (eventFd != null) {
                try {
                    eventFd.close();
                } catch (Exception ignore) {
                    // ignore
                }
            }
        }
    }

    UNAVAILABILITY_CAUSE = cause;
}

io.netty.channel.epoll.Native

static {
    Selector selector = null;
    try {
        // We call Selector.open() as this will under the hood cause IOUtil to be loaded.
        // This is a workaround for a possible classloader deadlock that could happen otherwise:
        //
        // See https://github.com/netty/netty/issues/10187
        selector = Selector.open();
    } catch (IOException ignore) {
        // Just ignore
    }

    // Preload all classes that will be used in the OnLoad(...) function of JNI to eliminate the possiblity of a
    // class-loader deadlock. This is a workaround for https://github.com/netty/netty/issues/11209.

    // This needs to match all the classes that are loaded via NETTY_JNI_UTIL_LOAD_CLASS or looked up via
    // NETTY_JNI_UTIL_FIND_CLASS.
    ClassInitializerUtil.tryLoadClasses(Native.class,
            // netty_epoll_linuxsocket
            PeerCredentials.class, DefaultFileRegion.class, FileChannel.class, java.io.FileDescriptor.class,
            // netty_epoll_native
            NativeDatagramPacketArray.NativeDatagramPacket.class
    );

    try {
        // First, try calling a side-effect free JNI method to see if the library was already
        // loaded by the application.
        // call a lightweight method to test whether the library is already loaded; if it does not fail, there is no need to load it again
        offsetofEpollData();
    } catch (UnsatisfiedLinkError ignore) {
        // The library was not previously loaded, load it now.
        // load the native library
        loadNativeLibrary();
    } finally {
        try {
            if (selector != null) {
                selector.close();
            }
        } catch (IOException ignore) {
            // Just ignore
        }
    }
    Unix.registerInternal(new Runnable() {
        @Override
        public void run() {
            registerUnix();
        }
    });
}
private static void loadNativeLibrary() {
    String name = PlatformDependent.normalizedOs();
    // check the operating system name
    if (!"linux".equals(name)) {
        throw new IllegalStateException("Only supported on Linux");
    }
    // build the two library names and try loading each in turn
    String staticLibName = "netty_transport_native_epoll";
    String sharedLibName = staticLibName + '_' + PlatformDependent.normalizedArch();
    ClassLoader cl = PlatformDependent.getClassLoader(Native.class);
    try {
        NativeLibraryLoader.load(sharedLibName, cl);
    } catch (UnsatisfiedLinkError e1) {
        try {
            NativeLibraryLoader.load(staticLibName, cl);
            logger.debug("Failed to load {}", sharedLibName, e1);
        } catch (UnsatisfiedLinkError e2) {
            ThrowableUtil.addSuppressed(e1, e2);
            throw e1;
        }
    }
}
public static void load(String originalName, ClassLoader loader) {
    // Adjust expected name to support shading of native libraries.
    String packagePrefix = calculatePackagePrefix().replace('.', '_');
    String name = packagePrefix + originalName;
    List<Throwable> suppressed = new ArrayList<Throwable>();
    try {
        // first try to load from java.library.path
        // first try to load from java.library.path
        // using a relative name
        loadLibrary(loader, name, false);
        return;
    } catch (Throwable ex) {
        suppressed.add(ex);
    }
    // if loading from java.library.path fails, look in META-INF/native/
    String libname = System.mapLibraryName(name);
    String path = NATIVE_RESOURCE_HOME + libname;

    InputStream in = null;
    OutputStream out = null;
    File tmpFile = null;
    URL url = getResource(path, loader);
    try {
        // url == null: not found
        if (url == null) {
            // is this macOS?
            if (PlatformDependent.isOsx()) {
                String fileName = path.endsWith(".jnilib") ? NATIVE_RESOURCE_HOME + "lib" + name + ".dynlib" :
                        NATIVE_RESOURCE_HOME + "lib" + name + ".jnilib";
                url = getResource(fileName, loader);
                if (url == null) {
                    FileNotFoundException fnf = new FileNotFoundException(fileName);
                    ThrowableUtil.addSuppressedAndClear(fnf, suppressed);
                    throw fnf;
                }
            } else {
                // on Linux and other systems, no further lookup; fail directly
                FileNotFoundException fnf = new FileNotFoundException(path);
                ThrowableUtil.addSuppressedAndClear(fnf, suppressed);
                throw fnf;
            }
        }
        // found
        int index = libname.lastIndexOf('.');
        String prefix = libname.substring(0, index);
        String suffix = libname.substring(index);
        // create a temporary file
        tmpFile = PlatformDependent.createTempFile(prefix, suffix, WORKDIR);
        in = url.openStream();
        out = new FileOutputStream(tmpFile);
        // copy the library under META-INF/native/ into the temporary file
        byte[] buffer = new byte[8192];
        int length;
        while ((length = in.read(buffer)) > 0) {
            out.write(buffer, 0, length);
        }
        out.flush();
				
        if (shouldShadedLibraryIdBePatched(packagePrefix)) {
            // Let's try to patch the id and re-sign it. This is a best-effort and might fail if a
            // SecurityManager is setup or the right executables are not installed :/
            tryPatchShadedLibraryIdAndSign(tmpFile, originalName);
        }

        // Close the output stream before loading the unpacked library,
        // because otherwise Windows will refuse to load it when it's in use by other process.
        closeQuietly(out);
        out = null;
        // load using the absolute path
        loadLibrary(loader, tmpFile.getPath(), true);
    } catch (UnsatisfiedLinkError e) {
        try {
            // the file exists, is a regular file and is readable, but cannot be executed
            if (tmpFile != null && tmpFile.isFile() && tmpFile.canRead() &&
                !NoexecVolumeDetector.canExecuteExecutable(tmpFile)) {
                // Pass "io.netty.native.workdir" as an argument to allow shading tools to see
                // the string. Since this is printed out to users to tell them what to do next,
                // we want the value to be correct even when shading.
                logger.info("{} exists but cannot be executed even when execute permissions set; " +
                            "check volume for \"noexec\" flag; use -D{}=[path] " +
                            "to set native working directory separately.",
                            tmpFile.getPath(), "io.netty.native.workdir");
            }
        } catch (Throwable t) {
            suppressed.add(t);
            logger.debug("Error checking if {} is on a file store mounted with noexec", tmpFile, t);
        }
        // Re-throw to fail the load
        ThrowableUtil.addSuppressedAndClear(e, suppressed);
        throw e;
    } catch (Exception e) {
        UnsatisfiedLinkError ule = new UnsatisfiedLinkError("could not load a native library: " + name);
        ule.initCause(e);
        ThrowableUtil.addSuppressedAndClear(ule, suppressed);
        throw ule;
    } finally {
        closeQuietly(in);
        closeQuietly(out);
        // After we load the library it is safe to delete the file.
        // We delete the file immediately to free up resources as soon as possible,
        // and if this fails fallback to deleting on JVM exit.
        // finally remove the temporary file: if DELETE_NATIVE_LIB_AFTER_LOADING is true, try tmpFile.delete();
        // if deletion is disabled or fails, fall back to deleteOnExit. Configure with -Dio.netty.native.deleteLibAfterLoading
        if (tmpFile != null && (!DELETE_NATIVE_LIB_AFTER_LOADING || !tmpFile.delete())) {
            tmpFile.deleteOnExit();
        }
    }
}
private static void loadLibrary(final ClassLoader loader, final String name, final boolean absolute) {
    Throwable suppressed = null;
    try {
        try {
            // Make sure the helper belongs to the target ClassLoader.
            final Class<?> newHelper = tryToLoadClass(loader, NativeLibraryUtil.class);
            // obtain the loadLibrary method via reflection and invoke it, i.e. call NativeLibraryUtil.loadLibrary
            loadLibraryByHelper(newHelper, name, absolute);
            logger.debug("Successfully loaded the library {}", name);
            return;
        } catch (UnsatisfiedLinkError e) { // Should by pass the UnsatisfiedLinkError here!
            suppressed = e;
        } catch (Exception e) {
            suppressed = e;
        }
        NativeLibraryUtil.loadLibrary(name, absolute);  // Fallback to local helper class.
        logger.debug("Successfully loaded the library {}", name);
    } catch (NoSuchMethodError nsme) {
        if (suppressed != null) {
            ThrowableUtil.addSuppressed(nsme, suppressed);
        }
        rethrowWithMoreDetailsIfPossible(name, nsme);
    } catch (UnsatisfiedLinkError ule) {
        if (suppressed != null) {
            ThrowableUtil.addSuppressed(ule, suppressed);
        }
        throw ule;
    }
}
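The "copy from the jar to a temporary file" step can be sketched with plain java.io. `TempExtract` is hypothetical. It does three things:

- Splits the library file name into prefix and suffix the same way load() does.
- Copies the stream into a temporary file.
- Closes the output before returning.

Actual loading (System.load with the absolute path) and deletion are left to the caller.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

// Hypothetical sketch of extracting a bundled native library to a temp file.
final class TempExtract {
    // Splits "libfoo.so" into {"libfoo", ".so"}, like the prefix/suffix logic in load().
    static String[] split(String libname) {
        int index = libname.lastIndexOf('.');
        return new String[] {libname.substring(0, index), libname.substring(index)};
    }

    // Copies `in` into a new temp file in `workdir` (null = default temp dir).
    static File extract(InputStream in, String libname, File workdir) throws IOException {
        String[] parts = split(libname);
        File tmp = File.createTempFile(parts[0], parts[1], workdir);
        // closed before loading: Windows refuses to load a file still open for writing
        try (OutputStream out = new FileOutputStream(tmp)) {
            byte[] buffer = new byte[8192];
            int length;
            while ((length = in.read(buffer)) != -1) {
                out.write(buffer, 0, length);
            }
        }
        return tmp; // a real loader would now call System.load(tmp.getAbsolutePath())
    }
}
```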

Summary:

Native library loading order:

  • java.library.path: /usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
  • META-INF/native/
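The names tried in these two places can be sketched as follows, assuming no custom library path:

- loadNativeLibrary() first tries the architecture-specific name, then the plain one.
- NativeLibraryLoader.load() prepends the shading prefix. For the jar lookup, it maps the name with System.mapLibraryName under META-INF/native/.

`NativeNames` is hypothetical; its `arch` argument stands in for PlatformDependent.normalizedArch().

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the library names tried when loading the epoll transport.
final class NativeNames {
    static final String NATIVE_RESOURCE_HOME = "META-INF/native/";

    // Order tried: architecture-specific name first, then the plain ("static") name.
    // `packagePrefix` is the shading prefix with '.' replaced by '_' ("" when not shaded).
    static List<String> candidates(String packagePrefix, String arch) {
        String staticLibName = "netty_transport_native_epoll";
        String sharedLibName = staticLibName + '_' + arch;
        return Arrays.asList(packagePrefix + sharedLibName, packagePrefix + staticLibName);
    }

    // Jar location searched after java.library.path fails,
    // e.g. META-INF/native/lib<name>.so on Linux.
    static String resourcePath(String name) {
        return NATIVE_RESOURCE_HOME + System.mapLibraryName(name);
    }

    public static void main(String[] args) {
        for (String name : candidates("", "x86_64")) {
            System.out.println(resourcePath(name));
        }
    }
}
```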

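Native-related parameters

  • io.netty.transport.noNative
  • io.netty.native.workdir
  • io.netty.native.deleteLibAfterLoading

Common problems

1 The prepared native library does not match the actual platform, e.g. a 32-bit library is provided but the platform is 64-bit

2 The temporary file has no execute permission (e.g. /tmp is mounted noexec). Remount it with exec, or point io.netty.native.workdir at another directory:

  mount -o remount,exec /tmp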