问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

Netty 编码器 & 解码器 正确使用姿势

创作时间:
作者:
@小白创作中心

Netty 编码器 & 解码器 正确使用姿势

引用
CSDN
1.
https://m.blog.csdn.net/happycao123/article/details/143633703

Netty框架在处理网络通信时,编码器和解码器是核心组件。本文将详细介绍如何正确使用这些组件,以及pipeline顺序的重要性。通过具体的代码示例,帮助读者深入理解Netty的编码解码机制。

通过前面文章的例子,相信读者也感受到了Netty开发核心工作在于处理读事件(解码)、写事件(编码)。

Netty的编解码器是处理网络数据编码和解码的核心组件,编解码器使得在客户端和服务器之间传输的数据可以被正确地序列化和反序列化:

  • 编码:结构化数据序列化为字节流
  • 解码:字节流反序列化为结构化数据

本文主要内容:

  • 如何正确使用编码器解码器
  • pipeline顺序的重要性

类图

解码器

对于读取消息实现,我们可以实现ChannelInboundHandler,这是比较底层接口,我们可以实现ChannelInboundHandlerAdapter(我们只需要重新channelRead方法),你可能问下面的msg究竟是什么类型?先卖一个关子。

我们业务开发更多的使用ByteToMessageDecoder,先看较完整的代码再解释

channelRead(ChannelHandlerContext ctx, Object msg)

自定义编码器例子

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new MyChannelInboundHandler0());
                    ch.pipeline().addLast( new MyMessageDecoder());
                    ch.pipeline().addLast(new MyChannelInboundHandler1());
                }
            });
    ChannelFuture f = b.bind(port).sync();
    f.channel().closeFuture().sync();
} finally {
    workerGroup.shutdownGracefully();
    bossGroup.shutdownGracefully();
}
public class MyChannelInboundHandler0 extends ChannelInboundHandlerAdapter {
   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       System.out.println( "MyChannelInboundHandler0 ..." + msg.getClass());
       ctx.fireChannelRead(msg);
   }
}
public class MyMessageDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        out.add(doDecode(in));
    }
    private MyMessage doDecode(ByteBuf in){
        ByteBuf targetBuf =   in.readBytes(in.readableBytes());
        String content = targetBuf.toString(StandardCharsets.UTF_8);
        return new MyMessage(content);
    }
}
public class MyChannelInboundHandler1 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       if(msg instanceof MyMessage){
           // 处理消息
           MyMessage myMessage = (MyMessage)msg;
           System.out.println(" handler MyMessage " + myMessage.getMessage() );
       }else {
           ctx.fireChannelRead(msg);
       }
    }
}

代码解读

之前文章我们一直强调pipeline顺序的重要性,底层读到数据会转成ByteBuf,所以我们第一个ChannelInboundHandler拿到的msg类型是ByteBuf

消息会经过pipeline中一系列ChannelInboundHandler处理中间会对消息进行转换

所以具体类型,要看前面是如何处理的,读取消息处理顺序和消息类型变化如下:

处理器正确处理方案

通常我们处理器都需要判断是否需要处理,当前处理器不能处理则ctx.fireChannelRead(msg)传递下去,否则后面的处理器无法处理获取到消息,比如MyChannelInboundHandler1实现逻辑。

ByteToMessageDecoder部分源码解读

ByteToMessageDecoder的channelRead核心代码如下,主体逻辑

  • 如果msg不是ByteBuf,则不处理(对应else分支)
  • 对于ByteBuf,调用decode方法,该类由子类实现,子类需要将转换后的消息放入到out集合(参考MyMessageDecoder)
  • finally块,会将out集合列表中将所有消息发出去

这就是我们业务将消息转换时需要放入到out集合。本质还是通过ctx.fireChannelRead(msg)传递下去

// 省略 catch 
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       if (msg instanceof ByteBuf) {
           CodecOutputList out = CodecOutputList.newInstance();
           try {
               first = cumulation == null;
               cumulation = cumulator.cumulate(ctx.alloc(),
                       first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
               callDecode(ctx, cumulation, out);
           }finally {
               try {
                   ... 
                   fireChannelRead(ctx, out, size);
               } finally {
                   out.recycle();
               }
           }
       } else {
           ctx.fireChannelRead(msg);
       }
   }
   /**
    * Get {@code numElements} out of the {@link List} and forward these through the pipeline.
    */
   static void fireChannelRead(ChannelHandlerContext ctx, List<Object> msgs, int numElements) {
       if (msgs instanceof CodecOutputList) {
           fireChannelRead(ctx, (CodecOutputList) msgs, numElements);
       } else {
           for (int i = 0; i < numElements; i++) {
               ctx.fireChannelRead(msgs.get(i));
           }
       }
   }

MessageToByteEncoder

将消息"编码"ByteBuf,与ByteToMessageDecoder相反,MyMessageEncoder是一个简单MessageToByteEncoder使用例子,实现它不难重点还是pipeline的责任链顺序。

public class MyMessageEncoder extends MessageToByteEncoder<MyOutMessage> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MyOutMessage msg, ByteBuf out) throws Exception {
        System.out.println("encode " + msg.getMessage());
        String message = "echo " +msg.getMessage();
        out.writeBytes(message.getBytes(StandardCharsets.UTF_8));
    }
}
public class MyChannelInboundHandler1 extends ChannelInboundHandlerAdapter {
   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      if(msg instanceof MyMessage){
          // 处理消息
          MyMessage myMessage = (MyMessage)msg;
          System.out.println(" handler MyMessage " + myMessage.getMessage() );
          MyOutMessage myOutMessage = new MyOutMessage(myMessage.getMessage());
          ctx.writeAndFlush(myOutMessage);
           //ctx.channel().writeAndFlush(myOutMessage);
      }else {
          ctx.fireChannelRead(msg);
      }
   }
}
    ch.pipeline().addLast(new MyChannelInboundHandler0());
                            ch.pipeline().addLast( new MyMessageDecoder());
                            ch.pipeline().addLast(new MyMessageEncoder());
                            ch.pipeline().addLast(new MyChannelInboundHandler1());
                            //ch.pipeline().addLast(new MyMessageEncoder());

将前面MyChannelInboundHandler1代码稍作修改,使用ctx.writeAndFlush(myOutMessage)发送出去。

MyMessageEncoder如果定义在MyChannelInboundHandler1后面则接收不到,具体源码后面文章会解读。

再谈顺序

ctx.fireChannelRead读事件传播方向从当前节点->责任链尾部方向传播

所以前面节点不会处理到;

ctx.writeAndFlush从当前节点->责任链头部方向传播,因此定义在当前节点后面的节点不会处理到对应的

写事件。

举一反三ctx.channel().writeAndFlush(myOutMessage);结果又如何?

MessageToMessageDecoder和MessageToMessageEncoder

有时候我们需要对消息多次处理这两个类比较有用,这两个类是可选的。

通常读取事件处理顺序

写入事件处理顺序

使用时需要保证责任链上处理类类型能够衔接上,否则消息从中间某个处理器就断了。

总结

本文主要介绍了MessageToByteEncoder和ByteToMessageDecoder重要的抽象类有了这两个基础类,理论上我们可以实现

一切编码、转码工作。

当然netty内置了许多编码器和解码器,尤其在处理TCP半包、粘包问题值得我们借鉴。

另外通过本文,详细读者也看到责任链顺序的重要性,不正确的顺序会导致业务无法正确的处理消息。

限于篇幅后面文章会持续输出消息处理整个流程,以及内置编码器使用等待。

© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号