编写网络应用程序的基本步骤
案例介绍及数据结构设计
依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.76.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>30.1-jre</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>
公共代码
import io.netty.buffer.ByteBuf;
import lombok.Data;
import org.example.util.JsonUtil;
import java.nio.charset.StandardCharsets;
@Data
// 消息对象
public abstract class Message<T extends MessageBody> {
// 消息头
private MessageHeader messageHeader;
// 消息体
private T messageBody;
// 编码操作
public void encode(ByteBuf byteBuf) {
byteBuf.writeInt(messageHeader.getVersion());
byteBuf.writeLong(messageHeader.getStreamId());
byteBuf.writeInt(messageHeader.getOpCode());
byteBuf.writeBytes(JsonUtil.toJson(messageBody).getBytes());
}
public abstract Class<T> getMessageBodyDecodeClass(int opcode);
// 解码操作
public void decode(ByteBuf msg) {
int version = msg.readInt();
long streamId = msg.readLong();
int opCode = msg.readInt();
MessageHeader messageHeader = new MessageHeader();
messageHeader.setVersion(version);
messageHeader.setStreamId(streamId);
messageHeader.setOpCode(opCode);
this.messageHeader = messageHeader;
Class<T> bodyClazz = getMessageBodyDecodeClass(opCode);
T body = JsonUtil.fromJson(msg.toString(StandardCharsets.UTF_8), bodyClazz);
this.messageBody = body;
}
}
public abstract class MessageBody {
}
import lombok.Data;
@Data
// 消息头
public class MessageHeader {
private int version = 1;
private int opCode;
private long streamId;
}
public abstract class Operation extends MessageBody{
public abstract OperationResult execute();
}
public abstract class OperationResult extends MessageBody{
}
import org.example.common.auth.AuthOperation;
import org.example.common.auth.AuthOperationResult;
import org.example.common.keepalive.KeepaliveOperation;
import org.example.common.keepalive.KeepaliveOperationResult;
import org.example.common.order.OrderOperation;
import org.example.common.order.OrderOperationResult;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;
public enum OperationType {
AUTH(1, AuthOperation.class, AuthOperationResult.class),
KEEPALIVE(2, KeepaliveOperation.class, KeepaliveOperationResult.class),
ORDER(3, OrderOperation.class, OrderOperationResult.class),
;
private int opCode;
private Class<? extends Operation> operationClazz;
private Class<? extends OperationResult> operationResultClazz;
OperationType(int opCode, Class<? extends Operation> operationClazz,
Class<? extends OperationResult> operationResultClazz) {
this.opCode = opCode;
this.operationClazz = operationClazz;
this.operationResultClazz = operationResultClazz;
}
public static OperationType fromOpCode(int opcode) {
Map<Integer, OperationType> typeMap =
Arrays.stream(OperationType.values()).collect(Collectors.toMap(e -> e.getOpCode(), e -> e));
return typeMap.get(opcode);
}
public static OperationType fromOperation(Operation operation) {
Map<? extends Class<? extends Operation>, OperationType> collect =
Arrays.stream(OperationType.values()).collect(Collectors.toMap(e -> e.getOperationClazz(), e -> e));
return collect.get(operation.getClass());
}
public int getOpCode() {
return opCode;
}
public Class<? extends Operation> getOperationClazz() {
return operationClazz;
}
public Class<? extends OperationResult> getOperationResultClazz() {
return operationResultClazz;
}
}
public class RequestMessage extends Message<Operation>{
@Override
public Class getMessageBodyDecodeClass(int opcode) {
return OperationType.fromOpCode(opcode).getOperationClazz();
}
public RequestMessage() {
}
public RequestMessage(Long streamId,Operation operation){
MessageHeader messageHeader = new MessageHeader();
messageHeader.setStreamId(streamId);
messageHeader.setOpCode(OperationType.fromOperation(operation).getOpCode());
this.setMessageHeader(messageHeader);
this.setMessageBody(operation);
}
}
public class ResponseMessage extends Message<OperationResult>{
@Override
public Class getMessageBodyDecodeClass(int opcode) {
return OperationType.fromOpCode(opcode).getOperationResultClazz();
}
}
Operation和OperationResult
import lombok.Data;
import lombok.extern.java.Log;
import org.example.common.Operation;
import org.example.common.OperationResult;
@Data
@Log
public class AuthOperation extends Operation {
private final String userName;
private final String password;
@Override
public OperationResult execute() {
if ("admin".equalsIgnoreCase(this.userName)){
AuthOperationResult authOperationResult = new AuthOperationResult(true);
return authOperationResult;
}
return new AuthOperationResult(false);
}
}
import lombok.Data;
import org.example.common.OperationResult;
@Data
public class AuthOperationResult extends OperationResult {
private final boolean passAuth;
}
import org.example.common.Operation;
import org.example.common.OperationResult;
public class KeepaliveOperation extends Operation {
private long time;
public KeepaliveOperation() {
this.time = System.nanoTime();
}
@Override
public OperationResult execute() {
KeepaliveOperationResult result = new KeepaliveOperationResult(time);
return result;
}
}
import lombok.Data;
import org.example.common.OperationResult;
@Data
public class KeepaliveOperationResult extends OperationResult {
private final long time;
}
import lombok.Data;
import lombok.extern.java.Log;
import org.example.common.Operation;
import org.example.common.OperationResult;
@Data
@Slf4j
public class OrderOperation extends Operation {
private int tableId;
private String dish;
public OrderOperation(int tableId, String dish) {
this.tableId = tableId;
this.dish = dish;
}
@Override
public OperationResult execute() {
log.info("order's executing startup with orderRequest: " + this);
// execute order logic
log.info("order's executing complete");
OrderOperationResult result = new OrderOperationResult(tableId,dish,true);
return result;
}
}
import lombok.Data;
import org.example.common.OperationResult;
@Data
public class OrderOperationResult extends OperationResult {
private int tableId;
private String dish;
private boolean complete;
public OrderOperationResult(int tableId, String dish, boolean complete) {
this.tableId = tableId;
this.dish = dish;
this.complete = complete;
}
}
服务端编解码
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
// 一级解码(请求的一级解码),解决粘包半包问题
public class OrderFrameDecoder extends LengthFieldBasedFrameDecoder {
public OrderFrameDecoder() {
// 帧的最大长度:Integer.MAX_VALUE,length字段偏移量:0,length字段长度:2,长度修正:0,剥离协议头的长度:2
super(Integer.MAX_VALUE, 0, 2, 0, 2);
}
}
import io.netty.handler.codec.LengthFieldPrepender;
// 一级编码(响应的一级编码),设置length字段长度
public class OrderFrameEncoder extends LengthFieldPrepender {
public OrderFrameEncoder() {
super(2);
}
}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.example.common.RequestMessage;
import java.util.List;
// 二级解码(请求的二级解码),从ByteBuf转为RequestMessage
public class OrderProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> out) throws Exception {
RequestMessage requestMessage = new RequestMessage();
requestMessage.decode(byteBuf);
out.add(requestMessage);
}
}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import org.example.common.ResponseMessage;
import java.util.List;
// 二级编码(响应的二级编码),从ResponseMessage转为ByteBuf
public class OrderProtocolEncoder extends MessageToMessageEncoder<ResponseMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, ResponseMessage responseMessage,
List<Object> out) throws Exception {
ByteBuf buffer = ctx.alloc().buffer();
responseMessage.encode(buffer);
out.add(buffer);
}
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.example.common.Operation;
import org.example.common.OperationResult;
import org.example.common.RequestMessage;
import org.example.common.ResponseMessage;
// order的处理逻辑,继承SimpleChannelInboundHandler是因为它会自动释放ByteBuf
public class OrderServerProcessHandler extends SimpleChannelInboundHandler<RequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RequestMessage requestMessage) throws Exception {
// 获取消息体
Operation operation = requestMessage.getMessageBody();
// 处理业务
OperationResult operationResult = operation.execute();
// 生成响应
ResponseMessage responseMessage = new ResponseMessage();
// 设置响应的MessageHeader
responseMessage.setMessageHeader(requestMessage.getMessageHeader());
// 设置响应的消息体
responseMessage.setMessageBody(operationResult);
// 写响应
ctx.writeAndFlush(responseMessage);
}
}
服务端代码
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.example.server.codec.OrderFrameDecoder;
import org.example.server.codec.OrderFrameEncoder;
import org.example.server.codec.OrderProtocolDecoder;
import org.example.server.codec.OrderProtocolEncoder;
import org.example.server.codec.handler.OrderServerProcessHandler;
public class Server {
public static void main(String[] args) {
try {
// server端,所以创建ServerBootStrap
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 设置IO模式
serverBootstrap.channel(NioServerSocketChannel.class);
// 设置Reactor方式
serverBootstrap.group(new NioEventLoopGroup());
// 设置boss的log
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
// 处理逻辑在worker中执行,所以设置childHandler
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
// 获取pipeline
ChannelPipeline pipeline = ch.pipeline();
// 注意顺序
pipeline.addLast(new OrderFrameDecoder());
pipeline.addLast(new OrderFrameEncoder());
pipeline.addLast(new OrderProtocolEncoder());
pipeline.addLast(new OrderProtocolDecoder());
pipeline.addLast(new OrderServerProcessHandler());
// 添加worker的log
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
}
});
// 绑定端口,阻塞启动
ChannelFuture channelFuture = serverBootstrap.bind(8090).sync();
channelFuture.channel().closeFuture().get();
}catch (Exception e){
e.printStackTrace();
}
}
}
服务端编解码
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
// 一级解码(响应的一级解码),解决粘包半包问题
public class ClientOrderFrameDecoder extends LengthFieldBasedFrameDecoder {
public ClientOrderFrameDecoder() {
super(Integer.MAX_VALUE, 0, 2, 0, 2);
}
}
// 一级编码(请求的一级编码),设置length字段长度
public class ClientOrderFrameEncoder extends LengthFieldPrepender {
public ClientOrderFrameEncoder() {
super(2);
}
}
// 二级解码(响应的二级解码),将请求的响应解码为ResponseMessage
public class ClientOrderProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> out) throws Exception {
ResponseMessage responseMessage = new ResponseMessage();
responseMessage.decode(byteBuf);
out.add(responseMessage);
}
}
// 二级编码(请求的二级编码),将请求编码为ByteBuf
public class ClientOrderProtocolEncoder extends MessageToMessageEncoder<RequestMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, RequestMessage requestMessage,
List<Object> out) throws Exception {
ByteBuf buffer = ctx.alloc().buffer();
requestMessage.encode(buffer);
out.add(buffer);
}
}
客户端代码
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.example.client.codec.ClientOrderFrameDecoder;
import org.example.client.codec.ClientOrderFrameEncoder;
import org.example.client.codec.ClientOrderProtocolDecoder;
import org.example.client.codec.ClientOrderProtocolEncoder;
import org.example.common.RequestMessage;
import org.example.common.order.OrderOperation;
import org.example.util.IdUtil;
public class Client {
public static void main(String[] args) {
try {
// 因为是客户端,所以创建BootStrap
Bootstrap bootstrap = new Bootstrap();
// 设置IO模式
bootstrap.channel(NioSocketChannel.class);
// 设置Reactor模式
bootstrap.group(new NioEventLoopGroup());
// 设置发送请求及解析响应编解码
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 注意顺序
pipeline.addLast(new ClientOrderFrameDecoder());
pipeline.addLast(new ClientOrderFrameEncoder());
pipeline.addLast(new ClientOrderProtocolEncoder());
pipeline.addLast(new ClientOrderProtocolDecoder());
// BootStrap的handler方法会进行覆盖,所以将日志处理写在里面
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
}
});
// 创建连接
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",8090);
// 等待连接创建完成
channelFuture.sync();
// 创建请求
RequestMessage requestMessage = new RequestMessage(IdUtil.nextId(), new OrderOperation(1001, "tudou"));
// 发送请求
channelFuture.channel().writeAndFlush(requestMessage);
channelFuture.channel().closeFuture().get();
}catch (Exception e){
e.printStackTrace();
}
}
}
异常问题
参考https://lequ7.com/guan-yu-nettynetty-ke-hu-duan-fa-song-shu-ju-shi-yi-chang-bei-yin-cang.html
初次编写时,将Message中的decode与encode方法写反了,服务端一直没有接受到消息,客户端的控制台也一直未打印出异常。
debug追踪发现,在io.netty.handler.codec.MessageToMessageEncoder#write方法中,catch (Throwable t) 捕获到了异常,并且抛出了一个EncoderException。
这个异常被io.netty.channel.AbstractChannelHandlerContext#invokeWrite0捕获
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
public static void tryFailure(Promise<?> p, Throwable cause, InternalLogger logger) {
if (!p.tryFailure(cause) && logger != null) {
Throwable err = p.cause();
if (err == null) {
logger.warn("Failed to mark a promise as failure because it has succeeded already: {}", p, cause);
} else if (logger.isWarnEnabled()) {
logger.warn("Failed to mark a promise as failure because it has failed already: {}, unnotified cause: {}", new Object[]{p, ThrowableUtil.stackTraceToString(err), cause});
}
}
}
p.tryFailure最终实现是io.netty.util.concurrent.DefaultPromise#tryFailure
public boolean tryFailure(Throwable cause) {
return this.setFailure0(cause);
}
private boolean setFailure0(Throwable cause) {
return this.setValue0(new CauseHolder((Throwable)ObjectUtil.checkNotNull(cause, "cause")));
}
private boolean setValue0(Object objResult) {
if (!RESULT_UPDATER.compareAndSet(this, (Object)null, objResult) && !RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
return false;
} else {
if (this.checkNotifyWaiters()) {
this.notifyListeners();
}
return true;
}
}
/**
* Check if there are any waiters and if so notify these.
* @return {@code true} if there are any listeners attached to the promise, {@code false} otherwise.
*/
private synchronized boolean checkNotifyWaiters() {
if (waiters > 0) {
notifyAll();
}
return listeners != null;
}
这里会判断有没有listener,也就是说,有listener的话,就会去通知listener
/**
* One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.
* If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
*
* Threading - synchronized(this). We must support adding listeners when there is no EventExecutor.
*/
private Object listeners;
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners == null) {
listeners = listener;
} else if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).add(listener);
} else {
listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
}
}
addListener0方法在addListener方法中被调用
所以在Client放中添加listener
RequestMessage requestMessage = new RequestMessage(IdUtil.nextId(), new OrderOperation(1001, "tudou"));
ChannelFuture channelFuture1 = channelFuture.channel().writeAndFlush(requestMessage);
channelFuture1.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
Throwable cause = channelFuture.cause();
if (cause!=null){
cause.printStackTrace();
}
}
});
此时控制台上就打印出了EncodeException的异常
优化
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import org.example.common.Operation;
import org.example.common.RequestMessage;
import org.example.util.IdUtil;
import java.util.List;
// 将Operation封装成RequestMessage
public class OperationToRequestMessageEncoder extends MessageToMessageEncoder<Operation> {
@Override
protected void encode(ChannelHandlerContext ctx, Operation msg, List<Object> out) throws Exception {
RequestMessage requestMessage = new RequestMessage(IdUtil.nextId(), msg);
out.add(requestMessage);
}
}
public static void main(String[] args) {
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(new NioEventLoopGroup());
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 注意顺序
pipeline.addLast(new ClientOrderFrameDecoder());
pipeline.addLast(new ClientOrderFrameEncoder());
pipeline.addLast(new ClientOrderProtocolEncoder());
pipeline.addLast(new ClientOrderProtocolDecoder());
// 添加新的处理逻辑
pipeline.addLast(new OperationToRequestMessageEncoder());
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",8090);
channelFuture.sync();
OrderOperation orderOperation = new OrderOperation(1001, "tudou");
// 直接写OrderOperation
channelFuture.channel().writeAndFlush(orderOperation);
channelFuture.channel().closeFuture().get();
}catch (Exception e){
e.printStackTrace();
}
}
如果将
OrderOperation orderOperation = new OrderOperation(1001, "tudou");
channelFuture.channel().writeAndFlush(orderOperation);
改为
RequestMessage requestMessage = new RequestMessage(IdUtil.nextId(), new OrderOperation(1001, "tudou"));
channelFuture.channel().writeAndFlush(requestMessage);
发现请求依然可以发送出去
查看io.netty.handler.codec.MessageToMessageEncoder#write
acceptOutboundMessage()方法,是通过反射判断泛型的类型是否与我们传的类似是一致的,如果是一致的,才会进入handler处理逻辑
引入响应分发完善客户端
定义future
采用Netty的DefaultPromise
import io.netty.util.concurrent.DefaultPromise;
import org.example.common.OperationResult;
// 客户端拿到的响应是OperationResult
public class OperationResultFuture extends DefaultPromise<OperationResult> {
}
定义ID和future的映射
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
// 存储ID和future的关系
public class RequestPendingCenter {
// 使用Map存储映射
private Map<Long,OperationResultFuture> map = new ConcurrentHashMap<>();
// 添加映射关系
public void add(Long steamId,OperationResultFuture future){
this.map.put(steamId,future);
}
// 将响应设置到future中,并移除映射关系
public void set(Long steamId, OperationResult result){
OperationResultFuture operationResultFuture = this.map.get(steamId);
if (operationResultFuture!=null){
operationResultFuture.setSuccess(result);
this.map.remove(steamId);
}
}
}
将请求结果对应到映射配置
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.example.common.ResponseMessage;
// 把请求结果对应到ResponsePendingCenter
public class ResponseDispatcherHandler extends SimpleChannelInboundHandler<ResponseMessage> {
private RequestPendingCenter requestPendingCenter;
public ResponseDispatcherHandler(RequestPendingCenter requestPendingCenter) {
this.requestPendingCenter = requestPendingCenter;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ResponseMessage msg) throws Exception {
requestPendingCenter.set(msg.getMessageHeader().getStreamId(), msg.getMessageBody());
}
}
修改客户端
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.example.client.codec.ClientOrderFrameDecoder;
import org.example.client.codec.ClientOrderFrameEncoder;
import org.example.client.codec.ClientOrderProtocolDecoder;
import org.example.client.codec.ClientOrderProtocolEncoder;
import org.example.client.codec.OperationToRequestMessageEncoder;
import org.example.client.codec.dispatcher.OperationResultFuture;
import org.example.client.codec.dispatcher.RequestPendingCenter;
import org.example.client.codec.dispatcher.ResponseDispatcherHandler;
import org.example.common.OperationResult;
import org.example.common.RequestMessage;
import org.example.common.order.OrderOperation;
import org.example.util.IdUtil;
public class ClientV2 {
public static void main(String[] args) {
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(new NioEventLoopGroup());
// 声明映射中心
RequestPendingCenter requestPendingCenter = new RequestPendingCenter();
bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 注意顺序
pipeline.addLast(new ClientOrderFrameDecoder());
pipeline.addLast(new ClientOrderFrameEncoder());
pipeline.addLast(new ClientOrderProtocolEncoder());
pipeline.addLast(new ClientOrderProtocolDecoder());
// 添加响应对应到映射中心处理
pipeline.addLast(new ResponseDispatcherHandler(requestPendingCenter));
pipeline.addLast(new OperationToRequestMessageEncoder());
pipeline.addLast(new LoggingHandler(LogLevel.INFO));
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",8090);
channelFuture.sync();
// 获取streamId
long streamId = IdUtil.nextId();
// 创建响应
RequestMessage requestMessage = new RequestMessage(streamId, new OrderOperation(1001, "tudou"));
// 声明一个future
OperationResultFuture operationResultFuture = new OperationResultFuture();
// 将future加入到映射中心
requestPendingCenter.add(streamId,operationResultFuture);
// 写数据
channelFuture.channel().writeAndFlush(requestMessage);
// 阻塞等待响应
OperationResult operationResult = operationResultFuture.get();
// 打印响应
System.out.println(operationResult);
channelFuture.channel().closeFuture().get();
}catch (Exception e){
e.printStackTrace();
}
}
}
Netty编程中易错点
-
LengthFieldBasedFrameDecoder中initialBytesToStrip未考虑设置
会带着length属性进行解析
-
ChannelHandler顺序不正确
4+1模式,V字形模式,执行顺序先下后上
-
ChannelHandler该共享不共享,不该共享却共享
-
分配ByteBuf:分配器直接使用ByteBufAllocator.DEFAULT等,而不是采用ChannelHandlerContext.alloc()
ByteBuf可以是堆外内存,可以是堆内内存,可以来源于内存池或非内存池,是可以在ServerBootStrap指定的
ChannelHandlerContext.alloc().buffer()是ServerBootStrap启动的时候指定的allocator
如果用ByteBufAllocator.DEFAULT.buffer(),那么ServerBootStrap里如果切换的话,这里就会不一致
-
未考虑ByteBuf的释放
SimpleChannelInboundHandler#channelRead方法中,finally方法会释放ByteBuf
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { boolean release = true; try { if (acceptInboundMessage(msg)) { @SuppressWarnings("unchecked") I imsg = (I) msg; channelRead0(ctx, imsg); } else { release = false; ctx.fireChannelRead(msg); } } finally { if (autoRelease && release) { ReferenceCountUtil.release(msg); } } }
在不满足acceptInboundMessage是,代码走到了ctx.fireChannelRead(msg),查看pipeline的最后一个,TailContext,可以看到很多方法里都调用了onUnhandledInboundMessage方法
protected void onUnhandledInboundMessage(Object msg) { try { logger.debug( "Discarded inbound message {} that reached at the tail of the pipeline. " + "Please check your pipeline configuration.", msg); } finally { ReferenceCountUtil.release(msg); } }
最终也会释放ByteBuf
-
错以为ChannelHandlerContext.write(msg)就写出数据了
write仅仅是将信息加入到队列中,而不是发送出去
-
乱用ChannelHandlerContext.channel().wirteAndFlush(msg)
channel.wirteAndFlush调用的是pipeline的wirteAndFlush方法,导致整个pipeline重新走了一遍,严重的时候可能出现死循环
此方式一般用于客户端的编写
io.netty.channel.AbstractChannel#writeAndFlush(java.lang.Object)
@Override public ChannelFuture writeAndFlush(Object msg) { return pipeline.writeAndFlush(msg); }
ChannelHandlerContext.wirteAndFlush是在当前pipeline的位置,寻找下一个符合条件的handler