Netty(7)安全增强

young 1,475 2022-05-22

安全增强

设置高低水位线

Netty OOM的根本原因

根源:进(读速度)大于出(写速度)

表象:

  • 上游发送太快:任务重
  • 自己:处理慢/不发或者发的慢:处理能力有限,流量控制等原因
  • 网速
  • 下游处理速度慢:导致不及时读取接收Buffer数据,然后反馈到这边,发送速度降速

Netty OOM - ChannelOutboundBuffer

ChannelOutboundBuffer就相当于Netty发送数据的仓库,如果存的数据过多,就会OOM

存的对象:LinkedList存ChannelOutBoundBuffer.Entry

解决方式:判断totalPendingSize>writeBufferWaterMark.high()设置unwritable,写完之后会移除entry

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
        return;
    }

    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
  	// 判断待发送的数据的size是否高于高水位线
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
        setUnwritable(invokeLater);
    }
}

Netty OOM - TrafficShapingHandler

以ChannelTrafficShapingHandler为例

存的对象:messageQueue存ChannelTrafficShapingHandler.ToSend

解决方式:判断queueSize>maxWriteSize或delay>maxWriteDelay,设置unwritable

void checkWriteSuspend(ChannelHandlerContext ctx, long delay, long queueSize) {
    if (queueSize > maxWriteSize || delay > maxWriteDelay) {
        setUserDefinedWritability(ctx, false);
    }
}

unwritable

unwritable

/**
 * Returns {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did
 * not exceed the write watermark of the {@link Channel} and
 * no {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to
 * {@code false}.
 */
// 如果可写,unwritable==0
public boolean isWritable() {
    return unwritable == 0;
}

Netty OOM的对策

设置好参数:

  • 高低水位线(默认32k到64k)
  • 启动流量整形时才需要考虑
    • maxWrite(默认4M)
    • maxGlobalWriteSize(默认400M)
    • maxWriteDelay(默认4s)

判断channel.isWirtable()

// 连接存活并且可写的时候才去写
if(ctx.channel.isActive() && ctx.channel().isWritable()){
  ctx.writeAndFlush(responseMessage);
}else{
  // 其他情况要么丢弃数据,要么将数据存起来,想其他方法再发送,比直接OOM要强
  log.error("message dropped");
}

启用空闲检测

示例:实现一个小目标

  • 服务器加上read idle check - 服务器10s接收不到channel的请求就断掉连接
    • 保护自己、瘦身(及时清理空闲的连接)
  • 客户端加上 write idle check + keepalive - 客户端5s不发送数据就发一个keepalive
    • 避免连接被断
    • 启用不频繁的keepalive

Server端Idle

创建Server端的Idle检测,IdleStateHandler不支持共享

public class ServerIdleCheckHandler extends IdleStateHandler{
    public ServerIdleCheckHandler() {
      	// 10秒的readIdle,不检测writeIdle,不检测allIdle
        super(10,0,0,TimeUnit.SECONDS);
    }
}

将ServerIdelCheckHandler加入到Server的pipeline中

serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // 注意顺序
        pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));

        pipeline.addLast("TShandler", globalTrafficShapingHandler);
				// 加入idea检测
        pipeline.addLast("idleCheck",new ServerIdleCheckHandler());

        pipeline.addLast("frameDecode", new OrderFrameDecoder());
        pipeline.addLast(new OrderProtocolDecoder());
      	...
        ...
       }
});

启动Server和Client,查看Server端的日志

ServerIdleCheck-1

可以看到日志中输出了USER_EVENT: IdleStateEvent(READER_IDLE, first)和USER_EVENT: IdleStateEvent(READER_IDLE),时间间隔10秒,说明我们设置的readIdle成功了,但是连接并没有断开。

修改ServerIdleCheckHandler,重写channelIdle方法

