开源软件中的Netty使用
Cassandra
Cassandra是什么
https://github.com/apache/cassandra
-
Apache Cassandra是一个开源的、分布式、去中心化、弹性可扩展、高可用性、容错、一致性可调、面向列的数据库
-
诞生于2007年,于2008年开源
-
它最初由FaceBook创建,用于存储收件箱等简单格式数据。此后,由于扩展性良好,被Digg、Twitter等知名网站所采纳,称为了一种流行的分布式结构化数据库存储方案。
Cassandra数据传输
0 8 16 24 32 40
+---------+---------+---------+---------+---------+
| version | flag | stream | opcode |
+---------+---------+---------+---------+---------+
| length | data
+---------+---------+---------+---------+
flag:org.apache.cassandra.transport.Envelope.Header.Flag
opcode:org.apache.cassandra.transport.Message.Type
data获取:org.apache.cassandra.transport.CBUtil#readLongString
int length = cb.readInt();
return readString(cb, length);
Cassandra使用Netty概况
总览
org.apache.cassandra.transport.PipelineConfigurator#initializeChannel
public ChannelFuture initializeChannel(final EventLoopGroup workerGroup,
final InetSocketAddress socket,
final Connection.Factory connectionFactory)
{
ServerBootstrap bootstrap = new ServerBootstrap()
// 支持两种Channel
.channel(epoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_LINGER, 0)
.childOption(ChannelOption.SO_KEEPALIVE, keepAlive)
.childOption(ChannelOption.ALLOCATOR, CBUtil.allocator)
.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024));
if (workerGroup != null)
// 没有使用主从模式
bootstrap = bootstrap.group(workerGroup);
// pipeline
ChannelInitializer<Channel> initializer = initializer(connectionFactory);
bootstrap.childHandler(initializer);
// Bind and start to accept incoming connections.
logger.info("Using Netty Version: {}", Version.identify().entrySet());
logger.info("Starting listening for CQL clients on {} ({})...", socket, tlsEncryptionPolicy.description());
return bootstrap.bind(socket);
}
Pipeline
public void configureModernPipeline(ChannelHandlerContext ctx,
ClientResourceLimits.Allocator resourceAllocator,
ProtocolVersion version,
Map<String, String> options)
{
BufferPoolAllocator allocator = GlobalBufferPoolAllocator.instance;
ctx.channel().config().setOption(ChannelOption.ALLOCATOR, allocator);
// Transport level encoders/decoders
String compression = options.get(StartupMessage.COMPRESSION);
FrameDecoder frameDecoder = frameDecoder(compression, allocator);
FrameEncoder frameEncoder = frameEncoder(compression);
FrameEncoder.PayloadAllocator payloadAllocator = frameEncoder.allocator();
ChannelInboundHandlerAdapter exceptionHandler = ExceptionHandlers.postV5Handler(payloadAllocator, version);
// CQL level encoders/decoders
Message.Decoder<Message.Request> messageDecoder = messageDecoder();
Envelope.Decoder envelopeDecoder = new Envelope.Decoder();
// Any non-fatal errors caught in CQLMessageHandler propagate back to the client
// via the pipeline. Firing the exceptionCaught event on an inbound handler context
// (in this case, the initial context) will cause it to propagate to to the
// exceptionHandler provided none of the the intermediate handlers drop it
// in their exceptionCaught implementation
ChannelPipeline pipeline = ctx.channel().pipeline();
final ChannelHandlerContext firstContext = pipeline.firstContext();
CQLMessageHandler.ErrorHandler errorHandler = firstContext::fireExceptionCaught;
// Capacity tracking and resource management
int queueCapacity = DatabaseDescriptor.getNativeTransportReceiveQueueCapacityInBytes();
ClientResourceLimits.ResourceProvider resourceProvider = resourceProvider(resourceAllocator);
AbstractMessageHandler.OnHandlerClosed onClosed = handler -> resourceProvider.release();
boolean throwOnOverload = "1".equals(options.get(StartupMessage.THROW_ON_OVERLOAD));
CQLMessageHandler.MessageConsumer<Message.Request> messageConsumer = messageConsumer();
CQLMessageHandler<Message.Request> processor =
new CQLMessageHandler<>(ctx.channel(),
version,
frameDecoder,
envelopeDecoder,
messageDecoder,
messageConsumer,
payloadAllocator,
queueCapacity,
resourceProvider,
onClosed,
errorHandler,
throwOnOverload);
pipeline.remove(ENVELOPE_ENCODER); // remove old outbound cql envelope encoder
pipeline.addBefore(INITIAL_HANDLER, FRAME_DECODER, frameDecoder);
pipeline.addBefore(INITIAL_HANDLER, FRAME_ENCODER, frameEncoder);
pipeline.addBefore(INITIAL_HANDLER, MESSAGE_PROCESSOR, processor);
pipeline.replace(EXCEPTION_HANDLER, EXCEPTION_HANDLER, exceptionHandler);
pipeline.remove(INITIAL_HANDLER);
// Handles delivering event messages to registered clients
ctx.channel()
.attr(Dispatcher.EVENT_DISPATCHER)
.set(dispatcher.eventDispatcher(ctx.channel(), version, payloadAllocator));
onNegotiationComplete(pipeline);
}
Cassandra使用Netty的一些技巧
Cassandra版本:4.0.4
1 显示Netty版本信息,便于排查问题
Version.identify().entrySet();
2 配置使用启用Epoll
if (userEpoll)
workerGroup = new EpollEventLoopGroup();
else
workerGroup = new NioEventLoopGroup();
public static boolean useEpoll()
{
final boolean enableEpoll = Boolean.parseBoolean(System.getProperty("cassandra.native.epoll.enabled", "true"));
if (enableEpoll && !Epoll.isAvailable() && NativeLibrary.osType == NativeLibrary.OSType.LINUX)
logger.warn("epoll not available", Epoll.unavailabilityCause());
// 判断Epoll是否可使用
return enableEpoll && Epoll.isAvailable();
}
3 自动判断要不要启动SSL:类似io.netty.handler.ssl.OptionalSslHanlder
protected EncryptionConfig encryptionConfig()
{
final EncryptionOptions encryptionOptions = DatabaseDescriptor.getNativeProtocolEncryptionOptions();
switch (tlsEncryptionPolicy)
{
case UNENCRYPTED:
// if encryption is not enabled, no further steps are required after the initial setup
return channel -> {};
case OPTIONAL:
// If optional, install a handler which detects whether or not the client is sending
// encrypted bytes. If so, on receipt of the next bytes, replace that handler with
// an SSL Handler, otherwise just remove it and proceed with an unencrypted channel.
logger.debug("Enabling optionally encrypted CQL connections between client and server");
return channel -> {
SslContext sslContext = SSLFactory.getOrCreateSslContext(encryptionOptions,
encryptionOptions.require_client_auth,
SSLFactory.SocketType.SERVER);
channel.pipeline().addFirst(SSL_HANDLER, new ByteToMessageDecoder()
{
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception
{ // 根据第一个消息的5个字节判断消息是不是支持SSL
if (byteBuf.readableBytes() < 5)
{
// To detect if SSL must be used we need to have at least 5 bytes, so return here and try again
// once more bytes a ready.
return;
}
if (SslHandler.isEncrypted(byteBuf))
{
// Connection uses SSL/TLS, replace the detection handler with a SslHandler and so use
// encryption.
SslHandler sslHandler = sslContext.newHandler(channel.alloc());
channelHandlerContext.pipeline().replace(SSL_HANDLER, SSL_HANDLER, sslHandler);
}
else
{
// Connection use no TLS/SSL encryption, just remove the detection handler and continue without
// SslHandler in the pipeline.
channelHandlerContext.pipeline().remove(SSL_HANDLER);
}
}
});
};
case ENCRYPTED:
logger.debug("Enabling encrypted CQL connections between client and server");
return channel -> {
SslContext sslContext = SSLFactory.getOrCreateSslContext(encryptionOptions,
encryptionOptions.require_client_auth,
SSLFactory.SocketType.SERVER);
channel.pipeline().addFirst(SSL_HANDLER, sslContext.newHandler(channel.alloc()));
};
default:
throw new IllegalStateException("Unrecognized TLS encryption policy: " + this.tlsEncryptionPolicy);
}
}
4 控制连接总数和单个IP连接数,区别于io.netty.handler.ipfilter.UniqueIpFilter
UniqueIpFilter是一个IP一个连接
Cassandra实现了两个控制,一个是一个IP能建多少个连接(DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp()),另一个是整个系统的最大连接数(DatabaseDescriptor.getNativeTransportMaxConcurrentConnections())
// org.apache.cassandra.transport.ConnectionLimitHandler#channelActive
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception
{
final long count = counter.incrementAndGet();
long limit = DatabaseDescriptor.getNativeTransportMaxConcurrentConnections();
// Setting the limit to -1 disables it.
if(limit < 0)
{
limit = Long.MAX_VALUE;
}
if (count > limit)
{
// The decrement will be done in channelClosed(...)
noSpamLogger.error("Exceeded maximum native connection limit of {} by using {} connections (see native_transport_max_concurrent_connections in cassandra.yaml)", limit, count);
ctx.close();
}
else
{
long perIpLimit = DatabaseDescriptor.getNativeTransportMaxConcurrentConnectionsPerIp();
if (perIpLimit > 0)
{
InetAddress address = setRemoteAddressAttribute(ctx.channel());
if (address == null)
{
ctx.close();
return;
}
AtomicLong perIpCount = connectionsPerClient.get(address);
if (perIpCount == null)
{
perIpCount = new AtomicLong(0);
AtomicLong old = connectionsPerClient.putIfAbsent(address, perIpCount);
if (old != null)
{
perIpCount = old;
}
}
if (perIpCount.incrementAndGet() > perIpLimit)
{
// The decrement will be done in channelClosed(...)
noSpamLogger.error("Exceeded maximum native connection limit per ip of {} by using {} connections (see native_transport_max_concurrent_connections_per_ip)", perIpLimit, perIpCount);
ctx.close();
return;
}
}
ctx.fireChannelActive();
}
}
5 减少flush次数(时间)org.apache.cassandra.transport.Flusher.LegacyFlusher#run
// 在次数控制的基础上,增加了时间的控制
// 当次数不满足要求,但是停顿超过10微秒的时候,也会flush掉
public void run()
{
boolean doneWork = processQueue();
runsSinceFlush++;
if (!doneWork || runsSinceFlush > 2 || processed.size() > 50)
{
flushWrittenChannels();
runsSinceFlush = 0;
}
if (doneWork)
{
runsWithNoWork = 0;
}
else
{
// either reschedule or cancel
if (++runsWithNoWork > 5)
{
scheduled.set(false);
if (isEmpty() || !scheduled.compareAndSet(false, true))
return;
}
}
eventLoop.schedule(this, 10000, TimeUnit.NANOSECONDS);
}
Dubbo
Dubbo是什么
https://dubbo.apache.org/zh/index.html
2008年诞生,一款高性能、轻量级的开源Java RPC框架,源于阿里巴巴,它提供了三大核心能力:面向接口的远程方法调用,智能容错和负载均衡,以及服务自动注册和发现
Dubbo传输数据结构
两个字节作为Magic魔术位;一位(req/res)标识是请求还是响应;1way:又去无回,不关心响应;2way:关心响应;Event:比如心跳包就是一种event;序列化类型ID,标识编解码的规则;Status:状态
RPC Request ID:和之前的steamId类似;
Data Length:说明使用的是长度封帧
Dubbo使用Netty概况
版本3.0.8
总览
org.apache.dubbo.remoting.transport.netty4.NettyServer#doOpen
/**
* Init and start netty server
*/
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
// 主从模式
bossGroup = createBossGroup();
workerGroup = createWorkerGroup();
final NettyServerHandler nettyServerHandler = createNettyServerHandler();
channels = nettyServerHandler.getChannels();
initServerBootstrap(nettyServerHandler);
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
protected EventLoopGroup createBossGroup() {
return NettyEventLoopFactory.eventLoopGroup(1, EVENT_LOOP_BOSS_POOL_NAME);
}
protected EventLoopGroup createWorkerGroup() {
return NettyEventLoopFactory.eventLoopGroup(
getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
EVENT_LOOP_WORKER_POOL_NAME);
}
public static EventLoopGroup eventLoopGroup(int threads, String threadFactoryName) {
// Nio和Epoll选择
ThreadFactory threadFactory = new DefaultThreadFactory(threadFactoryName, true);
return shouldEpoll() ? new EpollEventLoopGroup(threads, threadFactory) :
new NioEventLoopGroup(threads, threadFactory);
}
Pipeline
org.apache.dubbo.remoting.transport.netty4.NettyServer#initServerBootstrap
protected void initServerBootstrap(NettyServerHandler nettyServerHandler) {
boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE);
bootstrap.group(bossGroup, workerGroup)
.channel(NettyEventLoopFactory.serverSocketChannelClass())
// 参数设置
.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_KEEPALIVE, keepalive)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
// SSL支持
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
ch.pipeline().addLast("negotiation", new SslServerTlsHandler(getUrl()));
}
ch.pipeline()
// 一对编解码和一个handler
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
// 读写都空闲时触发idle
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
}
线程模型
可配置,有两个参数dispatch和threadpool,dispatch决定什么消息用线程池,threapool决定用什么线程池
执行方式
dubbo-remoting/dubbo-remoting-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.Dispatcher
all=org.apache.dubbo.remoting.transport.dispatcher.all.AllDispatcher
direct=org.apache.dubbo.remoting.transport.dispatcher.direct.DirectDispatcher
message=org.apache.dubbo.remoting.transport.dispatcher.message.MessageOnlyDispatcher
execution=org.apache.dubbo.remoting.transport.dispatcher.execution.ExecutionDispatcher
connection=org.apache.dubbo.remoting.transport.dispatcher.connection.ConnectionOrderedDispatcher
线程池:
dubbo-common/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.threadpool.ThreadPool
fixed=org.apache.dubbo.common.threadpool.support.fixed.FixedThreadPool
cached=org.apache.dubbo.common.threadpool.support.cached.CachedThreadPool
limited=org.apache.dubbo.common.threadpool.support.limited.LimitedThreadPool
eager=org.apache.dubbo.common.threadpool.support.eager.EagerThreadPool
Dubbo使用Netty的一些技巧
配置驱动
直接取值
BossGroup指定为1
protected EventLoopGroup createBossGroup() {
return NettyEventLoopFactory.eventLoopGroup(1, EVENT_LOOP_BOSS_POOL_NAME);
}
workerGroup从配置读取
protected EventLoopGroup createWorkerGroup() {
return NettyEventLoopFactory.eventLoopGroup(
getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
EVENT_LOOP_WORKER_POOL_NAME);
}
Dubbo SPI
org.apache.dubbo.remoting.Codec2
transport=org.apache.dubbo.remoting.transport.codec.TransportCodec
telnet=org.apache.dubbo.remoting.telnet.codec.TelnetCodec
exchange=org.apache.dubbo.remoting.exchange.codec.ExchangeCodec
protected static Codec2 getChannelCodec(URL url) {
String codecName = url.getParameter(Constants.CODEC_KEY);
if (StringUtils.isEmpty(codecName)) {
// codec extension name must stay the same with protocol name
codecName = url.getProtocol();
}
FrameworkModel frameworkModel = getFrameworkModel(url.getScopeModel());
if (frameworkModel.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
return frameworkModel.getExtensionLoader(Codec2.class).getExtension(codecName);
} else {
return new CodecAdapter(frameworkModel.getExtensionLoader(Codec.class)
.getExtension(codecName));
}
}
Idel时间
合理的Idle时间:Server Idle(触发断连)时间 >=Client Idle(触发心跳消息)时间 * 2
org.apache.dubbo.remoting.utils.UrlUtils
public class UrlUtils {
public static int getIdleTimeout(URL url) {
int heartBeat = getHeartbeat(url);
// idleTimeout should be at least more than twice heartBeat because possible retries of client.
// 默认是心跳时间的3倍
int idleTimeout = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartBeat * 3);
// idle时间小于2倍的心跳时间
if (idleTimeout < heartBeat * 2) {
throw new IllegalStateException("idleTimeout < heartbeatInterval * 2");
}
return idleTimeout;
}
public static int getHeartbeat(URL url) {
// 获取心跳时间
return url.getParameter(Constants.HEARTBEAT_KEY, Constants.DEFAULT_HEARTBEAT);
}
}
心跳包的设计
Data:null,Event:1
org.apache.dubbo.remoting.transport.netty4.NettyClientHandler#userEventTriggered
降低心跳数据的大小,通过idle触发心跳,也节约了心跳包
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// send heartbeat when read idle.
if (evt instanceof IdleStateEvent) {
try {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
if (logger.isDebugEnabled()) {
logger.debug("IdleStateEvent triggered, send heartbeat to channel " + channel);
}
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setEvent(HEARTBEAT_EVENT);
channel.send(req);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
} else {
super.userEventTriggered(ctx, evt);
}
}
// org.apache.dubbo.common.constants.CommonConstants#HEARTBEAT_EVENT
String HEARTBEAT_EVENT = null;
public void setEvent(String event) {
this.mEvent = true;
this.mData = event;
}
用事件触发handler的移除
org.apache.dubbo.remoting.api.SslServerTlsHandler#userEventTriggered
// Ssl握手成功再移除handler
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof SslHandshakeCompletionEvent) {
SslHandshakeCompletionEvent handshakeEvent = (SslHandshakeCompletionEvent) evt;
if (handshakeEvent.isSuccess()) {
SSLSession session = ctx.pipeline().get(SslHandler.class).engine().getSession();
logger.info("TLS negotiation succeed with session: " + session);
// Remove after handshake success.
ctx.pipeline().remove(this);
} else {
logger.error("TLS negotiation failed when trying to accept new connection.", handshakeEvent.cause());
ctx.close();
}
}
super.userEventTriggered(ctx, evt);
}
委托handler的执行
handler套用了handler,委托实现更多的功能
public class ChannelHandlers {
private static ChannelHandlers INSTANCE = new ChannelHandlers();
protected ChannelHandlers() {
}
public static ChannelHandler wrap(ChannelHandler handler, URL url) {
return ChannelHandlers.getInstance().wrapInternal(handler, url);
}
protected static ChannelHandlers getInstance() {
return INSTANCE;
}
static void setTestingChannelHandlers(ChannelHandlers instance) {
INSTANCE = instance;
}
// MessageHandle套装了心跳的handler,心跳的handler套装了Dispatcher的handler
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
return new MultiMessageHandler(new HeartbeatHandler(url.getOrDefaultFrameworkModel().getExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
}
public class MultiMessageHandler extends AbstractChannelHandlerDelegate {
protected static final Logger logger = LoggerFactory.getLogger(MultiMessageHandler.class);
public MultiMessageHandler(ChannelHandler handler) {
super(handler);
}
@SuppressWarnings("unchecked")
@Override
public void received(Channel channel, Object message) throws RemotingException {
// 接收到数据后,判断是不是MultiMessage
if (message instanceof MultiMessage) {
MultiMessage list = (MultiMessage) message;
for (Object obj : list) {
try {
// 挨个调用handler的received方法,实现委托的功能
handler.received(channel, obj);
} catch (Throwable t) {
logger.error("MultiMessageHandler received fail.", t);
try {
handler.caught(channel, t);
} catch (Throwable t1) {
logger.error("MultiMessageHandler caught fail.", t1);
}
}
}
} else {
handler.received(channel, message);
}
}
}
使用代理Socks5(VS http proxy)
org.apache.dubbo.remoting.transport.netty4.NettyClient#initBootstrap
String socksProxyHost = ConfigurationUtils.getProperty(getUrl().getOrDefaultApplicationModel(), SOCKS_PROXY_HOST);
if(socksProxyHost != null && !isFilteredAddress(getUrl().getHost())) {
int socksProxyPort = Integer.parseInt(ConfigurationUtils.getProperty(getUrl().getOrDefaultApplicationModel(), SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));
Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));
ch.pipeline().addFirst(socks5ProxyHandler);
}
Client端支持使用Socks5代理功能
http proxy只能代理http,Socks5可以代理各种TCP,UDP也可以代理
推荐书籍
网络知识:《TCP/IP详解》、《图解TCP/IP》、《Wireshark网络分析就这么简单》
Java网络编程:《Java网络编程》、《Java TCP/IP Socket编程》
Netty相关:《Netty权威指南》、《Netty实战》、《Netty进阶之路》