netty 4.1.45自定义编解码器

关注
netty 4.1.45自定义编解码器www.shan-machinery.com

文章目录 概述自定义编解码器实现机制编码器解码器服务端代码客户端代码运行结果

注:更多netty相关文章请访问博主专栏: netty专栏

概述

前面 netty 4.1.45 TCP拆包粘包原因及解决办法 介绍了使用netty本身提供的LineBasedFrameDecoder、DelimiterBasedFrameDecoder、FixedLengthFrameDecoder三种编解码方案。不过实际工作中直接使用这几种编解码的还是比较少的。比如处于安全、性能、私有网络等。

这里介绍简单的自定义编解码器的实现,后续再介绍如何与json,PB协议等进行集成实现复杂的netty协议。

自定义编解码器实现机制

将消息分为消息头和消息体,消息头用int类型表示,当然你们也可以用其他类型标识。消息头的值是消息体的长度。 编码时,首先读取消息的长度,写入发送缓冲区,再将消息体写入; 解码时,要先读取消息的长度,然后读取该长度的消息体,当拆包或者粘包时要将数据报进行拆分或者合并。

我们发送的每一条数据的格式: 4字节的长度+实际数据内容 如:10helloworld,表示长度是10,从后读取10个字节ji是一条完整的消息。

编码器

编码器需要实现MessageToByteEncoder类,该类用途就是读取数据长度,分两次写入缓冲区,一次数据长度,一次消息体。

public class MyCustomMessageEncoder extends MessageToByteEncoder {@Overrideprotected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {// 要发送的数据// 这里如果是自定义的类型,msg即为自定义的类型,需要转为byte[]byte[] body = ((ByteBuf) msg).array();// 数据长度int dataLength = body.length;// 缓冲区先写入数据长度out.writeInt(dataLength);// 再写入数据out.writeBytes(body);}} 解码器 public class MyCustomMessageDecoder extends ByteToMessageDecoder {// 消息头:发送端写的是一个int,占用4字节。private final static int HEAD_LENGTH = 4;@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {//if (in.readableBytes() System.out.println("ERROR!");ctx.close();}//读到的消息体长度如果小于我们传送过来的消息长度,则resetReaderIndex.// 这个配合markReaderIndex使用的。// 把readIndex重置到mark的地方if (in.readableBytes() return new String(body, 0, body.length);}}

这里需要着重注意点是ByteBuf的markReaderIndex和resetReaderIndex的配合使用。

markReaderIndex:标记一下当前读的位置resetReaderIndex:重置读取位置到标记的位置 从代码中也可以看到,如果可读的字节数小于数据长度dataLength,需要等待下一个包到来继续读取。 服务端代码

服务端最重要的是配置channelpipeline。 在这里插入图片描述 完整代码如下:

package com.example;import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.ByteToMessageDecoder;import io.netty.handler.codec.DelimiterBasedFrameDecoder;import io.netty.handler.codec.LineBasedFrameDecoder;import io.netty.handler.codec.MessageToByteEncoder;import io.netty.handler.codec.string.StringDecoder;import java.text.SimpleDateFormat;import java.util.Date;import java.util.List;/** * 自定义消息解码器 */public class MyNettyServer4 {public static void main(String[] args) {//配置服务端NIO线程组EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workGroup)//配置主从线程组.channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)//配置一些TCP的参数.childHandler(new MyChildHandler4());//添加自定义的channel//绑定8080端口ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();//服务端监听端口关闭ChannelFuture future = channelFuture.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {//netty优雅停机bossGroup.shutdownGracefully();workGroup.shutdownGracefully();}}}class MyChildHandler4 extends ChannelInitializer {@Overrideprotected void initChannel(SocketChannel socketChannel) {socketChannel.pipeline().addLast(new MyCustomMessageEncoder()).addLast(new MyCustomMessageDecoder()).addLast(new TimeServerHandler4());}}/** * TimeServerHandler这个才是服务端真正处理请求的服务方法 */class TimeServerHandler4 extends ChannelInboundHandlerAdapter {private int count = 0;@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {String body = (String) msg;SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String currentTimeStr = "QUERY TIME ORDER".equalsIgnoreCase(body) ?format.format(new Date()) + "" : "BAD ORDER";//受到一次请求,count加1System.out.println("第 " + count++ + " 次收到客户端请求:" + body + "返回响应:" + currentTimeStr);ByteBuf resp = Unpooled.copiedBuffer(currentTimeStr.getBytes());ctx.write(resp);//将消息发送到发送缓冲区//ctx.writeAndFlush(resp);//如果这里使用writeAndFlush,则下面channelReadComplete中就不需要flush了}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) {ctx.flush();//将消息从发送缓冲区中写入socketchannel中}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {ctx.close();}} 客户端代码

客户端代码最重要也是配置channelpipeline。

完整代码:

package com.example;import io.netty.bootstrap.Bootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.*;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import java.util.List;import java.util.concurrent.TimeUnit;/** * 自定义消息解码器 */public class MyNettyClient4 {public static void main(String[] args) {//客户端NIO线程组EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new MyCustomMessageEncoder()).addLast(new MyCustomMessageDecoder()).addLast(new TimeClientHandler4());}});//异步链接服务器ChannelFuture future = bootstrap.connect("127.0.0.1", 8080).sync();//等等客户端链接关闭future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {//优雅停机group.shutdownGracefully();}}}//客户端业务逻辑处理类class TimeClientHandler4 extends ChannelInboundHandlerAdapter {private int count = 0;/** * 客户端与服务器TCP链路链接成功后调用该方法 * * @param ctx */@Overridepublic void channelActive(ChannelHandlerContext ctx) {byte[] req = ("QUERY TIME ORDER").getBytes();for (int i = 0; i String body = (String) msg;System.out.println("第 " + count++ + " 次受到服务端返回:" + body);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {System.out.println("Unexpected exception from downstream : " + cause.getMessage());ctx.close();}} 运行结果

server端结果: 在这里插入图片描述

客户端结果: 在这里插入图片描述 结果正常。

注:更多netty相关文章请访问博主专栏: netty专栏

https://www.shan-machinery.com