/**
 * Is called when an {@link IdleStateEvent} should be fired. This implementation calls
 * {@link ChannelHandlerContext#fireUserEventTriggered(Object)}.
 */
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
    ctx.fireUserEventTriggered(evt);
}
@Slf4j
public class ServerIdleCheckHandler extends IdleStateHandler {
    public ServerIdleCheckHandler() {
        super(10, 0, 0, TimeUnit.SECONDS);
    }

    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        // 处理第一次ReadIdle事件
        if (evt == IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT) {
            // 打印日志,说明检测到了idle,连接关闭
            log.info("idle check happen, so close the connection");
	          // 关闭连接
            ctx.close();
            // 不希望这个事件再触发了,就return掉
            return;
        }
        super.channelIdle(ctx, evt);
    }
}

启动Server和Client,查看Server日志

ServerIdleCheck-2

可以看到Server端我们加入的日志已经打印出来了,再查看Client端日志

ServerIdleCheck-3

可以看到连接断开了

Client端Idle+Keepalive

创建Client端的Idle

public class ClientIdleCheckHandler extends IdleStateHandler {
    public ClientIdleCheckHandler() {
        super(0, 5, 0);
    }
}

Keepalive就是向Server发送一条信息

创建KeepaliveHandler

@Slf4j
@ChannelHandler.Sharable
public class KeepaliveHandler extends ChannelDuplexHandler {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
      	// 如果是第一次WriteIdle事件
        if (evt == IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT){
          	// 打印日志
            log.info("write idle happen. so need to send keepalive to keep connection not closed by server");
          	// 创建keepalive的message
            KeepaliveOperation keepaliveOperation = new KeepaliveOperation();
            RequestMessage requestMessage = new RequestMessage(IdUtil.nextId(), keepaliveOperation);
          	// 发送message
            ctx.writeAndFlush(requestMessage);
        }
        super.userEventTriggered(ctx, evt);
    }
}

将ClientIdleCheckHandler和KeepaliveHandler都加入到Client的pipeline中。

keepalive可以共享,所以可以添加@Sharable注解

Keepalive要处理编解码,所以要放在比较靠后的位置。

KeepaliveHandler keepaliveHandler = new KeepaliveHandler();
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // 注意顺序
        pipeline.addLast(new ClientIdleCheckHandler());
        pipeline.addLast(new ClientOrderFrameDecoder());
        pipeline.addLast(new ClientOrderFrameEncoder());
        pipeline.addLast(new ClientOrderProtocolEncoder());
        pipeline.addLast(new ClientOrderProtocolDecoder());
        // keepalive需要编解码,所以放在后面
        pipeline.addLast(keepaliveHandler);
        pipeline.addLast(new LoggingHandler(LogLevel.INFO));
    }
});

启动Server和Client,查看日志

Client:

ClientIdleKeepalive-1
Server:
ClientIdleKeepalive-2

黑白名单

Netty中的”cidrPrefix“

比如判断两台主机的网络是不是在同一个局域网内,将IP地址划分为两部分:网络位和主机位

IP地址: 11000000.10101000.00000001.00000001

子网掩码: 11111111.11111111.11111111.00000000

前24位为网络位,后8位为主机位

A/B/C…类

网络 格式 子网掩码
A类(前8位为网络位) network.node.node.node 255.0.0.0
B类(前16位网络位) network.network.node.node 255.255.0.0
C类(前24位为网络位) network.network.network.node 255.255.255.0

这样的切分就很浪费

CIDR:无分类域间路由选择,又常被称为无分类编址,表示前多少位为网络位

子网掩码 CIDR值
255.0.0.0 /8
255.128.0.0 /9
255.192.0.0 /10

Netty地址过滤功能源码

  • 同一个IP只能有一个连接
  • IP地址过滤:黑名单、白名单

IP过滤

AbstractRemoteAddressFilter

在channelRegistered()和channelActive()的时候,调用了handleNewChannel方法

