Netty(3)编写简单的应用

young 686 2022-05-22

编写网络应用程序的基本步骤

编写网络应用程序基本步骤

案例介绍及数据结构设计

案例介绍及数据结构设计

依赖

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.76.Final</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.24</version>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>30.1-jre</version>
</dependency>
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.6</version>
</dependency>

公共代码

import io.netty.buffer.ByteBuf;
import lombok.Data;
import org.example.util.JsonUtil;

import java.nio.charset.StandardCharsets;

/**
 * Base class for a wire message: a {@link MessageHeader} followed by a JSON body.
 *
 * <p>Layout written by {@link #encode(ByteBuf)} and read by {@link #decode(ByteBuf)}:
 * version (int), stream id (long), op code (int), then the body as JSON text.
 *
 * @param <T> concrete body type carried by this message
 */
@Data
public abstract class Message<T extends MessageBody> {
    /** Header written ahead of the body. */
    private MessageHeader messageHeader;
    /** Payload of the message; serialized through {@code JsonUtil}. */
    private T messageBody;

    /**
     * Writes the header fields and the JSON body into {@code byteBuf}.
     *
     * @param byteBuf destination buffer
     */
    public void encode(ByteBuf byteBuf) {
        byteBuf.writeInt(messageHeader.getVersion());
        byteBuf.writeLong(messageHeader.getStreamId());
        byteBuf.writeInt(messageHeader.getOpCode());
        // Encode explicitly as UTF-8: decode() reads the body back as UTF-8, so relying on
        // the platform default charset here would corrupt non-ASCII text on some machines.
        byteBuf.writeBytes(JsonUtil.toJson(messageBody).getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Selects the class the JSON body is decoded into.
     *
     * @param opcode op code read from the header
     * @return body class to hand to the JSON decoder
     */
    public abstract Class<T> getMessageBodyDecodeClass(int opcode);

    /**
     * Reads the header fields from {@code msg}, then decodes the rest of the buffer as the JSON body.
     *
     * @param msg buffer positioned at the start of the header
     */
    public void decode(ByteBuf msg) {
        // Read order must mirror the write order in encode().
        int version = msg.readInt();
        long streamId = msg.readLong();
        int opCode = msg.readInt();

        MessageHeader header = new MessageHeader();
        header.setVersion(version);
        header.setStreamId(streamId);
        header.setOpCode(opCode);
        this.messageHeader = header;

        // Everything left after the header is the JSON body.
        Class<T> bodyClazz = getMessageBodyDecodeClass(opCode);
        this.messageBody = JsonUtil.fromJson(msg.toString(StandardCharsets.UTF_8), bodyClazz);
    }
}
/** Common base type for every payload that a {@code Message} can carry. */
public abstract class MessageBody {}
import lombok.Data;

@Data
// Message header: the fixed fields written in front of every message body.
public class MessageHeader {
    // Protocol version; defaults to 1.
    private int version = 1;
    // Operation code; used to pick the class the body is decoded into.
    private int opCode;
    // Request identifier; the server copies the request header into the response,
    // so the client can match a response to its request by this value.
    private long streamId;
}
/** A request payload that can be executed to produce an {@link OperationResult}. */
public abstract class Operation extends MessageBody {

    /**
     * Runs the operation.
     *
     * @return the result to send back to the requester
     */
    public abstract OperationResult execute();
}
/** Base type for the payload returned by {@code Operation.execute()}. */
public abstract class OperationResult extends MessageBody {}
import org.example.common.auth.AuthOperation;
import org.example.common.auth.AuthOperationResult;
import org.example.common.keepalive.KeepaliveOperation;
import org.example.common.keepalive.KeepaliveOperationResult;
import org.example.common.order.OrderOperation;
import org.example.common.order.OrderOperationResult;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Registry that ties an op code to its request ({@link Operation}) and
 * response ({@link OperationResult}) body classes.
 */
public enum OperationType {
    AUTH(1, AuthOperation.class, AuthOperationResult.class),
    KEEPALIVE(2, KeepaliveOperation.class, KeepaliveOperationResult.class),
    ORDER(3, OrderOperation.class, OrderOperationResult.class),
    ;

    // values() clones its array on every call; cache it once because the lookups
    // below run for every message that is encoded or decoded.
    private static final OperationType[] VALUES = values();

    private final int opCode;
    private final Class<? extends Operation> operationClazz;
    private final Class<? extends OperationResult> operationResultClazz;

    OperationType(int opCode, Class<? extends Operation> operationClazz,
                  Class<? extends OperationResult> operationResultClazz) {
        this.opCode = opCode;
        this.operationClazz = operationClazz;
        this.operationResultClazz = operationResultClazz;
    }

    /**
     * Finds the type registered for an op code.
     *
     * @param opcode op code taken from a message header
     * @return the matching type, or {@code null} when the op code is not registered
     */
    public static OperationType fromOpCode(int opcode) {
        for (OperationType type : VALUES) {
            if (type.opCode == opcode) {
                return type;
            }
        }
        return null;
    }

    /**
     * Finds the type registered for the exact runtime class of {@code operation}.
     *
     * @param operation operation to look up; must not be {@code null}
     * @return the matching type, or {@code null} when the class is not registered
     */
    public static OperationType fromOperation(Operation operation) {
        Class<?> wanted = operation.getClass();
        for (OperationType type : VALUES) {
            // Exact class match only, the same as a map keyed by Class.
            if (type.operationClazz == wanted) {
                return type;
            }
        }
        return null;
    }

    /** @return the numeric op code written into the message header */
    public int getOpCode() {
        return opCode;
    }

    /** @return the request body class for this type */
    public Class<? extends Operation> getOperationClazz() {
        return operationClazz;
    }

    /** @return the response body class for this type */
    public Class<? extends OperationResult> getOperationResultClazz() {
        return operationResultClazz;
    }
}
/** A {@link Message} whose body is the {@link Operation} the client asks the server to run. */
public class RequestMessage extends Message<Operation> {

    /** Creates an empty message, to be filled in by {@code decode}. */
    public RequestMessage() {
    }

    /**
     * Creates a request whose op code is derived from the operation's class.
     *
     * @param streamId  identifier used to match the response to this request
     * @param operation body of the request
     * @throws IllegalArgumentException if no {@link OperationType} is registered for the operation's class
     */
    public RequestMessage(Long streamId, Operation operation) {
        OperationType type = OperationType.fromOperation(operation);
        if (type == null) {
            // Fail with a message that names the class instead of a bare NullPointerException.
            throw new IllegalArgumentException(
                    "No OperationType registered for " + operation.getClass().getName());
        }
        MessageHeader messageHeader = new MessageHeader();
        messageHeader.setStreamId(streamId);
        messageHeader.setOpCode(type.getOpCode());
        this.setMessageHeader(messageHeader);
        this.setMessageBody(operation);
    }

    /**
     * Resolves the {@link Operation} subclass registered for {@code opcode}.
     *
     * @throws IllegalArgumentException if the op code is not registered
     */
    @Override
    @SuppressWarnings("unchecked") // The registry only hands out Operation subclasses.
    public Class<Operation> getMessageBodyDecodeClass(int opcode) {
        OperationType type = OperationType.fromOpCode(opcode);
        if (type == null) {
            throw new IllegalArgumentException("Unknown opcode: " + opcode);
        }
        return (Class<Operation>) type.getOperationClazz();
    }
}
/** A {@link Message} whose body is the {@link OperationResult} sent back by the server. */
public class ResponseMessage extends Message<OperationResult> {

    /**
     * Resolves the {@link OperationResult} subclass registered for {@code opcode}.
     *
     * @throws IllegalArgumentException if the op code is not registered
     */
    @Override
    @SuppressWarnings("unchecked") // The registry only hands out OperationResult subclasses.
    public Class<OperationResult> getMessageBodyDecodeClass(int opcode) {
        OperationType type = OperationType.fromOpCode(opcode);
        if (type == null) {
            // Fail with a message that names the op code instead of a bare NullPointerException.
            throw new IllegalArgumentException("Unknown opcode: " + opcode);
        }
        return (Class<OperationResult>) type.getOperationResultClazz();
    }
}

Operation和OperationResult

import lombok.Data;
import lombok.extern.java.Log;
import org.example.common.Operation;
import org.example.common.OperationResult;

/**
 * Authentication request carrying a user name and a password.
 */
@Data
@Log
public class AuthOperation extends Operation {
    private final String userName;
    private final String password;

    /**
     * Passes authentication when the user name equals "admin", ignoring case.
     * The password is not inspected.
     *
     * @return an {@link AuthOperationResult} holding the outcome
     */
    @Override
    public OperationResult execute() {
        boolean isAdmin = "admin".equalsIgnoreCase(this.userName);
        return new AuthOperationResult(isAdmin);
    }
}
import lombok.Data;
import org.example.common.OperationResult;

// Result of an AuthOperation.
@Data
public class AuthOperationResult extends OperationResult {
    // true when AuthOperation.execute() accepted the user name.
    private final boolean passAuth;
}
import org.example.common.Operation;
import org.example.common.OperationResult;

/** Keepalive request that carries the {@link System#nanoTime()} reading taken at construction. */
public class KeepaliveOperation extends Operation {
    private long time;

    /** Captures the current {@code System.nanoTime()} value. */
    public KeepaliveOperation() {
        this.time = System.nanoTime();
    }

    /**
     * Echoes the stored time back.
     *
     * @return a {@link KeepaliveOperationResult} holding the same time value
     */
    @Override
    public OperationResult execute() {
        return new KeepaliveOperationResult(time);
    }
}
import lombok.Data;
import org.example.common.OperationResult;

// Result of a KeepaliveOperation.
@Data
public class KeepaliveOperationResult extends OperationResult {

    // Time value copied from the KeepaliveOperation that produced this result.
    private final long time;
}
import lombok.Data;
import lombok.extern.java.Log;
import org.example.common.Operation;
import org.example.common.OperationResult;

/**
 * Order request: a dish ordered for a table.
 */
@Data
// Use @Log (java.util.logging): that is the annotation this file imports, and no slf4j
// dependency is declared, so @Slf4j would not compile.
@Log
public class OrderOperation extends Operation {
    private int tableId;
    private String dish;

    /**
     * @param tableId table the order belongs to
     * @param dish    dish being ordered
     */
    public OrderOperation(int tableId, String dish) {
        this.tableId = tableId;
        this.dish = dish;
    }

    /**
     * Logs the start and end of processing and reports the order as complete.
     *
     * @return an {@link OrderOperationResult} echoing the table id and dish, marked complete
     */
    @Override
    public OperationResult execute() {
        log.info("order's executing startup with orderRequest: " + this);
        // execute order logic
        log.info("order's executing complete");
        return new OrderOperationResult(tableId, dish, true);
    }
}
import lombok.Data;
import org.example.common.OperationResult;
// Result of an OrderOperation.
@Data
public class OrderOperationResult extends OperationResult {
    // Table id copied from the order.
    private int tableId;
    // Dish copied from the order.
    private String dish;
    // Completion flag; OrderOperation.execute() passes true.
    private boolean complete;

    // Creates a result from the order's table id, dish and completion flag.
    public OrderOperationResult(int tableId, String dish, boolean complete) {
        this.tableId = tableId;
        this.dish = dish;
        this.complete = complete;
    }


}

服务端编解码

import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

// First-level decoder for requests: splits the byte stream into frames by a length
// prefix, which solves the sticky-packet / half-packet problem.
public class OrderFrameDecoder extends LengthFieldBasedFrameDecoder {
    // Maximum frame length accepted.
    private static final int MAX_FRAME_LENGTH = Integer.MAX_VALUE;
    // The length field sits at the very start of the frame.
    private static final int LENGTH_FIELD_OFFSET = 0;
    // The length field is 2 bytes wide.
    private static final int LENGTH_FIELD_LENGTH = 2;
    // No correction is applied to the length value.
    private static final int LENGTH_ADJUSTMENT = 0;
    // Strip the 2-byte length field so later handlers only see the payload.
    private static final int INITIAL_BYTES_TO_STRIP = 2;

    public OrderFrameDecoder() {
        super(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH,
                LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP);
    }
}
import io.netty.handler.codec.LengthFieldPrepender;

// First-level encoder for responses: prepends a length field to every outgoing frame.
public class OrderFrameEncoder extends LengthFieldPrepender {
    // Width of the prepended length field, in bytes.
    private static final int LENGTH_FIELD_LENGTH = 2;

    public OrderFrameEncoder() {
        super(LENGTH_FIELD_LENGTH);
    }
}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.example.common.RequestMessage;

import java.util.List;
// Second-level decoder for requests: turns one framed ByteBuf into a RequestMessage.
public class OrderProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf frame, List<Object> out) throws Exception {
        RequestMessage decoded = new RequestMessage();
        // Header and JSON body are both parsed by Message.decode.
        decoded.decode(frame);
        out.add(decoded);
    }
}
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import org.example.common.ResponseMessage;

