Netty (8): Netty in Open-Source Software

young 2022-05-22

How open-source projects use Netty

Cassandra

What is Cassandra

https://github.com/apache/cassandra

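  • Apache Cassandra is an open-source, distributed, decentralized, elastically scalable, highly available, fault-tolerant, tunably consistent, column-oriented database

  • Created in 2007 and open-sourced in 2008

  • It was originally built at Facebook to store simple-format data such as inbox messages. Thanks to its good scalability it was later adopted by well-known sites such as Digg and Twitter, and became a popular distributed storage solution for structured data.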

Cassandra wire format

0         8        16        24        32        40
+---------+---------+---------+---------+---------+
| version |   flag  |       stream      | opcode  |
+---------+---------+---------+---------+---------+
|                 length                |  data
+---------+---------+---------+---------+

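flag: org.apache.cassandra.transport.Envelope.Header.Flag

opcode: org.apache.cassandra.transport.Message.Type

length: the number of body (data) bytes that follow the header

Reading a string from data: org.apache.cassandra.transport.CBUtil#readLongString reads a 4-byte length and then that many bytes as a UTF-8 string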

int length = cb.readInt();
return readString(cb, length);
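The header layout and the long-string encoding can be exercised without Netty using java.nio.ByteBuffer, which is big-endian like ByteBuf. A minimal sketch, assuming the 9-byte header from the diagram (version, flags, a signed 16-bit stream, opcode, a 4-byte body length) and a body field encoded as a 4-byte length plus UTF-8 bytes; FrameHeader and CqlStrings are hypothetical names, not Cassandra classes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical holder for the 9-byte frame header shown in the diagram. */
final class FrameHeader {
    static final int SIZE = 9; // version(1) + flags(1) + stream(2) + opcode(1) + length(4)

    final int version;
    final int flags;
    final int stream; // signed 16-bit stream id
    final int opcode;
    final int length; // number of body bytes that follow the header

    FrameHeader(int version, int flags, int stream, int opcode, int length) {
        this.version = version;
        this.flags = flags;
        this.stream = stream;
        this.opcode = opcode;
        this.length = length;
    }

    void writeTo(ByteBuffer buf) {
        buf.put((byte) version).put((byte) flags).putShort((short) stream)
           .put((byte) opcode).putInt(length);
    }

    /** Returns null (and consumes nothing) when fewer than SIZE bytes are available. */
    static FrameHeader readFrom(ByteBuffer buf) {
        if (buf.remaining() < SIZE) {
            return null;
        }
        int version = buf.get() & 0xFF;
        int flags = buf.get() & 0xFF;
        int stream = buf.getShort(); // keeps the sign
        int opcode = buf.get() & 0xFF;
        int length = buf.getInt();
        return new FrameHeader(version, flags, stream, opcode, length);
    }
}

/** Hypothetical helpers for a long string: a 4-byte length followed by UTF-8 bytes. */
final class CqlStrings {
    static void writeLongString(ByteBuffer buf, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        buf.putInt(bytes.length).put(bytes);
    }

    static String readLongString(ByteBuffer buf) {
        int length = buf.getInt();
        byte[] bytes = new byte[length];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```

Returning null when fewer than 9 bytes are buffered mirrors what a Netty decoder does: give up for now and retry when more bytes arrive.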

How Cassandra uses Netty

Overview

org.apache.cassandra.transport.PipelineConfigurator#initializeChannel

public ChannelFuture initializeChannel(final EventLoopGroup workerGroup,
                                       final InetSocketAddress socket,
                                       final Connection.Factory connectionFactory)
{
    ServerBootstrap bootstrap = new ServerBootstrap()
                                // two channel types are supported: native epoll or NIO
                                .channel(epoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                                .childOption(ChannelOption.TCP_NODELAY, true)
                                .childOption(ChannelOption.SO_LINGER, 0)
                                .childOption(ChannelOption.SO_KEEPALIVE, keepAlive)
                                .childOption(ChannelOption.ALLOCATOR, CBUtil.allocator)
                                .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024));
    if (workerGroup != null)
        // a single group both accepts connections and performs I/O: no separate boss/worker groups
        bootstrap = bootstrap.group(workerGroup);

    // pipeline
    ChannelInitializer<Channel> initializer = initializer(connectionFactory);
    bootstrap.childHandler(initializer);

    // Bind and start to accept incoming connections.
    logger.info("Using Netty Version: {}", Version.identify().entrySet());
    logger.info("Starting listening for CQL clients on {} ({})...", socket, tlsEncryptionPolicy.description());
    return bootstrap.bind(socket);
}

Pipeline

public void configureModernPipeline(ChannelHandlerContext ctx,
                                    ClientResourceLimits.Allocator resourceAllocator,
                                    ProtocolVersion version,
                                    Map<String, String> options)
{
    BufferPoolAllocator allocator = GlobalBufferPoolAllocator.instance;
    ctx.channel().config().setOption(ChannelOption.ALLOCATOR, allocator);

    // Transport level encoders/decoders
    String compression = options.get(StartupMessage.COMPRESSION);
    FrameDecoder frameDecoder = frameDecoder(compression, allocator);
    FrameEncoder frameEncoder = frameEncoder(compression);
    FrameEncoder.PayloadAllocator payloadAllocator = frameEncoder.allocator();
    ChannelInboundHandlerAdapter exceptionHandler = ExceptionHandlers.postV5Handler(payloadAllocator, version);

    // CQL level encoders/decoders
    Message.Decoder<Message.Request> messageDecoder = messageDecoder();
    Envelope.Decoder envelopeDecoder = new Envelope.Decoder();

    // Any non-fatal errors caught in CQLMessageHandler propagate back to the client
    // via the pipeline. Firing the exceptionCaught event on an inbound handler context
    // (in this case, the initial context) will cause it to propagate to the
    // exceptionHandler provided none of the intermediate handlers drop it
    // in their exceptionCaught implementation
    ChannelPipeline pipeline = ctx.channel().pipeline();
    final ChannelHandlerContext firstContext = pipeline.firstContext();
    CQLMessageHandler.ErrorHandler errorHandler = firstContext::fireExceptionCaught;

    // Capacity tracking and resource management
    int queueCapacity = DatabaseDescriptor.getNativeTransportReceiveQueueCapacityInBytes();
    ClientResourceLimits.ResourceProvider resourceProvider = resourceProvider(resourceAllocator);
    AbstractMessageHandler.OnHandlerClosed onClosed = handler -> resourceProvider.release();
    boolean throwOnOverload = "1".equals(options.get(StartupMessage.THROW_ON_OVERLOAD));

    CQLMessageHandler.MessageConsumer<Message.Request> messageConsumer = messageConsumer();
    CQLMessageHandler<Message.Request> processor =
        new CQLMessageHandler<>(ctx.channel(),
                                version,
                                frameDecoder,
                                envelopeDecoder,
                                messageDecoder,
                                messageConsumer,
                                payloadAllocator,
                                queueCapacity,
                                resourceProvider,
                                onClosed,
                                errorHandler,
                                throwOnOverload);

    pipeline.remove(ENVELOPE_ENCODER);    // remove old outbound cql envelope encoder
    pipeline.addBefore(INITIAL_HANDLER, FRAME_DECODER, frameDecoder);
    pipeline.addBefore(INITIAL_HANDLER, FRAME_ENCODER, frameEncoder);
    pipeline.addBefore(INITIAL_HANDLER, MESSAGE_PROCESSOR, processor);
    pipeline.replace(EXCEPTION_HANDLER, EXCEPTION_HANDLER, exceptionHandler);
    pipeline.remove(INITIAL_HANDLER);

    // Handles delivering event messages to registered clients
    ctx.channel()
       .attr(Dispatcher.EVENT_DISPATCHER)
       .set(dispatcher.eventDispatcher(ctx.channel(), version, payloadAllocator));
    onNegotiationComplete(pipeline);
}

Some Netty tricks in Cassandra

Cassandra version: 4.0.4

1 Log the Netty version at startup to make troubleshooting easier

Version.identify().entrySet();

2 Make epoll configurable (enabled by default)

if (useEpoll())
    workerGroup = new EpollEventLoopGroup();
else
    workerGroup = new NioEventLoopGroup();

public static boolean useEpoll()
{
    final boolean enableEpoll = Boolean.parseBoolean(System.getProperty("cassandra.native.epoll.enabled", "true"));

    if (enableEpoll && !Epoll.isAvailable() && NativeLibrary.osType == NativeLibrary.OSType.LINUX)
        logger.warn("epoll not available", Epoll.unavailabilityCause());

    // use epoll only if it is enabled and the native transport is actually available
    return enableEpoll && Epoll.isAvailable();
}

3 Decide automatically whether SSL is needed, similar to io.netty.handler.ssl.OptionalSslHandler

protected EncryptionConfig encryptionConfig()
{
    final EncryptionOptions encryptionOptions = DatabaseDescriptor.getNativeProtocolEncryptionOptions();
    switch (tlsEncryptionPolicy)
    {
        case UNENCRYPTED:
            // if encryption is not enabled, no further steps are required after the initial setup
            return channel -> {};
        case OPTIONAL:
            // If optional, install a handler which detects whether or not the client is sending
            // encrypted bytes. If so, on receipt of the next bytes, replace that handler with
            // an SSL Handler, otherwise just remove it and proceed with an unencrypted channel.
            logger.debug("Enabling optionally encrypted CQL connections between client and server");
            return channel -> {
                SslContext sslContext = SSLFactory.getOrCreateSslContext(encryptionOptions,
                                                                         encryptionOptions.require_client_auth,
                                                                         SSLFactory.SocketType.SERVER);

                channel.pipeline().addFirst(SSL_HANDLER, new ByteToMessageDecoder()
                {
                    @Override
                    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception
                    {
                        // decide from the first 5 bytes of the first message whether the client is using SSL/TLS
                        if (byteBuf.readableBytes() < 5)
                        {
                            // To detect if SSL must be used we need to have at least 5 bytes, so return here and try again
                            // once more bytes are ready.
                            return;
                        }
                        if (SslHandler.isEncrypted(byteBuf))
                        {
                            // Connection uses SSL/TLS, replace the detection handler with a SslHandler and so use
                            // encryption.
                            SslHandler sslHandler = sslContext.newHandler(channel.alloc());
                            channelHandlerContext.pipeline().replace(SSL_HANDLER, SSL_HANDLER, sslHandler);
                        }
                        else
                        {
                            // Connection uses no TLS/SSL encryption, just remove the detection handler and continue without
                            // SslHandler in the pipeline.
                            channelHandlerContext.pipeline().remove(SSL_HANDLER);
                        }
                    }
                });
            };
        case ENCRYPTED:
            logger.debug("Enabling encrypted CQL connections between client and server");
            return channel -> {
                SslContext sslContext = SSLFactory.getOrCreateSslContext(encryptionOptions,
                                                                         encryptionOptions.require_client_auth,
                                                                         SSLFactory.SocketType.SERVER);
                channel.pipeline().addFirst(SSL_HANDLER, sslContext.newHandler(channel.alloc()));
            };
        default:
            throw new IllegalStateException("Unrecognized TLS encryption policy: " + this.tlsEncryptionPolicy);
    }
}
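The OPTIONAL branch rests on SslHandler.isEncrypted recognising a TLS record header in the first 5 bytes. The sketch below is a simplified stand-in on java.nio.ByteBuffer, not Netty's implementation: it assumes a plain TLS record (content type 20 to 23, major version 3, non-zero length) and ignores SSLv2-style hellos, which the real check also handles. TlsSniffer is a hypothetical name.

```java
import java.nio.ByteBuffer;

/** Hypothetical, simplified TLS record sniffer; it only peeks and never consumes bytes. */
final class TlsSniffer {
    enum Result { NEED_MORE_DATA, TLS, PLAINTEXT }

    static Result sniff(ByteBuffer buf) {
        if (buf.remaining() < 5) {
            return Result.NEED_MORE_DATA; // same idea as "return and try again once more bytes are ready"
        }
        int p = buf.position();
        int contentType = buf.get(p) & 0xFF;
        int majorVersion = buf.get(p + 1) & 0xFF;
        int recordLength = ((buf.get(p + 3) & 0xFF) << 8) | (buf.get(p + 4) & 0xFF);
        // 20..23 = change_cipher_spec, alert, handshake, application_data
        boolean knownType = contentType >= 20 && contentType <= 23;
        // SSL 3.0 and every TLS 1.x version put 3 in the major-version byte of the record header
        boolean sslOrTls = majorVersion == 3;
        return knownType && sslOrTls && recordLength > 0 ? Result.TLS : Result.PLAINTEXT;
    }
}
```

A CQL frame starts with a small protocol-version byte (4 in the test), so it falls outside the TLS content-type range and is treated as plaintext.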

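4 Limit both the total number of connections and the number per IP, unlike io.netty.handler.ipfilter.UniqueIpFilter

UniqueIpFilter allows only one connection per IP address.

Cassandra enforces two limits: how many connections a single IP may open (DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp()), and the maximum number of connections for the whole node (DatabaseDescriptor.getNativeTransportMaxConcurrentConnections()).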

// org.apache.cassandra.transport.ConnectionLimitHandler#channelActive
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
{
    final long count = counter.incrementAndGet();
    long limit = DatabaseDescriptor.getNativeTransportMaxConcurrentConnections();
    // Setting the limit to -1 disables it.
    if(limit < 0)
    {
        limit = Long.MAX_VALUE;
    }
    if (count > limit)
    {
        // The decrement will be done in channelClosed(...)
        noSpamLogger.error("Exceeded maximum native connection limit of {} by using {} connections (see native_transport_max_concurrent_connections in cassandra.yaml)", limit, count);
        ctx.close();
    }
    else
    {
        long perIpLimit = DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp();
        if (perIpLimit > 0)
        {
            InetAddress address = setRemoteAddressAttribute(ctx.channel());
            if (address == null)
            {
                ctx.close();
                return;
            }
            AtomicLong perIpCount = connectionsPerClient.get(address);
            if (perIpCount == null)
            {
                perIpCount = new AtomicLong(0);

                AtomicLong old = connectionsPerClient.putIfAbsent(address, perIpCount);
                if (old != null)
                {
                    perIpCount = old;
                }
            }
            if (perIpCount.incrementAndGet() > perIpLimit)
            {
                // The decrement will be done in channelClosed(...)
                noSpamLogger.error("Exceeded maximum native connection limit per ip of {} by using {} connections (see native_transport_max_concurrent_connections_per_ip)", perIpLimit, perIpCount);
                ctx.close();
                return;
            }
        }
        ctx.fireChannelActive();
    }
}
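The same two-level limit can be sketched in plain Java. This is an illustration, not Cassandra's ConnectionLimitHandler: ConnectionLimiter is a hypothetical class keyed by an IP string. Unlike Cassandra, which always counts the connection and leaves the decrement to the close callback, it rolls the counters back immediately on rejection, so release() is only called for accepted connections.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical two-level connection limiter: a global cap plus a per-IP cap. */
final class ConnectionLimiter {
    private final long maxTotal; // < 0 disables the global limit, as in the code above
    private final long maxPerIp; // <= 0 disables the per-IP limit, as in the code above
    private final AtomicLong total = new AtomicLong();
    private final ConcurrentMap<String, AtomicLong> perIp = new ConcurrentHashMap<>();

    ConnectionLimiter(long maxTotal, long maxPerIp) {
        this.maxTotal = maxTotal;
        this.maxPerIp = maxPerIp;
    }

    /** Call when a connection becomes active; false means the caller should close it. */
    boolean tryAcquire(String ip) {
        long limit = maxTotal < 0 ? Long.MAX_VALUE : maxTotal;
        if (total.incrementAndGet() > limit) {
            total.decrementAndGet(); // roll back right away
            return false;
        }
        if (maxPerIp > 0) {
            AtomicLong count = perIp.computeIfAbsent(ip, k -> new AtomicLong());
            if (count.incrementAndGet() > maxPerIp) {
                count.decrementAndGet();
                total.decrementAndGet();
                return false;
            }
        }
        return true;
    }

    /** Call when an accepted connection closes. */
    void release(String ip) {
        total.decrementAndGet();
        if (maxPerIp > 0) {
            AtomicLong count = perIp.get(ip);
            if (count != null) {
                count.decrementAndGet();
            }
        }
    }

    long total() {
        return total.get();
    }
}
```

computeIfAbsent replaces the get/putIfAbsent sequence above; per-IP entries are never evicted in this sketch.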

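5 Reduce the number of flushes (by count and by time): org.apache.cassandra.transport.Flusher.LegacyFlusher#run

// On top of the count-based limit, a time-based limit is added: run() reschedules itself every
// 10 microseconds, and it flushes when a run finds no new work (the writers have paused), when this
// is the third run since the last flush, or when more than 50 processed items are waiting
public void run()
{
    boolean doneWork = processQueue();
    runsSinceFlush++;
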
    if (!doneWork || runsSinceFlush > 2 || processed.size() > 50)
    {
        flushWrittenChannels();
        runsSinceFlush = 0;
    }

    if (doneWork)
    {
        runsWithNoWork = 0;
    }
    else
    {
        // either reschedule or cancel
        if (++runsWithNoWork > 5)
        {
            scheduled.set(false);
            if (isEmpty() || !scheduled.compareAndSet(false, true))
                return;
        }
    }

    eventLoop.schedule(this, 10000, TimeUnit.NANOSECONDS);
}
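The batching policy can be modelled deterministically by calling run() by hand instead of scheduling it every 10 microseconds. A minimal sketch with a hypothetical BatchingFlusher that keeps the three flush triggers and drops the rescheduling and cancellation logic (runsWithNoWork, scheduled):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/**
 * Hypothetical single-threaded model of the LegacyFlusher policy. In Cassandra, run() is
 * rescheduled on the event loop every 10 microseconds; here the caller invokes it directly.
 */
final class BatchingFlusher {
    private final Queue<String> queue = new ArrayDeque<>();   // written, not yet processed
    private final List<String> pending = new ArrayList<>();   // processed, waiting for a flush
    final List<List<String>> flushes = new ArrayList<>();     // what each flush sent, for inspection
    private int runsSinceFlush = 0;

    void enqueue(String item) {
        queue.add(item);
    }

    void run() {
        boolean doneWork = !queue.isEmpty();
        while (!queue.isEmpty()) {
            pending.add(queue.poll());
        }
        runsSinceFlush++;
        // Same three triggers as LegacyFlusher#run: an idle tick, the third run, or a big batch
        if (!doneWork || runsSinceFlush > 2 || pending.size() > 50) {
            if (!pending.isEmpty()) {
                flushes.add(new ArrayList<>(pending)); // stands in for flushing the written channels
            }
            pending.clear();
            runsSinceFlush = 0;
        }
    }
}
```

The trade-off: several writes share one flush (fewer system calls), at the cost of a few ticks of extra latency for a message that arrives just after a flush.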

Dubbo

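What is Dubbo

https://dubbo.apache.org/zh/index.html

Dubbo, born at Alibaba in 2008, is a high-performance, lightweight open-source Java RPC framework. It provides three core capabilities: interface-oriented remote method invocation, intelligent fault tolerance and load balancing, and automatic service registration and discovery.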

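Dubbo wire format

[Figure: Dubbo data transfer structure]

The 16-byte header contains:

  • Magic: 2 bytes identifying the protocol
  • Req/Res: 1 bit marking whether the message is a request or a response
  • 2Way: 1 bit, set when the caller expects a response; a one-way call does not care about the response
  • Event: 1 bit marking an event message; a heartbeat, for example, is an event
  • Serialization ID: 5 bits identifying the serialization (encoding/decoding) rules
  • Status: 1 byte, the response status
  • RPC Request ID: 8 bytes correlating a response with its request, much like Cassandra's stream ID
  • Data Length: 4 bytes giving the body length, i.e. the protocol uses length-field-based framing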
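The header can be sketched with java.nio.ByteBuffer. The constants (magic 0xdabb, flag bits 0x80/0x40/0x20, a 5-bit serialization ID, the 8-byte request ID at offset 4, the 4-byte length at offset 12) follow Dubbo's ExchangeCodec as far as I know; treat them as assumptions. DubboHeader is a hypothetical class.

```java
import java.nio.ByteBuffer;

/** Hypothetical codec for the 16-byte Dubbo header described above. */
final class DubboHeader {
    static final int LENGTH = 16;               // 2 + 1 + 1 + 8 + 4 bytes
    static final short MAGIC = (short) 0xdabb;
    static final int FLAG_REQUEST = 0x80;       // Req/Res bit
    static final int FLAG_TWO_WAY = 0x40;       // 2Way bit
    static final int FLAG_EVENT = 0x20;         // Event bit (e.g. heartbeat)
    static final int SERIALIZATION_MASK = 0x1f; // low 5 bits: serialization ID

    final boolean request;
    final boolean twoWay;
    final boolean event;
    final int serializationId;
    final int status;
    final long requestId;
    final int bodyLength;

    DubboHeader(boolean request, boolean twoWay, boolean event,
                int serializationId, int status, long requestId, int bodyLength) {
        this.request = request;
        this.twoWay = twoWay;
        this.event = event;
        this.serializationId = serializationId;
        this.status = status;
        this.requestId = requestId;
        this.bodyLength = bodyLength;
    }

    void writeTo(ByteBuffer buf) {
        int flag = serializationId & SERIALIZATION_MASK;
        if (request) flag |= FLAG_REQUEST;
        if (twoWay) flag |= FLAG_TWO_WAY;
        if (event) flag |= FLAG_EVENT;
        buf.putShort(MAGIC).put((byte) flag).put((byte) status)
           .putLong(requestId).putInt(bodyLength);
    }

    /** Returns null while the header is incomplete; rejects data that does not start with the magic. */
    static DubboHeader readFrom(ByteBuffer buf) {
        if (buf.remaining() < LENGTH) {
            return null;
        }
        if (buf.getShort(buf.position()) != MAGIC) {
            throw new IllegalArgumentException("not a Dubbo frame");
        }
        buf.getShort(); // skip the magic
        int flag = buf.get() & 0xFF;
        int status = buf.get() & 0xFF;
        long requestId = buf.getLong();
        int bodyLength = buf.getInt();
        return new DubboHeader((flag & FLAG_REQUEST) != 0, (flag & FLAG_TWO_WAY) != 0,
                (flag & FLAG_EVENT) != 0, flag & SERIALIZATION_MASK, status, requestId, bodyLength);
    }
}
```

If Netty did the framing, LengthFieldBasedFrameDecoder with lengthFieldOffset = 12 and lengthFieldLength = 4 would fit this layout; Dubbo itself decodes in its own codec, which also has to recognise telnet text arriving on the same port.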

How Dubbo uses Netty

Version: 3.0.8

Overview

org.apache.dubbo.remoting.transport.netty4.NettyServer#doOpen

/**
 * Init and start netty server
 */
protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();
    // boss/worker (main/sub reactor) model: one group accepts connections, the other does I/O
    bossGroup = createBossGroup();
    workerGroup = createWorkerGroup();

    final NettyServerHandler nettyServerHandler = createNettyServerHandler();
    channels = nettyServerHandler.getChannels();

    initServerBootstrap(nettyServerHandler);

    // bind
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}

protected EventLoopGroup createBossGroup() {
    return NettyEventLoopFactory.eventLoopGroup(1, EVENT_LOOP_BOSS_POOL_NAME);
}

protected EventLoopGroup createWorkerGroup() {
    return NettyEventLoopFactory.eventLoopGroup(
            getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            EVENT_LOOP_WORKER_POOL_NAME);
}

// NettyEventLoopFactory#eventLoopGroup
public static EventLoopGroup eventLoopGroup(int threads, String threadFactoryName) {
    // daemon threads; choose between epoll and NIO
    ThreadFactory threadFactory = new DefaultThreadFactory(threadFactoryName, true);
    return shouldEpoll() ? new EpollEventLoopGroup(threads, threadFactory) :
            new NioEventLoopGroup(threads, threadFactory);
}

Pipeline

org.apache.dubbo.remoting.transport.netty4.NettyServer#initServerBootstrap

protected void initServerBootstrap(NettyServerHandler nettyServerHandler) {
    boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE);

    bootstrap.group(bossGroup, workerGroup)
            .channel(NettyEventLoopFactory.serverSocketChannelClass())
            // socket options
            .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_KEEPALIVE, keepalive)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    // FIXME: should we use getTimeout()?
                    int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    // optional SSL/TLS support
                    if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                        ch.pipeline().addLast("negotiation", new SslServerTlsHandler(getUrl()));
                    }
                    ch.pipeline()
                            // a decoder/encoder pair plus one business handler
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            // all-idle: fires when there has been neither a read nor a write for idleTimeout ms
                            .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                            .addLast("handler", nettyServerHandler);
                }
            });
}

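Thread model

The thread model is configurable through two parameters: dispatcher decides which messages are handed to the business thread pool, and threadpool decides which kind of thread pool is used.

Dispatchers (which messages go to the thread pool):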

dubbo-remoting/dubbo-remoting-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.Dispatcher

all=org.apache.dubbo.remoting.transport.dispatcher.all.AllDispatcher
direct=org.apache.dubbo.remoting.transport.dispatcher.direct.DirectDispatcher
message=org.apache.dubbo.remoting.transport.dispatcher.message.MessageOnlyDispatcher
execution=org.apache.dubbo.remoting.transport.dispatcher.execution.ExecutionDispatcher
connection=org.apache.dubbo.remoting.transport.dispatcher.connection.ConnectionOrderedDispatcher
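To make the five dispatchers concrete, here is a simplified model of where each kind of event runs, based on how Dubbo's documentation describes them. ChannelEvent, Target and DispatchPolicy are hypothetical names; heartbeats and exception events are left out, and the real handlers (AllChannelHandler and friends) contain more special cases.

```java
/** Hypothetical kinds of channel events. */
enum ChannelEvent { CONNECTED, DISCONNECTED, REQUEST, RESPONSE }

/** Where an event is handled. */
enum Target { IO_THREAD, THREAD_POOL, ORDERED_QUEUE }

/** Hypothetical model of the five dispatcher settings listed above. */
enum DispatchPolicy {
    ALL, DIRECT, MESSAGE, EXECUTION, CONNECTION;

    Target targetFor(ChannelEvent event) {
        boolean connectionEvent = event == ChannelEvent.CONNECTED || event == ChannelEvent.DISCONNECTED;
        switch (this) {
            case ALL:        // everything goes to the business thread pool
                return Target.THREAD_POOL;
            case DIRECT:     // nothing is dispatched; all work stays on the I/O thread
                return Target.IO_THREAD;
            case MESSAGE:    // requests and responses go to the pool, connection events stay on I/O
                return connectionEvent ? Target.IO_THREAD : Target.THREAD_POOL;
            case EXECUTION:  // only requests go to the pool
                return event == ChannelEvent.REQUEST ? Target.THREAD_POOL : Target.IO_THREAD;
            case CONNECTION: // connection events are queued and run in order; messages go to the pool
                return connectionEvent ? Target.ORDERED_QUEUE : Target.THREAD_POOL;
            default:
                throw new AssertionError(this);
        }
    }
}
```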

Thread pools:

dubbo-common/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.threadpool.ThreadPool

fixed=org.apache.dubbo.common.threadpool.support.fixed.FixedThreadPool
cached=org.apache.dubbo.common.threadpool.support.cached.CachedThreadPool
limited=org.apache.dubbo.common.threadpool.support.limited.LimitedThreadPool
eager=org.apache.dubbo.common.threadpool.support.eager.EagerThreadPool
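The idea behind eager (grow to the maximum number of threads before queueing, the reverse of the JDK default) can be sketched on top of ThreadPoolExecutor with a queue whose offer() refuses tasks while the pool can still grow. This is a simplified illustration with hypothetical names (EagerQueue, EagerPools), not Dubbo's EagerThreadPool; as I understand it, Dubbo's version additionally queues a task when an idle thread can take it.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical queue that refuses tasks while the pool may still grow. */
final class EagerQueue extends LinkedBlockingQueue<Runnable> {
    private volatile ThreadPoolExecutor executor;

    void setExecutor(ThreadPoolExecutor executor) {
        this.executor = executor;
    }

    @Override
    public boolean offer(Runnable task) {
        // Returning false makes ThreadPoolExecutor try to start a new (non-core) thread instead.
        if (executor.getPoolSize() < executor.getMaximumPoolSize()) {
            return false;
        }
        return super.offer(task);
    }

    /** Enqueues unconditionally; used when adding a thread failed after offer() refused. */
    boolean forceOffer(Runnable task) {
        return super.offer(task);
    }
}

final class EagerPools {
    static ThreadPoolExecutor create(int core, int max) {
        EagerQueue queue = new EagerQueue();
        ThreadPoolExecutor executor = new ThreadPoolExecutor(core, max, 60, TimeUnit.SECONDS, queue,
                (task, pool) -> {
                    // Reached when offer() refused but no thread could be added (another caller
                    // took the last slot, or the pool is shut down): fall back to queueing.
                    if (pool.isShutdown() || !queue.forceOffer(task)) {
                        throw new RejectedExecutionException("pool is shut down");
                    }
                });
        queue.setExecutor(executor);
        return executor;
    }
}
```

With the JDK's default behaviour, once the core threads are busy, tasks go to the queue, and extra threads are created only when the queue is full, which never happens with an unbounded queue.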

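Some Netty tricks in Dubbo

Configuration-driven

Hard-coded value

The boss group is fixed at 1 thread: a server channel is registered with a single event loop, so one accept thread is enough for one listening port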

protected EventLoopGroup createBossGroup() {
    return NettyEventLoopFactory.eventLoopGroup(1, EVENT_LOOP_BOSS_POOL_NAME);
}

The worker group size is read from configuration (IO_THREADS_KEY, falling back to DEFAULT_IO_THREADS)

protected EventLoopGroup createWorkerGroup() {
    return NettyEventLoopFactory.eventLoopGroup(
            getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
        EVENT_LOOP_WORKER_POOL_NAME);
}
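A plain-Java sketch of the lookup semantics of getPositiveParameter as used above: a missing or non-positive value falls back to the default. A Map stands in for Dubbo's URL, the key "iothreads" in the test is an assumed value for IO_THREADS_KEY, and the DEFAULT_IO_THREADS formula is an assumption rather than something this article states.

```java
import java.util.Map;

/** Hypothetical helpers; a plain Map stands in for Dubbo's URL parameters. */
final class PositiveParams {
    // Assumed to match Dubbo's default: CPU count + 1, capped at 32
    static final int DEFAULT_IO_THREADS = Math.min(Runtime.getRuntime().availableProcessors() + 1, 32);

    /** Returns the parsed value, or defaultValue if the key is missing, empty or not positive. */
    static int getPositiveParameter(Map<String, String> params, String key, int defaultValue) {
        String raw = params.get(key);
        if (raw == null || raw.trim().isEmpty()) {
            return defaultValue;
        }
        int value = Integer.parseInt(raw.trim());
        return value > 0 ? value : defaultValue;
    }
}
```

A 0 or negative value therefore never reaches the event-loop group constructor; the configured count only takes effect when it is positive.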
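
Dubbo SPI

The codec is also configuration-driven, through Dubbo SPI. The extension file for org.apache.dubbo.remoting.Codec2 maps names to implementations:

transport=org.apache.dubbo.remoting.transport.codec.TransportCodec
telnet=org.apache.dubbo.remoting.telnet.codec.TelnetCodec
exchange=org.apache.dubbo.remoting.exchange.codec.ExchangeCodec

The codec is looked up by the codec URL parameter, falling back to the protocol name; if no Codec2 extension has that name, a legacy Codec extension is wrapped in a CodecAdapter:

protected static Codec2 getChannelCodec(URL url) {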
    String codecName = url.getParameter(Constants.CODEC_KEY);
    if (StringUtils.isEmpty(codecName)) {
        // codec extension name must stay the same with protocol name
        codecName = url.getProtocol();
    }
    FrameworkModel frameworkModel = getFrameworkModel(url.getScopeModel());
    if (frameworkModel.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
        return frameworkModel.getExtensionLoader(Codec2.class).getExtension(codecName);
    } else {
        return new CodecAdapter(frameworkModel.getExtensionLoader(Codec.class)
            .getExtension(codecName));
    }
}
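The lookup order in getChannelCodec (the explicit codec parameter, then the protocol name, then a legacy Codec wrapped in an adapter) can be modelled with two maps. A sketch with hypothetical types (DemoCodec, CodecRegistry); the "codec" parameter name is assumed, and Dubbo's real ExtensionLoader does much more than a map lookup.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical stand-in for Codec2: it just reports its name. */
interface DemoCodec {
    String name();
}

/** Hypothetical registry modelling the lookup order of getChannelCodec. */
final class CodecRegistry {
    private final Map<String, Supplier<DemoCodec>> codecs = new HashMap<>();       // like Codec2 extensions
    private final Map<String, Supplier<DemoCodec>> legacyCodecs = new HashMap<>(); // like old Codec extensions

    static DemoCodec named(String name) {
        return () -> name;
    }

    void register(String name, Supplier<DemoCodec> factory) {
        codecs.put(name, factory);
    }

    void registerLegacy(String name, Supplier<DemoCodec> factory) {
        legacyCodecs.put(name, factory);
    }

    DemoCodec select(Map<String, String> params, String protocol) {
        String name = params.get("codec");
        if (name == null || name.isEmpty()) {
            name = protocol; // the codec extension name defaults to the protocol name
        }
        Supplier<DemoCodec> factory = codecs.get(name);
        if (factory != null) {
            return factory.get();
        }
        Supplier<DemoCodec> legacy = legacyCodecs.get(name);
        if (legacy == null) {
            throw new IllegalStateException("No codec extension named " + name);
        }
        DemoCodec inner = legacy.get();
        return () -> "adapter(" + inner.name() + ")"; // plays the role of CodecAdapter
    }
}
```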

Idle timeout

A sensible setting: server idle time (which triggers disconnection) >= 2 × client idle time (which triggers a heartbeat)

org.apache.dubbo.remoting.utils.UrlUtils

public class UrlUtils {
    public static int getIdleTimeout(URL url) {
        int heartBeat = getHeartbeat(url);
        // idleTimeout should be at least more than twice heartBeat because possible retries of client.
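        // by default the idle timeout is 3 × the heartbeat interval
        int idleTimeout = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartBeat * 3);
        // reject configurations where the idle timeout is less than 2 × the heartbeat interval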
        if (idleTimeout < heartBeat * 2) {
            throw new IllegalStateException("idleTimeout < heartbeatInterval * 2");
        }
        return idleTimeout;
    }

    public static int getHeartbeat(URL url) {
        // heartbeat interval, from configuration or the default
        return url.getParameter(Constants.HEARTBEAT_KEY, Constants.DEFAULT_HEARTBEAT);
    }
}
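Why at least a factor of 2, and 3 by default? A back-of-the-envelope model: with a heartbeat every H ms, if k heartbeats in a row go missing the server sees no reads for (k + 1) × H. Assuming the server disconnects as soon as that gap reaches the idle timeout, 3 × H tolerates one missed heartbeat, while exactly 2 × H leaves no margin. IdleMath is a hypothetical helper, and 60 s is only an example value.

```java
/** Hypothetical back-of-the-envelope check for heartbeat vs. idle-timeout settings. */
final class IdleMath {
    /**
     * With a heartbeat every heartbeatMs, losing `lost` heartbeats in a row leaves the server
     * without reads for (lost + 1) * heartbeatMs. Assumes the server closes the connection as
     * soon as that gap reaches idleTimeoutMs.
     */
    static boolean survives(long heartbeatMs, long idleTimeoutMs, int lost) {
        return (lost + 1L) * heartbeatMs < idleTimeoutMs;
    }
}
```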

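Heartbeat packet design

A heartbeat is a request whose Data is null and whose Event flag is 1.

org.apache.dubbo.remoting.transport.netty4.NettyClientHandler#userEventTriggered

Heartbeats are kept small (the data is null), and since they are only sent when an idle event fires, a connection that is already carrying traffic sends none at all.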

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    // send heartbeat when read idle.
    if (evt instanceof IdleStateEvent) {
        try {
            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
            if (logger.isDebugEnabled()) {
                logger.debug("IdleStateEvent triggered, send heartbeat to channel " + channel);
            }
            Request req = new Request();
            req.setVersion(Version.getProtocolVersion());
            req.setTwoWay(true);
            req.setEvent(HEARTBEAT_EVENT);
            channel.send(req);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    } else {
        super.userEventTriggered(ctx, evt);
    }
}
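
// org.apache.dubbo.common.constants.CommonConstants#HEARTBEAT_EVENT
String HEARTBEAT_EVENT = null;

// org.apache.dubbo.remoting.exchange.Request#setEvent: sets the Event flag and uses the event as the data
public void setEvent(String event) {
    this.mEvent = true;
    this.mData = event;
}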
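Idle-triggered heartbeats can be modelled without Netty: remember when the channel last read anything and send a heartbeat only once it has been quiet for the configured interval, so busy channels never send one. A deterministic sketch with caller-supplied timestamps; ReaderIdleTracker is hypothetical and only loosely analogous to IdleStateHandler's reader-idle check.

```java
/** Hypothetical reader-idle tracker; all times are milliseconds supplied by the caller. */
final class ReaderIdleTracker {
    private final long readerIdleMs;
    private long lastReadMs;

    ReaderIdleTracker(long readerIdleMs, long nowMs) {
        this.readerIdleMs = readerIdleMs;
        this.lastReadMs = nowMs;
    }

    /** Any inbound data, including the reply to a heartbeat, resets the idle clock. */
    void onRead(long nowMs) {
        lastReadMs = nowMs;
    }

    /** A heartbeat is due only after readerIdleMs without reads; busy channels never need one. */
    boolean heartbeatDue(long nowMs) {
        return nowMs - lastReadMs >= readerIdleMs;
    }
}
```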

Using an event to trigger handler removal

org.apache.dubbo.remoting.api.SslServerTlsHandler#userEventTriggered

// remove the handler only after the SSL handshake has succeeded
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof SslHandshakeCompletionEvent) {
        SslHandshakeCompletionEvent handshakeEvent = (SslHandshakeCompletionEvent) evt;
        if (handshakeEvent.isSuccess()) {
            SSLSession session = ctx.pipeline().get(SslHandler.class).engine().getSession();
            logger.info("TLS negotiation succeed with session: " + session);
            // Remove after handshake success.
            ctx.pipeline().remove(this);
        } else {
            logger.error("TLS negotiation failed when trying to accept new connection.", handshakeEvent.cause());
            ctx.close();
        }
    }
    super.userEventTriggered(ctx, evt);
}

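Delegating handler execution

Handlers wrap other handlers and delegate to them, so each layer adds one piece of functionality (the decorator pattern).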

public class ChannelHandlers {

    private static ChannelHandlers INSTANCE = new ChannelHandlers();

    protected ChannelHandlers() {
    }

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }

    protected static ChannelHandlers getInstance() {
        return INSTANCE;
    }

    static void setTestingChannelHandlers(ChannelHandlers instance) {
        INSTANCE = instance;
    }

    // MultiMessageHandler wraps HeartbeatHandler, which wraps the handler returned by the Dispatcher
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(url.getOrDefaultFrameworkModel().getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}

public class MultiMessageHandler extends AbstractChannelHandlerDelegate {

    protected static final Logger logger = LoggerFactory.getLogger(MultiMessageHandler.class);

    public MultiMessageHandler(ChannelHandler handler) {
        super(handler);
    }

    @SuppressWarnings("unchecked")
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        // on receipt, check whether this is a MultiMessage (a batch of messages)
        if (message instanceof MultiMessage) {
            MultiMessage list = (MultiMessage) message;
            for (Object obj : list) {
                try {
                    // pass each message to the wrapped handler's received method in turn (delegation)
                    handler.received(channel, obj);
                } catch (Throwable t) {
                    logger.error("MultiMessageHandler received fail.", t);
                    try {
                        handler.caught(channel, t);
                    } catch (Throwable t1) {
                        logger.error("MultiMessageHandler caught fail.", t1);
                    }
                }
            }
        } else {
            handler.received(channel, message);
        }
    }
}
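The nesting in wrapInternal is the decorator pattern. A plain-Java sketch with hypothetical types (MsgHandler, DelegatingHandler, BatchUnpackingHandler, HeartbeatFilter): a List stands in for MultiMessage, a marker string for a heartbeat, and the Dispatcher layer is omitted.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal handler interface standing in for Dubbo's ChannelHandler. */
interface MsgHandler {
    void received(Object message);
}

/** Base decorator that holds the wrapped handler (cf. AbstractChannelHandlerDelegate). */
abstract class DelegatingHandler implements MsgHandler {
    protected final MsgHandler next;

    DelegatingHandler(MsgHandler next) {
        this.next = next;
    }
}

/** Outermost layer: splits a batch (a List here) and forwards each element. */
final class BatchUnpackingHandler extends DelegatingHandler {
    BatchUnpackingHandler(MsgHandler next) {
        super(next);
    }

    @Override
    public void received(Object message) {
        if (message instanceof List) {
            for (Object single : (List<?>) message) {
                next.received(single);
            }
        } else {
            next.received(message);
        }
    }
}

/** Middle layer: consumes heartbeats so the business handler never sees them. */
final class HeartbeatFilter extends DelegatingHandler {
    static final String HEARTBEAT = "HEARTBEAT"; // hypothetical heartbeat marker

    HeartbeatFilter(MsgHandler next) {
        super(next);
    }

    @Override
    public void received(Object message) {
        if (!HEARTBEAT.equals(message)) {
            next.received(message);
        }
    }
}

final class Handlers {
    /** Same nesting order as wrapInternal: batch unpacking, then heartbeats, then business logic. */
    static MsgHandler wrap(MsgHandler business) {
        return new BatchUnpackingHandler(new HeartbeatFilter(business));
    }
}
```

Each layer handles one concern and forwards the rest. Dubbo's MultiMessageHandler also catches failures per element, as shown above, so one bad message does not abort the rest of the batch; the sketch leaves that out.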

Using a SOCKS5 proxy (vs. an HTTP proxy)

org.apache.dubbo.remoting.transport.netty4.NettyClient#initBootstrap

String socksProxyHost = ConfigurationUtils.getProperty(getUrl().getOrDefaultApplicationModel(), SOCKS_PROXY_HOST);
if(socksProxyHost != null && !isFilteredAddress(getUrl().getHost())) {
    int socksProxyPort = Integer.parseInt(ConfigurationUtils.getProperty(getUrl().getOrDefaultApplicationModel(), SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
    Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
    ch.pipeline().addFirst(socks5ProxyHandler);
}

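The client side supports connecting through a SOCKS5 proxy.

A plain HTTP proxy forwards HTTP requests (other TCP traffic needs an HTTP CONNECT tunnel), whereas SOCKS5 relays at the connection level, so it can carry any TCP protocol and, through its UDP ASSOCIATE command, UDP as well.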
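Outside Netty, the JDK exposes the same choice through java.net.Proxy (Proxy.Type.SOCKS versus Proxy.Type.HTTP). The sketch below swaps Netty's Socks5ProxyHandler for java.net.Proxy to show the selection logic; ProxySelection and its bypass set (standing in for isFilteredAddress) are hypothetical.

```java
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.Socket;
import java.util.Set;

/** Hypothetical helper choosing between a SOCKS proxy and a direct connection. */
final class ProxySelection {
    static Proxy proxyFor(String targetHost, String socksHost, int socksPort, Set<String> bypassHosts) {
        // No proxy configured, or the target is on the bypass list (cf. isFilteredAddress): go direct
        if (socksHost == null || bypassHosts.contains(targetHost)) {
            return Proxy.NO_PROXY;
        }
        // createUnresolved defers the DNS lookup of the proxy host
        return new Proxy(Proxy.Type.SOCKS, InetSocketAddress.createUnresolved(socksHost, socksPort));
    }

    /** Creates an unconnected socket; connect() will go through the proxy, if any. */
    static Socket socketVia(Proxy proxy) {
        return new Socket(proxy);
    }
}
```

Usage would be socketVia(proxy).connect(targetAddress); for a SOCKS proxy the JDK's client speaks SOCKS version 5 by default.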

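Recommended books

Networking: TCP/IP Illustrated (《TCP/IP详解》), 《图解TCP/IP》 (Illustrated TCP/IP), 《Wireshark网络分析就这么简单》 (Wireshark network analysis made simple)

Java network programming: Java Network Programming (《Java网络编程》), TCP/IP Sockets in Java (《Java TCP/IP Socket编程》)

Netty: 《Netty权威指南》 (Netty: The Definitive Guide), Netty in Action (《Netty实战》), 《Netty进阶之路》 (The advanced road of Netty)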