@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    handleNewChannel(ctx);
    ctx.fireChannelRegistered();
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    if (!handleNewChannel(ctx)) {
        throw new IllegalStateException("cannot determine to accept or reject a channel: " + ctx.channel());
    } else {
        ctx.fireChannelActive();
    }
}
// 判断连接的远程地址是否符合需求,不符合断掉
private boolean handleNewChannel(ChannelHandlerContext ctx) throws Exception {
    @SuppressWarnings("unchecked")
  	// 获取远程地址
    T remoteAddress = (T) ctx.channel().remoteAddress();

    // If the remote address is not available yet, defer the decision.
    if (remoteAddress == null) {
        return false;
    }

    // No need to keep this handler in the pipeline anymore because the decision is going to be made now.
    // Also, this will prevent the subsequent events from being handled by this handler.
  	// 只判断一次
    ctx.pipeline().remove(this);
  
		// 判断是否接受这个地址
    if (accept(ctx, remoteAddress)) {
      	// 当前已有channelAccepted实现返回都是{},所以什么都不做,不会对已建好的连接进行处理
        channelAccepted(ctx, remoteAddress);
    } else {
      	// 当前已有channelRejected实现返回都是null,所以执行关闭
        ChannelFuture rejectedFuture = channelRejected(ctx, remoteAddress);
        if (rejectedFuture != null) {
            rejectedFuture.addListener(ChannelFutureListener.CLOSE);
        } else {
          	// 关闭连接
            ctx.close();
        }
    }

    return true;
}
@ChannelHandler.Sharable
public class UniqueIpFilter extends AbstractRemoteAddressFilter<InetSocketAddress> {
		// 对同一个IP只能建立一个连接
    private final Set<InetAddress> connected = new ConcurrentSet<InetAddress>();

    @Override
    protected boolean accept(ChannelHandlerContext ctx, InetSocketAddress remoteAddress) throws Exception {
        final InetAddress remoteIp = remoteAddress.getAddress();
      	// 判断这个IP有没有在连接了
      	// 建立连接的时候会把ip加入到set中,如果没有断开连接的时候,再次创建连接,就会add失败
        if (!connected.add(remoteIp)) {
            return false;
        } else {
          	// 连接关闭时,从connected中移除remote ip
            ctx.channel().closeFuture().addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    connected.remove(remoteIp);
                }
            });
            return true;
        }
    }
}
/**
 * <p>
 * This class allows one to filter new {@link Channel}s based on the
 * {@link IpFilterRule}s passed to its constructor. If no rules are provided, all connections
 * will be accepted.
 * </p>
 *
 * <p>
 * If you would like to explicitly take action on rejected {@link Channel}s, you should override
 * {@link AbstractRemoteAddressFilter#channelRejected(ChannelHandlerContext, SocketAddress)}.
 * </p>
 *
 * <p> Consider using {@link IpSubnetFilter} for better performance while not as
 * general purpose as this filter. </p>
 */
@Sharable
public class RuleBasedIpFilter extends AbstractRemoteAddressFilter<InetSocketAddress> {

    private final boolean acceptIfNotFound;
    private final List<IpFilterRule> rules;

    /**
     * <p> Create new Instance of {@link RuleBasedIpFilter} and filter incoming connections
     * based on their IP address and {@code rules} applied. </p>
     *
     * <p> {@code acceptIfNotFound} is set to {@code true}. </p>
     *
     * @param rules An array of {@link IpFilterRule} containing all rules.
     */
  	//	基于多个规则的
    public RuleBasedIpFilter(IpFilterRule... rules) {
        this(true, rules);
    }

    /**
     * Create new Instance of {@link RuleBasedIpFilter} and filter incoming connections
     * based on their IP address and {@code rules} applied.
     *
     * @param acceptIfNotFound If {@code true} then accept connection from IP Address if it
     *                         doesn't match any rule.
     * @param rules            An array of {@link IpFilterRule} containing all rules.
     */
    public RuleBasedIpFilter(boolean acceptIfNotFound, IpFilterRule... rules) {
        ObjectUtil.checkNotNull(rules, "rules");

        this.acceptIfNotFound = acceptIfNotFound;
        this.rules = new ArrayList<IpFilterRule>(rules.length);

        for (IpFilterRule rule : rules) {
            if (rule != null) {
                this.rules.add(rule);
            }
        }
    }