import java.util.List;
// Second-level encoder for responses: turns a ResponseMessage into a ByteBuf.
public class OrderProtocolEncoder extends MessageToMessageEncoder<ResponseMessage> {

    /**
     * Serializes {@code responseMessage} into a buffer from the channel's allocator
     * and passes that buffer on through {@code out}.
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, ResponseMessage responseMessage,
                          List<Object> out) throws Exception {
        ByteBuf buffer = ctx.alloc().buffer();
        boolean handedOff = false;
        try {
            responseMessage.encode(buffer);
            out.add(buffer);
            handedOff = true;
        } finally {
            // If encoding failed, nobody downstream owns the buffer yet, so release it
            // here; otherwise the allocation would leak.
            if (!handedOff) {
                buffer.release();
            }
        }
    }
}
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.example.common.Operation;
import org.example.common.OperationResult;
import org.example.common.RequestMessage;
import org.example.common.ResponseMessage;

// Business handler for orders. It extends SimpleChannelInboundHandler so that the
// handled message is released automatically once channelRead0 returns.
public class OrderServerProcessHandler extends SimpleChannelInboundHandler<RequestMessage> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RequestMessage requestMessage) throws Exception {
        // Run the business logic carried by the request body.
        OperationResult result = requestMessage.getMessageBody().execute();

        // Build the response: reuse the request header so the stream id and
        // op code travel back unchanged, and attach the result as the body.
        ResponseMessage reply = new ResponseMessage();
        reply.setMessageHeader(requestMessage.getMessageHeader());
        reply.setMessageBody(result);

        // Write and flush the response from this handler's position in the pipeline.
        ctx.writeAndFlush(reply);
    }
}

服务端代码

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.example.server.codec.OrderFrameDecoder;
import org.example.server.codec.OrderFrameEncoder;
import org.example.server.codec.OrderProtocolDecoder;
import org.example.server.codec.OrderProtocolEncoder;
import org.example.server.codec.handler.OrderServerProcessHandler;

public class Server {
    /**
     * Starts the order server on port 8090 and blocks until the server channel is closed.
     *
     * @param args unused
     */
    public static void main(String[] args)  {
        // Keep a reference to the event loop group so its threads can be stopped on exit;
        // an anonymous group passed straight to group(...) can never be shut down.
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            // Server side, so a ServerBootstrap is used.
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // Select the IO mode (NIO).
            serverBootstrap.channel(NioServerSocketChannel.class);
            // Select the reactor setup: one group shared by accept and IO.
            serverBootstrap.group(group);
            // Logging for the server (boss) channel.
            serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
            // Request processing happens on the accepted (child) channels.
            serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    // Order matters: inbound data passes the handlers top to bottom,
                    // outbound data passes them bottom to top.
                    pipeline.addLast(new OrderFrameDecoder());
                    pipeline.addLast(new OrderFrameEncoder());
                    pipeline.addLast(new OrderProtocolEncoder());
                    pipeline.addLast(new OrderProtocolDecoder());
                    pipeline.addLast(new OrderServerProcessHandler());
                    // Logging for the child (worker) channel.
                    pipeline.addLast(new LoggingHandler(LogLevel.INFO));
                }
            });
            // Bind the port and wait until binding has finished.
            ChannelFuture channelFuture = serverBootstrap.bind(8090).sync();
            // Block until the server channel is closed.
            channelFuture.channel().closeFuture().get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up can still see it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Stop the event loop threads on both the normal and the failure path.
            group.shutdownGracefully();
        }
    }
}

