Optimizing Usage
Adjusting the Thread Model
- Business workloads fall into two common categories
  - CPU-intensive: computation-bound
  - IO-intensive: wait-bound
CPU-intensive
- Keep the current thread model (reuse the threads bound to the NioEventLoop)
Calculating the thread count:
- Runtime.getRuntime().availableProcessors() * 2: the current machine's CPU count * 2
- io.netty.availableProcessors * 2: a self-specified CPU count * 2, for environments such as Docker where the JVM may misreport the CPU count
- io.netty.eventLoopThreads: set the thread count directly in certain tuning scenarios (see the sketch below)
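For reference, a sketch of how these defaults combine, paraphrased from io.netty.channel.MultithreadEventLoopGroup (treat it as a sketch, not verbatim source):
// Paraphrased default: -Dio.netty.eventLoopThreads wins if set; otherwise the
// CPU count (overridable with -Dio.netty.availableProcessors, handy in Docker) * 2.
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1,
        SystemPropertyUtil.getInt("io.netty.eventLoopThreads",
                NettyRuntime.availableProcessors() * 2));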
IO-intensive
- Adjust the thread model: move business processing into a dedicated thread pool instead of sharing the NioEventLoop.
IO-intensive business logic often spends a long time waiting. If it is not moved out, it competes with the handling of IO events, and when IO events are not processed promptly, efficiency suffers badly.
There are two ways to do this:
- Use JDK Executors inside the handler (a sketch follows this list)
- Specify an EventExecutorGroup when adding the handler:
EventExecutorGroup eventExecutorGroup = new UnorderedThreadPoolEventExecutor(10);
pipeline.addLast(eventExecutorGroup, serverHandler);
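For the first option, here is a minimal sketch of offloading inside the handler. It reuses the message types from the order example below; the handler name, the pool size of 10, and the type name MessageHeader (inferred from the getters) are illustrative assumptions:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.DefaultThreadFactory;

public class OffloadingHandler extends SimpleChannelInboundHandler<RequestMessage> {
    // shared JDK pool for slow business work; 10 threads is an assumed size
    private static final ExecutorService BUSINESS =
            Executors.newFixedThreadPool(10, new DefaultThreadFactory("business"));

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RequestMessage msg) {
        // Extract everything the task needs while still on the IO thread:
        // SimpleChannelInboundHandler releases msg after this method returns.
        Operation operation = msg.getMessageBody();
        MessageHeader header = msg.getMessageHeader();
        BUSINESS.submit(() -> {
            OperationResult result = operation.execute(); // slow, blocking call
            ResponseMessage response = new ResponseMessage();
            response.setMessageHeader(header);
            response.setMessageBody(result);
            ctx.writeAndFlush(response); // safe to call from a non-IO thread
        });
    }
}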
Code example
Modify OrderOperation's execute method to simulate a long-running operation:
@Override
public OperationResult execute() {
    log.info("order's executing startup with orderRequest: {}", this.toString());
    // sleep for 3 seconds
    Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
    log.info("order's executing complete");
    OrderOperationResult result = new OrderOperationResult(tableId, dish, true);
    return result;
}
Modify the Server code: create an UnorderedThreadPoolEventExecutor named business outside the childHandler method, and attach it in the pipeline:
MetricsHandler metricsHandler = new MetricsHandler();
// Created outside childHandler, so that the OrderServerProcessHandler of every
// channel shares this one pool
UnorderedThreadPoolEventExecutor business = new UnorderedThreadPoolEventExecutor(10,
        new DefaultThreadFactory("business"));
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // note the order
        pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));
        pipeline.addLast("frameDecode", new OrderFrameDecoder());
        pipeline.addLast(new OrderProtocolDecoder());
        pipeline.addLast(new OrderFrameEncoder());
        pipeline.addLast(new OrderProtocolEncoder());
        pipeline.addLast("metricsHandler", metricsHandler);
        pipeline.addLast(new LoggingHandler(LogLevel.INFO));
        // bind the business pool to this handler
        pipeline.addLast(business, new OrderServerProcessHandler());
    }
});
With this change, every OrderServerProcessHandler invocation takes 3 seconds, and all of them run on the thread pool named business.
Next, modify the Client class to simulate multiple requests:
for (int i = 0; i < 20; i++) {
    channelFuture.channel().writeAndFlush(requestMessage);
}
Start the Server, then the Client, and inspect the Server log.
Why not use NioEventLoopGroup?
Replace the UnorderedThreadPoolEventExecutor in the Server code with a NioEventLoopGroup:
NioEventLoopGroup eventExecutors = new NioEventLoopGroup(0, new DefaultThreadFactory("business"));
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // note the order
        pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));
        pipeline.addLast("frameDecode", new OrderFrameDecoder());
        pipeline.addLast(new OrderProtocolDecoder());
        pipeline.addLast(new OrderFrameEncoder());
        pipeline.addLast(new OrderProtocolEncoder());
        pipeline.addLast("metricsHandler", metricsHandler);
        pipeline.addLast(new LoggingHandler(LogLevel.INFO));
        pipeline.addLast(eventExecutors, new OrderServerProcessHandler());
    }
});
Start the Server and the Client again and check the Server log.
This time only a single thread is used, and processing is slow.
To see why, look at the newContext() method invoked by pipeline.addLast():
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

private EventExecutor childExecutor(EventExecutorGroup group) {
    if (group == null) {
        return null;
    }
    // SINGLE_EVENTEXECUTOR_PER_GROUP: when a handler is added with an EventExecutorGroup,
    // this option decides whether the handler is pinned to one fixed EventExecutor of the
    // group (whatever next() returns). Defaults to true.
    Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
    if (pinEventExecutor != null && !pinEventExecutor) {
        return group.next();
    }
    Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
    if (childExecutors == null) {
        // Use size of 4 as most people only use one extra EventExecutor.
        childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
    }
    // Pin one of the child executors once and remember it so that the same child executor
    // is used to fire events for the same channel.
    // The map key is the group; the value is whatever next() returned
    EventExecutor childExecutor = childExecutors.get(group);
    if (childExecutor == null) {
        // With SINGLE_EVENTEXECUTOR_PER_GROUP true, the single value returned by next()
        // is cached and reused.
        // For NioEventLoopGroup, next() returns one of its child event loops (one thread).
        // For UnorderedThreadPoolEventExecutor, next() returns `this` -- the whole pool,
        // not one particular thread of the pool.
        childExecutor = group.next();
        childExecutors.put(group, childExecutor);
    }
    return childExecutor;
}
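The behavior difference therefore comes down to what next() hands back. A hedged sketch of the contrast (the variable names are illustrative, paraphrasing the two implementations rather than quoting source):
// NioEventLoopGroup.next() returns ONE child EventLoop (a single thread), and
// childExecutor(...) above caches that choice per channel, so all messages on one
// connection are processed serially by that thread -- hence the single-thread log.
EventExecutor perChannelLoop = nioEventLoopGroup.next();

// UnorderedThreadPoolEventExecutor is itself the pool (it extends
// ScheduledThreadPoolExecutor) and its next() returns `this`, so pinned tasks
// still fan out across all of its worker threads, even for a single connection.
EventExecutor wholePool = unorderedThreadPoolEventExecutor.next();

// Disabling pinning only changes which single executor is picked at addLast time;
// it does not spread one channel's work across a NioEventLoopGroup:
serverBootstrap.childOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP, false);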
Enhancing Writes: Choosing Between Latency and Throughput
The problem with writes
public class OrderServerProcessHandler extends SimpleChannelInboundHandler<RequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RequestMessage requestMessage) throws Exception {
        Operation operation = requestMessage.getMessageBody();
        OperationResult operationResult = operation.execute();

        ResponseMessage responseMessage = new ResponseMessage();
        responseMessage.setMessageHeader(requestMessage.getMessageHeader());
        responseMessage.setMessageBody(operationResult);
        ctx.writeAndFlush(responseMessage);
    }
}
Using ctx.writeAndFlush() turns every single write into an "express delivery".
Not every program needs that: flushing on each write pushes every message straight to the socket, so throughput will not necessarily be high.
Improvement 1: channelReadComplete
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
In the official echo example, reading is split into two steps: ctx.write() is called in channelRead, and ctx.flush() in channelReadComplete.
One read event can trigger several reads on the channel, at most 16 by default. If the data arrives in, say, 3 reads, the earlier style would flush 3 times, whereas the split style flushes only once.
Drawbacks
- Not suitable when business work runs on an asynchronous thread pool (i.e. does not reuse the NIO event loop), such as the IO-intensive setup above with its dedicated pool: the write of the result produced in channelRead may well happen only after channelReadComplete has already fired.
- No fine-grained control: for example, with 16 consecutive reads the flush comes on the 16th. If you still read 16 times but want a flush after every 3 reads, this approach cannot express that.
Improvement 2: FlushConsolidationHandler
Source analysis
io.netty.handler.flush.FlushConsolidationHandler#flush
// Timing in the two cases:
//   synchronous:  read -> writeAndFlush -> readComplete
//   asynchronous: read -> readComplete -> writeAndFlush
// readInProgress over time (1 = a read is in progress):
//   000011111111111111111111110000000000000
//   000001111000000000000000000000000000000
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
    // Two cases, depending on whether business work reuses the IO thread.
    // Reuse (synchronous) case:
    if (readInProgress) { // a read is in progress
        // If there is still a read in progress we are sure we will see a channelReadComplete(...) call. Thus
        // we only need to flush if we reach the explicitFlushAfterFlushes limit.
        // Flush once per "batch" of explicitFlushAfterFlushes writes.
        // What about the remainder? channelReadComplete flushes whatever is left.
        if (++flushPendingCount == explicitFlushAfterFlushes) {
            flushNow(ctx);
        }
    // The non-reuse (asynchronous) cases follow:
    } else if (consolidateWhenNoReadInProgress) {
        // With consolidateWhenNoReadInProgress enabled (business offloaded to another pool),
        // flushes are consolidated even though no read is in progress
        // (e.g. no more read requests, but the business pool is still busy producing responses).
        // Flush immediately if we reach the threshold, otherwise schedule
        if (++flushPendingCount == explicitFlushAfterFlushes) {
            flushNow(ctx);
        } else {
            scheduleFlush(ctx);
        }
    } else {
        // (Asynchronous case) consolidateWhenNoReadInProgress disabled: flush right away.
        // Always flush directly
        flushNow(ctx);
    }
}
In the reuse (synchronous) case the readInProgress branch is taken: a read is in progress and may deliver, say, 3 or 10 chunks. If we want a flush after every 5 writes, we set explicitFlushAfterFlushes to 5; with 6 writes during the read, the first 5 are flushed by the counter, and when the read finishes, io.netty.handler.flush.FlushConsolidationHandler#channelReadComplete flushes the last one.
When readInProgress is false, i.e. no read is in progress, we are typically in the asynchronous case. By default that case gets no consolidation: the data is flushed immediately.
To reduce flushes in the asynchronous case as well, Netty offers the consolidateWhenNoReadInProgress parameter. When it is true, asynchronous flushes are consolidated too; as long as the count has not reached explicitFlushAfterFlushes, scheduleFlush is called instead of flushing.
// flush as soon as possible, but leave a window for consolidation
private void scheduleFlush(final ChannelHandlerContext ctx) {
    if (nextScheduledFlush == null) {
        // Run as soon as possible, but still yield to give a chance for additional writes to enqueue.
        nextScheduledFlush = ctx.channel().eventLoop().submit(flushTask);
    }
}
Scheduling turns the flush into a task that may run a little later; during that delay more writes can accumulate, which is exactly the chance to save flushes.
Usage
Simply add a FlushConsolidationHandler to the Server's pipeline:
pipeline.addLast("flushEnhance", new FlushConsolidationHandler(5, true));
// explicitFlushAfterFlushes: flush once per this many flush calls
// consolidateWhenNoReadInProgress: whether to also consolidate in the asynchronous case
public FlushConsolidationHandler(int explicitFlushAfterFlushes, boolean consolidateWhenNoReadInProgress) {
    ...
}

public class FlushConsolidationHandler extends ChannelDuplexHandler {
    ...
}
FlushConsolidationHandler carries no @Sharable annotation, so one instance must not be shared across channels.
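Each connection therefore needs its own instance, constructed inside initChannel. A minimal sketch based on the server skeleton above:
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) {
        // one FlushConsolidationHandler per channel, since it keeps per-connection
        // state (flushPendingCount, readInProgress) and is not @Sharable
        ch.pipeline().addLast("flushEnhance", new FlushConsolidationHandler(5, true));
        // ... remaining handlers as before
    }
});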
Traffic Shaping
What traffic shaping is for
- Rate-limiting downloads on a cloud drive (deliberate)
- Crowd control at a scenic spot (out of necessity)
Netty's three built-in traffic shapers
- Global level: GlobalTrafficShapingHandler
- Channel level: ChannelTrafficShapingHandler
The Global and Channel handlers each take two parameters, throttling reads and writes respectively.
- Global + Channel: GlobalChannelTrafficShapingHandler
The Global + Channel handler takes four limit parameters in total. A sketch of the per-channel variant follows.
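As an illustration of the per-channel variant, using the simplest two-argument constructor (the limits are assumed values in bytes/second):
// ChannelTrafficShapingHandler throttles a single connection, so unlike the
// sharable Global handlers it must be instantiated per channel in initChannel.
pipeline.addLast("channelTS", new ChannelTrafficShapingHandler(
        1024 * 1024,   // writeLimit: ~1 MB/s per channel (assumed value)
        1024 * 1024)); // readLimit:  ~1 MB/s per channel (assumed value)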
Source analysis and summary of Netty's traffic shaping
GlobalTrafficShapingHandler
Read throttling
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
    // If the traffic-shaping handler sits in the wrong place and msg is not a
    // ByteBuf-like object, shaping is skipped entirely
    long size = calculateSize(msg);
    long now = TrafficCounter.milliSecondFromNano();
    // For non-ByteBuf messages calculateSize returns -1, so the shaping logic below
    // is never entered -- the handler's position in the pipeline matters
    if (size > 0) {
        // compute the number of ms to wait before reopening the channel
        // readLimit: the throttle threshold; maxTime: maximum wait; now: current time
        long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
        // cap the wait: if the computed wait exceeds the configured maximum, use the maximum
        wait = checkWaitReadTime(ctx, wait, now);
        // also apply a lower bound: waits shorter than MINIMAL_WAIT are not worth it
        if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
            // time in order to try to limit the traffic
            // Only AutoRead AND HandlerActive True means Context Active
            Channel channel = ctx.channel();
            ChannelConfig config = channel.config();
            if (logger.isDebugEnabled()) {
                logger.debug("Read suspend: " + wait + ':' + config.isAutoRead() + ':'
                        + isHandlerActive(ctx));
            }
            if (config.isAutoRead() && isHandlerActive(ctx)) {
                // Set autoRead to false and deregister the read event from the Selector,
                // effectively suspending reads
                config.setAutoRead(false);
                channel.attr(READ_SUSPENDED).set(true);
                // Create a Runnable to reactive the read if needed. If one was create before it will just be
                // reused to limit object creation
                Attribute<Runnable> attr = channel.attr(REOPEN_TASK);
                Runnable reopenTask = attr.get();
                if (reopenTask == null) {
                    // ReopenReadTimerTask re-enables reading: essentially it sets
                    // autoRead back to true and triggers a read
                    reopenTask = new ReopenReadTimerTask(ctx);
                    attr.set(reopenTask);
                }
                // after `wait` milliseconds, reading is switched back on
                ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
                if (logger.isDebugEnabled()) {
                    logger.debug("Suspend final status => " + config.isAutoRead() + ':'
                            + isHandlerActive(ctx) + " will reopened at: " + wait);
                }
            }
        }
    }
    informReadOperation(ctx, now);
    ctx.fireChannelRead(msg);
}
protected long calculateSize(Object msg) {
    if (msg instanceof ByteBuf) {
        return ((ByteBuf) msg).readableBytes();
    }
    if (msg instanceof ByteBufHolder) {
        return ((ByteBufHolder) msg).content().readableBytes();
    }
    if (msg instanceof FileRegion) {
        return ((FileRegion) msg).count();
    }
    return -1;
}

@Override
long checkWaitReadTime(final ChannelHandlerContext ctx, long wait, final long now) {
    Integer key = ctx.channel().hashCode();
    PerChannel perChannel = channelQueues.get(key);
    if (perChannel != null) {
        if (wait > maxTime && now + wait - perChannel.lastReadTimestamp > maxTime) {
            wait = maxTime;
        }
    }
    return wait;
}
// io.netty.handler.traffic.AbstractTrafficShapingHandler.ReopenReadTimerTask#run
@Override
public void run() {
    Channel channel = ctx.channel();
    ChannelConfig config = channel.config();
    if (!config.isAutoRead() && isHandlerActive(ctx)) {
        // If AutoRead is False and Active is True, user make a direct setAutoRead(false)
        // Then Just reset the status
        if (logger.isDebugEnabled()) {
            logger.debug("Not unsuspend: " + config.isAutoRead() + ':' +
                    isHandlerActive(ctx));
        }
        channel.attr(READ_SUSPENDED).set(false);
    } else {
        // Anything else allows the handler to reset the AutoRead
        if (logger.isDebugEnabled()) {
            if (config.isAutoRead() && !isHandlerActive(ctx)) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Unsuspend: " + config.isAutoRead() + ':' +
                            isHandlerActive(ctx));
                }
            } else {
                if (logger.isDebugEnabled()) {
                    logger.debug("Normal unsuspend: " + config.isAutoRead() + ':'
                            + isHandlerActive(ctx));
                }
            }
        }
        channel.attr(READ_SUSPENDED).set(false);
        // set autoRead back to true and trigger a read
        config.setAutoRead(true);
        channel.read();
    }
    if (logger.isDebugEnabled()) {
        logger.debug("Unsuspend final status => " + config.isAutoRead() + ':'
                + isHandlerActive(ctx));
    }
}
Write throttling
@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
        throws Exception {
    // compute the size of the message
    long size = calculateSize(msg);
    long now = TrafficCounter.milliSecondFromNano();
    // size > 0: the write may have to be suspended
    if (size > 0) {
        // compute the number of ms to wait before continue with the channel
        long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
        if (wait >= MINIMAL_WAIT) {
            if (logger.isDebugEnabled()) {
                logger.debug("Write suspend: " + wait + ':' + ctx.channel().config().isAutoRead() + ':'
                        + isHandlerActive(ctx));
            }
            submitWrite(ctx, msg, size, wait, now, promise);
            return;
        }
    }
    // to maintain order of write
    // note the delay here is 0, i.e. no waiting
    submitWrite(ctx, msg, size, 0, now, promise);
}
@Override
void submitWrite(final ChannelHandlerContext ctx, final Object msg,
        final long size, final long writedelay, final long now,
        final ChannelPromise promise) {
    // Step 1: look up the per-channel queue of delayed writes by the channel's key,
    // creating it if absent
    Channel channel = ctx.channel();
    Integer key = channel.hashCode();
    PerChannel perChannel = channelQueues.get(key);
    if (perChannel == null) {
        // in case write occurs before handlerAdded is raised for this handler
        // imply a synchronized only if needed
        // PerChannel holds an ArrayDeque that buffers writes while writing is suspended
        perChannel = getOrSetPerChannel(ctx);
    }
    final ToSend newToSend;
    long delay = writedelay;
    boolean globalSizeExceeded = false;
    // write operations need synchronization
    synchronized (perChannel) {
        // Step 2: decide whether to delay; if no delay is needed and the queue is empty,
        // send directly. If the queue is non-empty, the data must be enqueued even
        // without a delay, to preserve write order.
        if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) {
            trafficCounter.bytesRealWriteFlowControl(size);
            ctx.write(msg, promise);
            perChannel.lastWriteTimestamp = now;
            return;
        }
        // Step 3: if the predicted delay is too long, cap it at maxTime (15 s by default)
        if (delay > maxTime && now + delay - perChannel.lastWriteTimestamp > maxTime) {
            delay = maxTime;
        }
        // Step 4: enqueue the data
        newToSend = new ToSend(delay + now, msg, size, promise);
        // Data is enqueued unconditionally, so the queue can grow without bound (OOM risk);
        // that is why the writability flag is adjusted below based on the queue state
        perChannel.messagesQueue.addLast(newToSend);
        perChannel.queueSize += size;
        // queueSize vs queuesSize differ by one 's': the former is this channel's
        // queue, the latter the global total across all channels
        queuesSize.addAndGet(size);
        // Step 5: if too much data is queued, mark the channel as not writable.
        // Checks whether this channel's queue size or required delay is over the limit;
        // if so, writable is flipped to false, telling upstream handlers to stop writing
        checkWriteSuspend(ctx, delay, perChannel.queueSize);
        // check whether the global total (all queues combined) is over the limit
        if (queuesSize.get() > maxGlobalWriteSize) {
            globalSizeExceeded = true;
        }
    }
    // If the global limit was exceeded, flip the user-defined writability index,
    // i.e. mark the channel as not writable
    if (globalSizeExceeded) {
        setUserDefinedWritability(ctx, false);
    }
    // Step 6: schedule a task that sends the data once the delay has elapsed
    final long futureNow = newToSend.relativeTimeAction;
    final PerChannel forSchedule = perChannel;
    ctx.executor().schedule(new Runnable() {
        @Override
        public void run() {
            sendAllValid(ctx, forSchedule, futureNow);
        }
    }, delay, TimeUnit.MILLISECONDS);
}
void checkWriteSuspend(ChannelHandlerContext ctx, long delay, long queueSize) {
    // If the buffered size exceeds maxWriteSize or the required wait exceeds maxWriteDelay,
    // memory pressure (OOM risk) or latency is getting out of hand, so discourage writing.
    // Like a scenic spot: when the queue is too long or the grounds are packed, an
    // announcement goes out advising people not to come in.
    if (queueSize > maxWriteSize || delay > maxWriteDelay) {
        setUserDefinedWritability(ctx, false);
    }
}

void setUserDefinedWritability(ChannelHandlerContext ctx, boolean writable) {
    ChannelOutboundBuffer cob = ctx.channel().unsafe().outboundBuffer();
    if (cob != null) {
        cob.setUserDefinedWritability(userDefinedWritabilityIndex, writable);
    }
}
TrafficCounter
void createGlobalTrafficCounter(ScheduledExecutorService executor) {
    TrafficCounter tc = new TrafficCounter(this,
            ObjectUtil.checkNotNull(executor, "executor"),
            "GlobalTC",
            checkInterval);
    setTrafficCounter(tc);
    // started immediately after creation
    tc.start();
}
public synchronized void start() {
    if (monitorActive) {
        return;
    }
    lastTime.set(milliSecondFromNano());
    long localCheckInterval = checkInterval.get();
    // if executor is null, it means it is piloted by a GlobalChannelTrafficCounter, so no executor
    if (localCheckInterval > 0 && executor != null) {
        monitorActive = true;
        monitor = new TrafficMonitoringTask();
        // start() schedules the monitor (the TrafficMonitoringTask) at a fixed period
        // (1 s by default): throttling is computed per period, i.e. in a windowed fashion
        scheduledFuture =
                executor.scheduleAtFixedRate(monitor, 0, localCheckInterval, TimeUnit.MILLISECONDS);
    }
}
private final class TrafficMonitoringTask implements Runnable {
    @Override
    public void run() {
        if (!monitorActive) {
            return;
        }
        // reset the accounting for the new window
        resetAccounting(milliSecondFromNano());
        if (trafficShapingHandler != null) {
            trafficShapingHandler.doAccounting(TrafficCounter.this);
        }
    }
}
synchronized void resetAccounting(long newLastTime) {
    long interval = newLastTime - lastTime.getAndSet(newLastTime);
    if (interval == 0) {
        // nothing to do
        return;
    }
    if (logger.isDebugEnabled() && interval > checkInterval() << 1) {
        logger.debug("Acct schedule not ok: " + interval + " > 2*" + checkInterval() + " from " + name);
    }
    lastReadBytes = currentReadBytes.getAndSet(0);
    lastWrittenBytes = currentWrittenBytes.getAndSet(0);
    lastReadThroughput = lastReadBytes * 1000 / interval;
    // nb byte / checkInterval in ms * 1000 (1s)
    lastWriteThroughput = lastWrittenBytes * 1000 / interval;
    // nb byte / checkInterval in ms * 1000 (1s)
    realWriteThroughput = realWrittenBytes.getAndSet(0) * 1000 / interval;
    lastWritingTime = Math.max(lastWritingTime, writingTime);
    lastReadingTime = Math.max(lastReadingTime, readingTime);
}
Summary:
- Read/write throttling is evaluated per checkInterval window (1 s by default). Setting writeLimit/readLimit to 0 disables write/read shaping respectively.
- The wait time is bounded between 10 ms (MINIMAL_WAIT) and 15 s (maxTime).
- Read throttling: stop listening for read events. The local receive buffer then fills up, the peer's send buffer fills up in turn, and the peer can no longer write, so it must drop data or slow its sending.
- Write throttling: pending data is buffered in a queue. When the wait exceeds 4 s (maxWriteDelay), or a single channel buffers more than 4 MB (maxWriteSize), or all buffered data combined exceeds 400 MB (maxGlobalWriteSize), the channel's writability is flipped to not writable (a sketch of reacting to this flag follows).
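Writers are expected to cooperate with that flag. A minimal sketch of reacting to it (the handler name and the backoff strategy in the comments are assumptions, not part of Netty):
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class BackpressureAwareHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        if (ctx.channel().isWritable()) {
            // the shaping queue drained below the thresholds: safe to resume writing
        } else {
            // over maxWriteSize / maxWriteDelay / maxGlobalWriteSize: stop producing,
            // e.g. suspend reads upstream with ctx.channel().config().setAutoRead(false)
        }
        ctx.fireChannelWritabilityChanged();
    }
}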
Example: using traffic shaping
- ChannelTrafficShapingHandler
- GlobalTrafficShapingHandler: sharable
- GlobalChannelTrafficShapingHandler: sharable
// executor: used to compute the shaping statistics periodically
// checkInterval: the period of that computation, in ms
public GlobalChannelTrafficShapingHandler(ScheduledExecutorService executor,
        long writeGlobalLimit, long readGlobalLimit,
        long writeChannelLimit, long readChannelLimit,
        long checkInterval, long maxTime) {
    ...
}
GlobalTrafficShapingHandler globalTrafficShapingHandler =
        new GlobalTrafficShapingHandler(new NioEventLoopGroup(), 100 * 1024 * 1024, 100 * 1024 * 1024);
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // note the order
        pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));
        pipeline.addLast("TShandler", globalTrafficShapingHandler);
        ...
    }
});
Enabling Native Transports per Platform
How to enable native
Modify the code:
- NioServerSocketChannel -> [Prefix]ServerSocketChannel
- NioEventLoopGroup -> [Prefix]EventLoopGroup
Prepare the native library: build it yourself, or rely on the ones bundled in the Netty jars.
For example, replace every Nio prefix in the Server code with Epoll and re-import the packages.
Starting the Server now throws an exception: loading the native library fails, because the epoll transport only runs on Linux. A defensive variant is sketched below.
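A defensive pattern is to probe availability and fall back to NIO (a sketch; it assumes the netty-transport-native-epoll dependency is on the classpath):
// Epoll.isAvailable() is true only on Linux when the native library loads successfully
EventLoopGroup workerGroup;
Class<? extends ServerChannel> serverChannelClass;
if (Epoll.isAvailable()) {
    workerGroup = new EpollEventLoopGroup();
    serverChannelClass = EpollServerSocketChannel.class;
} else {
    // fall back to the portable NIO transport elsewhere
    workerGroup = new NioEventLoopGroup();
    serverChannelClass = NioServerSocketChannel.class;
}
serverBootstrap.group(workerGroup).channel(serverChannelClass);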
Source analysis: how the native library is loaded
io.netty.channel.epoll.Epoll
static {
    Throwable cause = null;
    // io.netty.transport.noNative decides whether native transports are enabled at all;
    // if they are disabled, an exception is recorded right here
    if (SystemPropertyUtil.getBoolean("io.netty.transport.noNative", false)) {
        cause = new UnsupportedOperationException(
                "Native transport was explicit disabled with -Dio.netty.transport.noNative=true");
    } else {
        FileDescriptor epollFd = null;
        FileDescriptor eventFd = null;
        try {
            // triggers loading of the native library
            epollFd = Native.newEpollCreate();
            eventFd = Native.newEventFd();
        } catch (Throwable t) {
            cause = t;
        } finally {
            if (epollFd != null) {
                try {
                    epollFd.close();
                } catch (Exception ignore) {
                    // ignore
                }
            }
            if (eventFd != null) {
                try {
                    eventFd.close();
                } catch (Exception ignore) {
                    // ignore
                }
            }
        }
    }
    UNAVAILABILITY_CAUSE = cause;
}
io.netty.channel.epoll.Native
static {
    Selector selector = null;
    try {
        // We call Selector.open() as this will under the hood cause IOUtil to be loaded.
        // This is a workaround for a possible classloader deadlock that could happen otherwise:
        //
        // See https://github.com/netty/netty/issues/10187
        selector = Selector.open();
    } catch (IOException ignore) {
        // Just ignore
    }

    // Preload all classes that will be used in the OnLoad(...) function of JNI to eliminate the possiblity of a
    // class-loader deadlock. This is a workaround for https://github.com/netty/netty/issues/11209.
    // This needs to match all the classes that are loaded via NETTY_JNI_UTIL_LOAD_CLASS or looked up via
    // NETTY_JNI_UTIL_FIND_CLASS.
    ClassInitializerUtil.tryLoadClasses(Native.class,
            // netty_epoll_linuxsocket
            PeerCredentials.class, DefaultFileRegion.class, FileChannel.class, java.io.FileDescriptor.class,
            // netty_epoll_native
            NativeDatagramPacketArray.NativeDatagramPacket.class
    );

    try {
        // First, try calling a side-effect free JNI method to see if the library was already
        // loaded by the application.
        // If this cheap probe does not throw, the library is already loaded and
        // loading can be skipped
        offsetofEpollData();
    } catch (UnsatisfiedLinkError ignore) {
        // The library was not previously loaded, load it now.
        loadNativeLibrary();
    } finally {
        try {
            if (selector != null) {
                selector.close();
            }
        } catch (IOException ignore) {
            // Just ignore
        }
    }

    Unix.registerInternal(new Runnable() {
        @Override
        public void run() {
            registerUnix();
        }
    });
}
private static void loadNativeLibrary() {
    String name = PlatformDependent.normalizedOs();
    // check the operating system name
    if (!"linux".equals(name)) {
        throw new IllegalStateException("Only supported on Linux");
    }
    // build the two library names and try to load each in turn
    String staticLibName = "netty_transport_native_epoll";
    String sharedLibName = staticLibName + '_' + PlatformDependent.normalizedArch();
    ClassLoader cl = PlatformDependent.getClassLoader(Native.class);
    try {
        NativeLibraryLoader.load(sharedLibName, cl);
    } catch (UnsatisfiedLinkError e1) {
        try {
            NativeLibraryLoader.load(staticLibName, cl);
            logger.debug("Failed to load {}", sharedLibName, e1);
        } catch (UnsatisfiedLinkError e2) {
            ThrowableUtil.addSuppressed(e1, e2);
            throw e1;
        }
    }
}
public static void load(String originalName, ClassLoader loader) {
    // Adjust expected name to support shading of native libraries.
    String packagePrefix = calculatePackagePrefix().replace('.', '_');
    String name = packagePrefix + originalName;
    List<Throwable> suppressed = new ArrayList<Throwable>();
    try {
        // first try to load from java.library.path,
        // using the plain (relative) library name
        loadLibrary(loader, name, false);
        return;
    } catch (Throwable ex) {
        suppressed.add(ex);
    }

    // loading from java.library.path failed, so look under META-INF/native/
    String libname = System.mapLibraryName(name);
    String path = NATIVE_RESOURCE_HOME + libname;

    InputStream in = null;
    OutputStream out = null;
    File tmpFile = null;
    URL url = getResource(path, loader);
    try {
        // url == null: the resource was not found
        if (url == null) {
            // on macOS, retry with the alternative library extension
            if (PlatformDependent.isOsx()) {
                String fileName = path.endsWith(".jnilib") ? NATIVE_RESOURCE_HOME + "lib" + name + ".dynlib" :
                        NATIVE_RESOURCE_HOME + "lib" + name + ".jnilib";
                url = getResource(fileName, loader);
                if (url == null) {
                    FileNotFoundException fnf = new FileNotFoundException(fileName);
                    ThrowableUtil.addSuppressedAndClear(fnf, suppressed);
                    throw fnf;
                }
            } else {
                // on Linux and other systems, do not retry: fail right away
                FileNotFoundException fnf = new FileNotFoundException(path);
                ThrowableUtil.addSuppressedAndClear(fnf, suppressed);
                throw fnf;
            }
        }

        // the resource was found
        int index = libname.lastIndexOf('.');
        String prefix = libname.substring(0, index);
        String suffix = libname.substring(index);

        // create a temporary file
        tmpFile = PlatformDependent.createTempFile(prefix, suffix, WORKDIR);
        in = url.openStream();
        out = new FileOutputStream(tmpFile);

        // copy the library from META-INF/native/ into the temporary file
        byte[] buffer = new byte[8192];
        int length;
        while ((length = in.read(buffer)) > 0) {
            out.write(buffer, 0, length);
        }
        out.flush();

        if (shouldShadedLibraryIdBePatched(packagePrefix)) {
            // Let's try to patch the id and re-sign it. This is a best-effort and might fail if a
            // SecurityManager is setup or the right executables are not installed :/
            tryPatchShadedLibraryIdAndSign(tmpFile, originalName);
        }

        // Close the output stream before loading the unpacked library,
        // because otherwise Windows will refuse to load it when it's in use by other process.
        closeQuietly(out);
        out = null;

        // load via the absolute path of the temporary file
        loadLibrary(loader, tmpFile.getPath(), true);
    } catch (UnsatisfiedLinkError e) {
        try {
            // the file exists, is a regular file and is readable, but cannot be executed
            if (tmpFile != null && tmpFile.isFile() && tmpFile.canRead() &&
                    !NoexecVolumeDetector.canExecuteExecutable(tmpFile)) {
                // Pass "io.netty.native.workdir" as an argument to allow shading tools to see
                // the string. Since this is printed out to users to tell them what to do next,
                // we want the value to be correct even when shading.
                logger.info("{} exists but cannot be executed even when execute permissions set; " +
                        "check volume for \"noexec\" flag; use -D{}=[path] " +
                        "to set native working directory separately.",
                        tmpFile.getPath(), "io.netty.native.workdir");
            }
        } catch (Throwable t) {
            suppressed.add(t);
            logger.debug("Error checking if {} is on a file store mounted with noexec", tmpFile, t);
        }
        // Re-throw to fail the load
        ThrowableUtil.addSuppressedAndClear(e, suppressed);
        throw e;
    } catch (Exception e) {
        UnsatisfiedLinkError ule = new UnsatisfiedLinkError("could not load a native library: " + name);
        ule.initCause(e);
        ThrowableUtil.addSuppressedAndClear(ule, suppressed);
        throw ule;
    } finally {
        closeQuietly(in);
        closeQuietly(out);
        // After we load the library it is safe to delete the file.
        // We delete the file immediately to free up resources as soon as possible,
        // and if this fails fallback to deleting on JVM exit.
        // Whether the temporary file is deleted right away depends on
        // DELETE_NATIVE_LIB_AFTER_LOADING, settable via -Dio.netty.native.deleteLibAfterLoading
        if (tmpFile != null && (!DELETE_NATIVE_LIB_AFTER_LOADING || !tmpFile.delete())) {
            tmpFile.deleteOnExit();
        }
    }
}
private static void loadLibrary(final ClassLoader loader, final String name, final boolean absolute) {
    Throwable suppressed = null;
    try {
        try {
            // Make sure the helper belongs to the target ClassLoader.
            final Class<?> newHelper = tryToLoadClass(loader, NativeLibraryUtil.class);
            // Look up the loadLibrary method reflectively and invoke it, i.e. call
            // NativeLibraryUtil.loadLibrary on the target class loader
            loadLibraryByHelper(newHelper, name, absolute);
            logger.debug("Successfully loaded the library {}", name);
            return;
        } catch (UnsatisfiedLinkError e) { // Should by pass the UnsatisfiedLinkError here!
            suppressed = e;
        } catch (Exception e) {
            suppressed = e;
        }
        NativeLibraryUtil.loadLibrary(name, absolute); // Fallback to local helper class.
        logger.debug("Successfully loaded the library {}", name);
    } catch (NoSuchMethodError nsme) {
        if (suppressed != null) {
            ThrowableUtil.addSuppressed(nsme, suppressed);
        }
        rethrowWithMoreDetailsIfPossible(name, nsme);
    } catch (UnsatisfiedLinkError ule) {
        if (suppressed != null) {
            ThrowableUtil.addSuppressed(ule, suppressed);
        }
        throw ule;
    }
}
Summary:
The native library is looked up, in order, from:
- java.library.path: /usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
- META-INF/native/ inside the jar
Native-related parameters (see the launch sketch below)
- io.netty.transport.noNative
- io.netty.native.workdir
- io.netty.native.deleteLibAfterLoading
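For example, a hypothetical launch command combining them (the workdir path and jar name are placeholders, not from the original notes):
java -Dio.netty.native.workdir=/opt/netty-native \
     -Dio.netty.native.deleteLibAfterLoading=false \
     -Dio.netty.transport.noNative=false \
     -jar app.jar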
Common problems
1. The prepared native library does not match the actual platform, e.g. a 32-bit library on a 64-bit platform.
2. The extracted temporary file has no execute permission (e.g. the volume is mounted noexec); remount it with exec:
mount -o remount,exec /tmp