    @Override
    protected boolean accept(ChannelHandlerContext ctx, InetSocketAddress remoteAddress) throws Exception {
      	// 遍历规则,看规则能否匹配上地址
        for (IpFilterRule rule : rules) {
            if (rule.matches(remoteAddress)) {
              	// 如果能匹配上,就判断rule的ruleType是不是为ACCEPT
                return rule.ruleType() == IpFilterRuleType.ACCEPT;
            }
        }

        return acceptIfNotFound;
    }
}

IpSubnetFilterRule:用于判断ip是不是在一个局域网内

private static IpFilterRule selectFilterRule(InetAddress ipAddress, int cidrPrefix, IpFilterRuleType ruleType) {
    ObjectUtil.checkNotNull(ipAddress, "ipAddress");
    ObjectUtil.checkNotNull(ruleType, "ruleType");
		// 判断地址是ip4还是ip6
    if (ipAddress instanceof Inet4Address) {
        return new Ip4SubnetFilterRule((Inet4Address) ipAddress, cidrPrefix, ruleType);
    } else if (ipAddress instanceof Inet6Address) {
        return new Ip6SubnetFilterRule((Inet6Address) ipAddress, cidrPrefix, ruleType);
    } else {
        throw new IllegalArgumentException("Only IPv4 and IPv6 addresses are supported");
    }
}
private Ip4SubnetFilterRule(Inet4Address ipAddress, int cidrPrefix, IpFilterRuleType ruleType) {
    if (cidrPrefix < 0 || cidrPrefix > 32) {
        throw new IllegalArgumentException(String.format("IPv4 requires the subnet prefix to be in range of "
                + "[0,32]. The prefix was: %d", cidrPrefix));
    }
		// 根据cidrPrefix计算子网掩码
    subnetMask = prefixToSubnetMask(cidrPrefix);
  	// ip地址 & 子网掩码 就能得到网络位
    networkAddress = NetUtil.ipv4AddressToInt(ipAddress) & subnetMask;
    this.ruleType = ruleType;
}

@Override
public boolean matches(InetSocketAddress remoteAddress) {
  // 远程地址
  final InetAddress inetAddress = remoteAddress.getAddress();
  if (inetAddress instanceof Inet4Address) {
    int ipAddress = NetUtil.ipv4AddressToInt((Inet4Address) inetAddress);
    // 用远程地址的ip地址 & 子网掩码,计算网络位,看网络位是否相同,即是不是在同一个网段内
    return (ipAddress & subnetMask) == networkAddress;
  }
  return false;
}

示例:使用黑名单

在Server端代码中使用RuleBasedIpFilter

// 创建过滤规则,远程IP是127.0.0.1(转换之后networkAddress为2130706433),	cidrPrefix为8(子网掩码为255.0.0.0,转换之后subnetMask为-16777216),策略为拒绝,IP & 子网掩码 = 2130706432,地址描述为127.0.0.0
IpSubnetFilterRule ipSubnetFilterRule = new IpSubnetFilterRule("127.0.0.1", 8,
        IpFilterRuleType.REJECT);
// RuleBasedIpFilter有@Sharable注解,支持共享
RuleBasedIpFilter ruleBasedIpFilter = new RuleBasedIpFilter(ipSubnetFilterRule);

serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // 注意顺序
        pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));

				// 添加到pipeline中
        pipeline.addLast("ipfilter", ruleBasedIpFilter);


        pipeline.addLast("TShandler", globalTrafficShapingHandler);
      
      	...
        ...
      
     }
});

运行Server和Client,可以看到Client端显示连接断开。

将ipSubnetFilterRule修改为new IpSubnetFilterRule(“127.1.0.1”, 16