客户端编解码

import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
// First-level decoder for responses: splits the byte stream into frames by a length
// prefix, which solves the sticky-packet / half-packet problem.
public class ClientOrderFrameDecoder extends LengthFieldBasedFrameDecoder {
    // Maximum frame length accepted.
    private static final int MAX_FRAME_LENGTH = Integer.MAX_VALUE;
    // The length field sits at the very start of the frame.
    private static final int LENGTH_FIELD_OFFSET = 0;
    // The length field is 2 bytes wide.
    private static final int LENGTH_FIELD_LENGTH = 2;
    // No correction is applied to the length value.
    private static final int LENGTH_ADJUSTMENT = 0;
    // Strip the 2-byte length field so later handlers only see the payload.
    private static final int INITIAL_BYTES_TO_STRIP = 2;

    public ClientOrderFrameDecoder() {
        super(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH,
                LENGTH_ADJUSTMENT, INITIAL_BYTES_TO_STRIP);
    }
}
// First-level encoder for requests: prepends a length field to every outgoing frame.
public class ClientOrderFrameEncoder extends LengthFieldPrepender {
    // Width of the prepended length field, in bytes.
    private static final int LENGTH_FIELD_LENGTH = 2;

    public ClientOrderFrameEncoder() {
        super(LENGTH_FIELD_LENGTH);
    }
}
// Second-level decoder for responses: turns one framed ByteBuf into a ResponseMessage.
public class ClientOrderProtocolDecoder extends MessageToMessageDecoder<ByteBuf> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf frame, List<Object> out) throws Exception {
        ResponseMessage decoded = new ResponseMessage();
        // Header and JSON body are both parsed by Message.decode.
        decoded.decode(frame);
        out.add(decoded);
    }
}
// Second-level encoder for requests: turns a RequestMessage into a ByteBuf.
public class ClientOrderProtocolEncoder extends MessageToMessageEncoder<RequestMessage> {

