安全增强
设置高低水位线
Netty OOM的根本原因
根源:进(读速度)大于出(写速度)
表象:
- 上游发送太快:任务重
- 自己:处理慢/不发或者发的慢:处理能力有限,流量控制等原因
- 网速
- 下游处理速度慢:导致不及时读取接收Buffer数据,然后反馈到这边,发送速度降速
Netty OOM - ChannelOutboundBuffer
ChannelOutboundBuffer就相当于Netty发送数据的仓库,如果存的数据过多,就会OOM
存的对象:LinkedList存ChannelOutBoundBuffer.Entry
解决方式:判断totalPendingSize>writeBufferWaterMark.high()设置unwritable,写完之后会移除entry
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
// 判断待发送的数据的size是否高于高水位线
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}
Netty OOM - TrafficShapingHandler
以ChannelTrafficShapingHandler为例
存的对象:messageQueue存ChannelTrafficShapingHandler.ToSend
解决方式:判断queueSize>maxWriteSize或delay>maxWriteDelay,设置unwritable
void checkWriteSuspend(ChannelHandlerContext ctx, long delay, long queueSize) {
if (queueSize > maxWriteSize || delay > maxWriteDelay) {
setUserDefinedWritability(ctx, false);
}
}
unwritable
/**
* Returns {@code true} if and only if {@linkplain #totalPendingWriteBytes() the total number of pending bytes} did
* not exceed the write watermark of the {@link Channel} and
* no {@linkplain #setUserDefinedWritability(int, boolean) user-defined writability flag} has been set to
* {@code false}.
*/
// 如果可写,unwritable==0
public boolean isWritable() {
return unwritable == 0;
}
Netty OOM的对策
设置好参数:
- 高低水位线(默认32k到64k)
- 启动流量整形时才需要考虑
- maxWrite(默认4M)
- maxGlobalWriteSize(默认400M)
- maxWriteDelay(默认4s)
判断channel.isWirtable()
// 连接存活并且可写的时候才去写
if(ctx.channel.isActive() && ctx.channel().isWritable()){
ctx.writeAndFlush(responseMessage);
}else{
// 其他情况要么丢弃数据,要么将数据存起来,想其他方法再发送,比直接OOM要强
log.error("message dropped");
}
启用空闲检测
示例:实现一个小目标
- 服务器加上read idle check - 服务器10s接收不到channel的请求就断掉连接
- 保护自己、瘦身(及时清理空闲的连接)
- 客户端加上 write idle check + keepalive - 客户端5s不发送数据就发一个keepalive
- 避免连接被断
- 启用不频繁的keepalive
Server端Idle
创建Server端的Idle检测,IdleStateHandler不支持共享
public class ServerIdleCheckHandler extends IdleStateHandler{
public ServerIdleCheckHandler() {
// 10秒的readIdle,不检测writeIdle,不检测allIdle
super(10,0,0,TimeUnit.SECONDS);
}
}
将ServerIdelCheckHandler加入到Server的pipeline中
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 注意顺序
pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));
pipeline.addLast("TShandler", globalTrafficShapingHandler);
// 加入idea检测
pipeline.addLast("idleCheck",new ServerIdleCheckHandler());
pipeline.addLast("frameDecode", new OrderFrameDecoder());
pipeline.addLast(new OrderProtocolDecoder());
...
...
}
});
启动Server和Client,查看Server端的日志
可以看到日志中输出了USER_EVENT: IdleStateEvent(READER_IDLE, first)和USER_EVENT: IdleStateEvent(READER_IDLE),时间间隔10秒,说明我们设置的readIdle成功了,但是连接并没有断开。
修改ServerIdleCheckHandler,重写channelIdle方法
/**
* Is called when an {@link IdleStateEvent} should be fired. This implementation calls
* {@link ChannelHandlerContext#fireUserEventTriggered(Object)}.
*/
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
@Slf4j
public class ServerIdleCheckHandler extends IdleStateHandler {
public ServerIdleCheckHandler() {
super(10, 0, 0, TimeUnit.SECONDS);
}
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
// 处理第一次ReadIdle事件
if (evt == IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT) {
// 打印日志,说明检测到了idle,连接关闭
log.info("idle check happen, so close the connection");
// 关闭连接
ctx.close();
// 不希望这个事件再触发了,就return掉
return;
}
super.channelIdle(ctx, evt);
}
}
启动Server和Client,查看Server日志
可以看到Server端我们加入的日志已经打印出来了,再查看Client端日志
可以看到连接断开了
Client端Idle+Keepalive
创建Client端的Idle
public class ClientIdleCheckHandler extends IdleStateHandler {
public ClientIdleCheckHandler() {
super(0, 5, 0);
}
}
Keepalive就是向Server发送一条信息
创建KeepaliveHandler
@Slf4j
@ChannelHandler.Sharable
public class KeepaliveHandler extends ChannelDuplexHandler {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 如果是第一次WriteIdle事件
if (evt == IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT){
// 打印日志
log.info("write idle happen. so need to send keepalive to keep connection not closed by server");
// 创建keepalive的message
KeepaliveOperation keepaliveOperation = new KeepaliveOperation();
RequestMessage requestMessage = new RequestMessage(IdUtil.nextId(), keepaliveOperation);
// 发送message
ctx.writeAndFlush(requestMessage);
}
super.userEventTriggered(ctx, evt);
}
}
将ClientIdleCheckHandler和KeepaliveHandler都加入到Client的pipeline中。
keepalive可以共享,所以可以添加@Sharable注解
Keepalive要处理编解码,所以要放在比较靠后的位置。
KeepaliveHandler keepaliveHandler = new KeepaliveHandler();
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 注意顺序
pipeline.addLast(new ClientIdleCheckHandler());
pipeline.addLast(new ClientOrderFrameDecoder());
pipeline.addLast(new ClientOrderFrameEncoder());
pipeline.addLast(new ClientOrderProtocolEncoder());
pipeline.addLast(new ClientOrderProtocolDecoder());
// keepalive需要编解码,所以放在后面
pipeline.addLast(keepaliveHandler);
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
}
});
启动Server和Client,查看日志
Client:
Server:
黑白名单
Netty中的”cidrPrefix“
比如判断两台主机的网络是不是在同一个局域网内,将IP地址划分为两部分:网络位和主机位
IP地址: 11000000.10101000.00000001.00000001
子网掩码: 11111111.11111111.11111111.00000000
前24位为网络位,后8位为主机位
A/B/C…类
网络 | 格式 | 子网掩码 |
---|---|---|
A类(前8位为网络位) | network.node.node.node | 255.0.0.0 |
B类(前16位网络位) | network.network.node.node | 255.255.0.0 |
C类(前24位为网络位) | network.network.network.node | 255.255.255.0 |
这样的切分就很浪费
CIDR:无分类域间路由选择,又常被称为无分类编址,表示前多少位为网络位
子网掩码 | CIDR值 |
---|---|
255.0.0.0 | /8 |
255.128.0.0 | /9 |
255.192.0.0 | /10 |
Netty地址过滤功能源码
- 同一个IP只能有一个连接
- IP地址过滤:黑名单、白名单
IP过滤
AbstractRemoteAddressFilter
在channelRegistered()和channelActive()的时候,调用了handleNewChannel方法
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
handleNewChannel(ctx);
ctx.fireChannelRegistered();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (!handleNewChannel(ctx)) {
throw new IllegalStateException("cannot determine to accept or reject a channel: " + ctx.channel());
} else {
ctx.fireChannelActive();
}
}
// 判断连接的远程地址是否符合需求,不符合断掉
private boolean handleNewChannel(ChannelHandlerContext ctx) throws Exception {
@SuppressWarnings("unchecked")
// 获取远程地址
T remoteAddress = (T) ctx.channel().remoteAddress();
// If the remote address is not available yet, defer the decision.
if (remoteAddress == null) {
return false;
}
// No need to keep this handler in the pipeline anymore because the decision is going to be made now.
// Also, this will prevent the subsequent events from being handled by this handler.
// 只判断一次
ctx.pipeline().remove(this);
// 判断是否接受这个地址
if (accept(ctx, remoteAddress)) {
// 当前已有channelAccepted实现返回都是{},所以什么都不做,不会对已建好的连接进行处理
channelAccepted(ctx, remoteAddress);
} else {
// 当前已有channelRejected实现返回都是null,所以执行关闭
ChannelFuture rejectedFuture = channelRejected(ctx, remoteAddress);
if (rejectedFuture != null) {
rejectedFuture.addListener(ChannelFutureListener.CLOSE);
} else {
// 关闭连接
ctx.close();
}
}
return true;
}
@ChannelHandler.Sharable
public class UniqueIpFilter extends AbstractRemoteAddressFilter<InetSocketAddress> {
// 对同一个IP只能建立一个连接
private final Set<InetAddress> connected = new ConcurrentSet<InetAddress>();
@Override
protected boolean accept(ChannelHandlerContext ctx, InetSocketAddress remoteAddress) throws Exception {
final InetAddress remoteIp = remoteAddress.getAddress();
// 判断这个IP有没有在连接了
// 建立连接的时候会把ip加入到set中,如果没有断开连接的时候,再次创建连接,就会add失败
if (!connected.add(remoteIp)) {
return false;
} else {
// 连接关闭时,从connected中移除remote ip
ctx.channel().closeFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
connected.remove(remoteIp);
}
});
return true;
}
}
}
/**
* <p>
* This class allows one to filter new {@link Channel}s based on the
* {@link IpFilterRule}s passed to its constructor. If no rules are provided, all connections
* will be accepted.
* </p>
*
* <p>
* If you would like to explicitly take action on rejected {@link Channel}s, you should override
* {@link AbstractRemoteAddressFilter#channelRejected(ChannelHandlerContext, SocketAddress)}.
* </p>
*
* <p> Consider using {@link IpSubnetFilter} for better performance while not as
* general purpose as this filter. </p>
*/
@Sharable
public class RuleBasedIpFilter extends AbstractRemoteAddressFilter<InetSocketAddress> {
private final boolean acceptIfNotFound;
private final List<IpFilterRule> rules;
/**
* <p> Create new Instance of {@link RuleBasedIpFilter} and filter incoming connections
* based on their IP address and {@code rules} applied. </p>
*
* <p> {@code acceptIfNotFound} is set to {@code true}. </p>
*
* @param rules An array of {@link IpFilterRule} containing all rules.
*/
// 基于多个规则的
public RuleBasedIpFilter(IpFilterRule... rules) {
this(true, rules);
}
/**
* Create new Instance of {@link RuleBasedIpFilter} and filter incoming connections
* based on their IP address and {@code rules} applied.
*
* @param acceptIfNotFound If {@code true} then accept connection from IP Address if it
* doesn't match any rule.
* @param rules An array of {@link IpFilterRule} containing all rules.
*/
public RuleBasedIpFilter(boolean acceptIfNotFound, IpFilterRule... rules) {
ObjectUtil.checkNotNull(rules, "rules");
this.acceptIfNotFound = acceptIfNotFound;
this.rules = new ArrayList<IpFilterRule>(rules.length);
for (IpFilterRule rule : rules) {
if (rule != null) {
this.rules.add(rule);
}
}
}
@Override
protected boolean accept(ChannelHandlerContext ctx, InetSocketAddress remoteAddress) throws Exception {
// 遍历规则,看规则能否匹配上地址
for (IpFilterRule rule : rules) {
if (rule.matches(remoteAddress)) {
// 如果能匹配上,就判断rule的ruleType是不是为ACCEPT
return rule.ruleType() == IpFilterRuleType.ACCEPT;
}
}
return acceptIfNotFound;
}
}
IpSubnetFilterRule:用于判断ip是不是在一个局域网内
private static IpFilterRule selectFilterRule(InetAddress ipAddress, int cidrPrefix, IpFilterRuleType ruleType) {
ObjectUtil.checkNotNull(ipAddress, "ipAddress");
ObjectUtil.checkNotNull(ruleType, "ruleType");
// 判断地址是ip4还是ip6
if (ipAddress instanceof Inet4Address) {
return new Ip4SubnetFilterRule((Inet4Address) ipAddress, cidrPrefix, ruleType);
} else if (ipAddress instanceof Inet6Address) {
return new Ip6SubnetFilterRule((Inet6Address) ipAddress, cidrPrefix, ruleType);
} else {
throw new IllegalArgumentException("Only IPv4 and IPv6 addresses are supported");
}
}
private Ip4SubnetFilterRule(Inet4Address ipAddress, int cidrPrefix, IpFilterRuleType ruleType) {
if (cidrPrefix < 0 || cidrPrefix > 32) {
throw new IllegalArgumentException(String.format("IPv4 requires the subnet prefix to be in range of "
+ "[0,32]. The prefix was: %d", cidrPrefix));
}
// 根据cidrPrefix计算子网掩码
subnetMask = prefixToSubnetMask(cidrPrefix);
// ip地址 & 子网掩码 就能得到网络位
networkAddress = NetUtil.ipv4AddressToInt(ipAddress) & subnetMask;
this.ruleType = ruleType;
}
@Override
public boolean matches(InetSocketAddress remoteAddress) {
// 远程地址
final InetAddress inetAddress = remoteAddress.getAddress();
if (inetAddress instanceof Inet4Address) {
int ipAddress = NetUtil.ipv4AddressToInt((Inet4Address) inetAddress);
// 用远程地址的ip地址 & 子网掩码,计算网络位,看网络位是否相同,即是不是在同一个网段内
return (ipAddress & subnetMask) == networkAddress;
}
return false;
}
示例:使用黑名单
在Server端代码中使用RuleBasedIpFilter
// 创建过滤规则,远程IP是127.0.0.1(转换之后networkAddress为2130706433), cidrPrefix为8(子网掩码为255.0.0.0,转换之后subnetMask为-16777216),策略为拒绝,IP & 子网掩码 = 2130706432,地址描述为127.0.0.0
IpSubnetFilterRule ipSubnetFilterRule = new IpSubnetFilterRule("127.0.0.1", 8,
IpFilterRuleType.REJECT);
// RuleBasedIpFilter有@Sharable注解,支持共享
RuleBasedIpFilter ruleBasedIpFilter = new RuleBasedIpFilter(ipSubnetFilterRule);
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 注意顺序
pipeline.addLast(new LoggingHandler(LogLevel.DEBUG));
// 添加到pipeline中
pipeline.addLast("ipfilter", ruleBasedIpFilter);
pipeline.addLast("TShandler", globalTrafficShapingHandler);
...
...
}
});
运行Server和Client,可以看到Client端显示连接断开。
将ipSubnetFilterRule修改为new IpSubnetFilterRule(“127.1.0.1”, 16