    /**
     * Serializes {@code requestMessage} into a buffer from the channel's allocator
     * and passes that buffer on through {@code out}.
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, RequestMessage requestMessage,
                          List<Object> out) throws Exception {
        ByteBuf buffer = ctx.alloc().buffer();
        boolean handedOff = false;
        try {
            requestMessage.encode(buffer);
            out.add(buffer);
            handedOff = true;
        } finally {
            // If encoding failed, nobody downstream owns the buffer yet, so release it
            // here; otherwise the allocation would leak.
            if (!handedOff) {
                buffer.release();
            }
        }
    }
}

客户端代码

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.example.client.codec.ClientOrderFrameDecoder;
import org.example.client.codec.ClientOrderFrameEncoder;
import org.example.client.codec.ClientOrderProtocolDecoder;
import org.example.client.codec.ClientOrderProtocolEncoder;
import org.example.common.RequestMessage;
import org.example.common.order.OrderOperation;
import org.example.util.IdUtil;

public class Client {

    /**
     * Connects to 127.0.0.1:8090, sends one order request and blocks until the channel closes.
     * The future returned by the write is not inspected here, so a failed write is not reported.
     *
     * @param args unused
     */
    public static void main(String[] args)  {
        // Keep a reference to the event loop group so its threads can be stopped on exit;
        // an anonymous group passed straight to group(...) can never be shut down.
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            // Client side, so a plain Bootstrap is used.
            Bootstrap bootstrap = new Bootstrap();
            // Select the IO mode (NIO).
            bootstrap.channel(NioSocketChannel.class);
            // Select the reactor setup.
            bootstrap.group(group);
            // Install the codecs that encode requests and decode responses.
            bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    // Order matters: inbound data passes the handlers top to bottom,
                    // outbound data passes them bottom to top.
                    pipeline.addLast(new ClientOrderFrameDecoder());
                    pipeline.addLast(new ClientOrderFrameEncoder());
                    pipeline.addLast(new ClientOrderProtocolEncoder());
                    pipeline.addLast(new ClientOrderProtocolDecoder());
                    // Bootstrap.handler(...) replaces any earlier handler, so the
                    // logging handler is added inside the initializer.
                    pipeline.addLast(new LoggingHandler(LogLevel.INFO));
                }
            });
            // Start connecting.
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",8090);
            // Wait until the connection is established.
            channelFuture.sync();
            // Build the request.
            RequestMessage requestMessage = new RequestMessage(IdUtil.nextId(), new OrderOperation(1001, "tudou"));
            // Send the request.
            channelFuture.channel().writeAndFlush(requestMessage);
            channelFuture.channel().closeFuture().get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up can still see it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Stop the event loop threads on both the normal and the failure path.
            group.shutdownGracefully();
        }
    }
}

异常问题

参考https://lequ7.com/guan-yu-nettynetty-ke-hu-duan-fa-song-shu-ju-shi-yi-chang-bei-yin-cang.html

初次编写时,将Message中的decode与encode方法写反了,服务端一直没有接受到消息,客户端的控制台也一直未打印出异常。

debug追踪发现,在io.netty.handler.codec.MessageToMessageEncoder#write方法中,catch (Throwable t) 捕获到了异常,并且抛出了一个EncoderException。

这个异常被io.netty.channel.AbstractChannelHandlerContext#invokeWrite0捕获

// Calls write(...) on this context's outbound handler. Anything the handler throws is
// caught here and passed, together with the promise, to notifyOutboundHandlerException
// instead of being rethrown to the caller.
private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}
// Tries to mark promise p as failed with the given cause. A warning is logged only when
// that attempt is rejected (the promise was already completed) and a logger was supplied;
// when the attempt succeeds nothing is logged here.
public static void tryFailure(Promise<?> p, Throwable cause, InternalLogger logger) {
    if (!p.tryFailure(cause) && logger != null) {
        Throwable err = p.cause();
        if (err == null) {
            logger.warn("Failed to mark a promise as failure because it has succeeded already: {}", p, cause);
        } else if (logger.isWarnEnabled()) {
            logger.warn("Failed to mark a promise as failure because it has failed already: {}, unnotified cause: {}", new Object[]{p, ThrowableUtil.stackTraceToString(err), cause});
        }
    }

}

p.tryFailure最终实现是io.netty.util.concurrent.DefaultPromise#tryFailure

// Delegates to setFailure0 and returns its result.
public boolean tryFailure(Throwable cause) {
    return this.setFailure0(cause);
}
// Wraps the (non-null) cause in a CauseHolder and stores it through setValue0.
private boolean setFailure0(Throwable cause) {
    return this.setValue0(new CauseHolder((Throwable)ObjectUtil.checkNotNull(cause, "cause")));
}

// Stores the result with compare-and-set, accepting only a current value of null or
// UNCANCELLABLE. Returns false when neither swap succeeds; otherwise wakes waiters,
// notifies listeners if checkNotifyWaiters() reports any, and returns true.
private boolean setValue0(Object objResult) {
    if (!RESULT_UPDATER.compareAndSet(this, (Object)null, objResult) && !RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        return false;
    } else {
        // Listeners are only invoked when at least one is registered.
        if (this.checkNotifyWaiters()) {
            this.notifyListeners();
        }

        return true;
    }
}
/**
 * Check if there are any waiters and if so notify these.
 * @return {@code true} if there are any listeners attached to the promise, {@code false} otherwise.
 */
private synchronized boolean checkNotifyWaiters() {
    // Wake up threads blocked waiting on this promise.
    if (waiters > 0) {
        notifyAll();
    }
    // With no listener registered this returns false, so the caller skips notification.
    return listeners != null;
}

这里会判断有没有listener,也就是说,有listener的话,就会去通知listener

/**
 * One or more listeners. Can be a {@link GenericFutureListener} or a {@link DefaultFutureListeners}.
 * If {@code null}, it means either 1) no listeners were added yet or 2) all listeners were notified.
 *
 * Threading - synchronized(this). We must support adding listeners when there is no EventExecutor.
 */
private Object listeners;
// Registers a listener in the `listeners` field: the first listener is stored directly,
// a second one converts the field into a DefaultFutureListeners holding both, and later
// ones are appended to that holder.
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
    if (listeners == null) {
        listeners = listener;
    } else if (listeners instanceof DefaultFutureListeners) {
        ((DefaultFutureListeners) listeners).add(listener);
    } else {
        listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
    }
}

addListener0方法在addListener方法中被调用

所以在Client类中添加listener

// Build the request, then keep the future returned by the write so its outcome can be observed.
RequestMessage requestMessage = new RequestMessage(IdUtil.nextId(), new OrderOperation(1001, "tudou"));
ChannelFuture channelFuture1 = channelFuture.channel().writeAndFlush(requestMessage);
channelFuture1.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture completed) throws Exception {
        // cause() is null unless the write failed; print the failure when there is one.
        Throwable failure = completed.cause();
        if (failure == null) {
            return;
        }
        failure.printStackTrace();
    }
});

此时控制台上就打印出了EncoderException异常

优化

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import org.example.common.Operation;
import org.example.common.RequestMessage;
import org.example.util.IdUtil;

import java.util.List;
// Wraps an outgoing Operation in a RequestMessage with a freshly generated stream id.
public class OperationToRequestMessageEncoder extends MessageToMessageEncoder<Operation> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Operation msg, List<Object> out) throws Exception {
        // The id comes from IdUtil.nextId(); the operation becomes the message body.
        out.add(new RequestMessage(IdUtil.nextId(), msg));
    }
}
/**
 * Connects to 127.0.0.1:8090 and writes a bare OrderOperation; the pipeline's
 * OperationToRequestMessageEncoder wraps it into a RequestMessage.
 *
 * @param args unused
 */
public static void main(String[] args)  {
    // Keep a reference to the event loop group so its threads can be stopped on exit;
    // an anonymous group passed straight to group(...) can never be shut down.
    NioEventLoopGroup group = new NioEventLoopGroup();
    try {
        Bootstrap bootstrap = new Bootstrap();

        bootstrap.channel(NioSocketChannel.class);
        bootstrap.group(group);

        bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
            @Override
            protected void initChannel(NioSocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                // Order matters: inbound data passes the handlers top to bottom,
                // outbound data passes them bottom to top.
                pipeline.addLast(new ClientOrderFrameDecoder());
                pipeline.addLast(new ClientOrderFrameEncoder());
                pipeline.addLast(new ClientOrderProtocolEncoder());
                pipeline.addLast(new ClientOrderProtocolDecoder());
                // New step: wrap an Operation into a RequestMessage.
                pipeline.addLast(new OperationToRequestMessageEncoder());
                pipeline.addLast(new LoggingHandler(LogLevel.INFO));
            }
        });
        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",8090);
        channelFuture.sync();
        OrderOperation orderOperation = new OrderOperation(1001, "tudou");
        // Write the OrderOperation directly.
        channelFuture.channel().writeAndFlush(orderOperation);
        channelFuture.channel().closeFuture().get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so code further up can still see it.
        Thread.currentThread().interrupt();
        e.printStackTrace();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // Stop the event loop threads on both the normal and the failure path.
        group.shutdownGracefully();
    }

}

如果将

// Variant 1: write the bare operation and let the pipeline wrap it into a RequestMessage.
OrderOperation orderOperation = new OrderOperation(1001, "tudou");
channelFuture.channel().writeAndFlush(orderOperation);

改为

// Variant 2: build the RequestMessage by hand and write it.
RequestMessage requestMessage = new RequestMessage(IdUtil.nextId(), new OrderOperation(1001, "tudou"));
channelFuture.channel().writeAndFlush(requestMessage);

发现请求依然可以发送出去

查看io.netty.handler.codec.MessageToMessageEncoder#write

acceptOutboundMessage()方法,是通过反射判断泛型的类型是否与我们传入的消息类型一致,如果一致,才会进入该handler的处理逻辑;否则消息会原样传递给下一个handler

引入响应分发完善客户端

引入响应分发完善客户端

定义future

采用Netty的DefaultPromise

import io.netty.util.concurrent.DefaultPromise;
import org.example.common.OperationResult;
/** Promise completed with the {@code OperationResult} the client receives as its response. */
public class OperationResultFuture extends DefaultPromise<OperationResult> {}

定义ID和future的映射

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// 存储ID和future的关系
/** Keeps the mapping from a request's stream id to the future waiting for its response. */
public class RequestPendingCenter {
    // Pending futures keyed by stream id.
    private final Map<Long, OperationResultFuture> map = new ConcurrentHashMap<>();

    /**
     * Registers the future that waits for the response to the given request.
     *
     * @param steamId stream id of the request
     * @param future  future to complete when the response arrives
     */
    public void add(Long steamId, OperationResultFuture future) {
        this.map.put(steamId, future);
    }

    /**
     * Completes and unregisters the future waiting on the given stream id.
     * Does nothing when no future is registered for that id.
     *
     * @param steamId stream id taken from the response header
     * @param result  response body to deliver
     */
    public void set(Long steamId, OperationResult result) {
        // remove() looks up and unregisters in one atomic step, so two responses with the
        // same id can never both obtain the future, and the entry is gone even if
        // setSuccess throws.
        OperationResultFuture pending = this.map.remove(steamId);
        if (pending != null) {
            pending.setSuccess(result);
        }
    }
}

将请求结果对应到映射配置

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.example.common.ResponseMessage;

// Routes each incoming response to the RequestPendingCenter, keyed by its stream id.
public class ResponseDispatcherHandler extends SimpleChannelInboundHandler<ResponseMessage> {

    private final RequestPendingCenter requestPendingCenter;

    public ResponseDispatcherHandler(RequestPendingCenter requestPendingCenter) {
        this.requestPendingCenter = requestPendingCenter;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ResponseMessage msg) throws Exception {
        // The stream id in the header identifies which pending future receives the body.
        long streamId = msg.getMessageHeader().getStreamId();
        requestPendingCenter.set(streamId, msg.getMessageBody());
    }
}

修改客户端

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.example.client.codec.ClientOrderFrameDecoder;
import org.example.client.codec.ClientOrderFrameEncoder;
import org.example.client.codec.ClientOrderProtocolDecoder;
import org.example.client.codec.ClientOrderProtocolEncoder;
import org.example.client.codec.OperationToRequestMessageEncoder;
import org.example.client.codec.dispatcher.OperationResultFuture;
import org.example.client.codec.dispatcher.RequestPendingCenter;
import org.example.client.codec.dispatcher.ResponseDispatcherHandler;
import org.example.common.OperationResult;
import org.example.common.RequestMessage;
import org.example.common.order.OrderOperation;
import org.example.util.IdUtil;

public class ClientV2 {

    /**
     * Connects to 127.0.0.1:8090, sends one order request and blocks until the matching
     * response has arrived (or the write has failed), then waits for the channel to close.
     *
     * @param args unused
     */
    public static void main(String[] args)  {
        // Keep a reference to the event loop group so its threads can be stopped on exit;
        // an anonymous group passed straight to group(...) can never be shut down.
        NioEventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.group(group);
            // Registry that maps stream ids to the futures waiting for responses.
            RequestPendingCenter requestPendingCenter = new RequestPendingCenter();
            bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    // Order matters: inbound data passes the handlers top to bottom,
                    // outbound data passes them bottom to top.
                    pipeline.addLast(new ClientOrderFrameDecoder());
                    pipeline.addLast(new ClientOrderFrameEncoder());
                    pipeline.addLast(new ClientOrderProtocolEncoder());
                    pipeline.addLast(new ClientOrderProtocolDecoder());
                    // Hand every decoded response to the pending-request registry.
                    pipeline.addLast(new ResponseDispatcherHandler(requestPendingCenter));
                    pipeline.addLast(new OperationToRequestMessageEncoder());
                    pipeline.addLast(new LoggingHandler(LogLevel.INFO));
                }
            });
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",8090);

            channelFuture.sync();
            // Pick the stream id for this request.
            long streamId = IdUtil.nextId();
            // Build the request.
            RequestMessage requestMessage = new RequestMessage(streamId, new OrderOperation(1001, "tudou"));
            // Future that will receive the response.
            OperationResultFuture operationResultFuture = new OperationResultFuture();
            // Register the future before writing so the response cannot arrive first.
            requestPendingCenter.add(streamId,operationResultFuture);
            // Write the request. If the write fails no response will ever arrive, so fail
            // the pending future as well; otherwise get() below would block forever.
            channelFuture.channel().writeAndFlush(requestMessage).addListener(writeResult -> {
                if (!writeResult.isSuccess()) {
                    operationResultFuture.tryFailure(writeResult.cause());
                }
            });
            // Block until the response arrives.
            OperationResult operationResult = operationResultFuture.get();
            // Print the response.
            System.out.println(operationResult);
            channelFuture.channel().closeFuture().get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up can still see it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Stop the event loop threads on both the normal and the failure path.
            group.shutdownGracefully();
        }
    }
}

Netty编程中易错点

  • LengthFieldBasedFrameDecoder中initialBytesToStrip未考虑设置

    会带着length属性进行解析

  • ChannelHandler顺序不正确

    4+1模式,V字形模式,执行顺序先下后上

    pipeline顺序

  • ChannelHandler该共享不共享,不该共享却共享

  • 分配ByteBuf:分配器直接使用ByteBufAllocator.DEFAULT等,而不是采用ChannelHandlerContext.alloc()

    ByteBuf可以是堆外内存,可以是堆内内存,可以来源于内存池或非内存池,是可以在ServerBootStrap指定的

    ChannelHandlerContext.alloc().buffer()是ServerBootStrap启动的时候指定的allocator

    如果用ByteBufAllocator.DEFAULT.buffer(),那么ServerBootStrap里如果切换的话,这里就会不一致

  • 未考虑ByteBuf的释放

    SimpleChannelInboundHandler#channelRead方法中,finally块会释放消息(例如ByteBuf)

    // Handles a message of the accepted type via channelRead0 and releases it afterwards;
    // any other message is forwarded to the next handler untouched.
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        boolean release = true;
        try {
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I imsg = (I) msg;
                channelRead0(ctx, imsg);
            } else {
                // Not handled here: pass it on and leave releasing to whoever consumes it.
                release = false;
                ctx.fireChannelRead(msg);
            }
        } finally {
            // Released even when channelRead0 throws, provided autoRelease is enabled.
            if (autoRelease && release) {
                ReferenceCountUtil.release(msg);
            }
        }
    }
    

    在不满足acceptInboundMessage时,代码走到了ctx.fireChannelRead(msg),查看pipeline的最后一个handler,即TailContext,可以看到很多方法里都调用了onUnhandledInboundMessage方法

    // Logs, at debug level, that a message reached the tail of the pipeline unhandled,
    // and always releases the message afterwards.
    protected void onUnhandledInboundMessage(Object msg) {
        try {
            logger.debug(
                    "Discarded inbound message {} that reached at the tail of the pipeline. " +
                            "Please check your pipeline configuration.", msg);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
    

    最终也会释放ByteBuf

  • 错以为ChannelHandlerContext.write(msg)就写出数据了

    write仅仅是将信息加入到队列中,而不是发送出去

  • 乱用ChannelHandlerContext.channel().writeAndFlush(msg)

    channel.writeAndFlush调用的是pipeline的writeAndFlush方法,导致整个pipeline重新走了一遍,严重的时候可能出现死循环

    此方式一般用于客户端的编写

    io.netty.channel.AbstractChannel#writeAndFlush(java.lang.Object)

    // Delegates to the pipeline's writeAndFlush rather than to a single handler context.
    @Override
    public ChannelFuture writeAndFlush(Object msg) {
        return pipeline.writeAndFlush(msg);
    }
    

    ChannelHandlerContext.writeAndFlush是从当前handler在pipeline中的位置开始,寻找下一个符合条件的handler