注:更多netty相关文章请访问博主专栏: netty专栏
概述前面 netty 4.1.45 TCP拆包粘包原因及解决办法 介绍了使用netty本身提供的LineBasedFrameDecoder、DelimiterBasedFrameDecoder、FixedLengthFrameDecoder三种编解码方案。不过实际工作中直接使用这几种编解码的还是比较少的。比如处于安全、性能、私有网络等。
这里介绍简单的自定义编解码器的实现,后续再介绍如何与json,PB协议等进行集成实现复杂的netty协议。
自定义编解码器实现机制将消息分为消息头和消息体,消息头用int类型表示,当然你们也可以用其他类型标识。消息头的值是消息体的长度。 编码时,首先读取消息的长度,写入发送缓冲区,再将消息体写入; 解码时,要先读取消息的长度,然后读取该长度的消息体,当拆包或者粘包时要将数据报进行拆分或者合并。
我们发送的每一条数据的格式: 4字节的长度+实际数据内容 如:10helloworld,表示长度是10,从后读取10个字节ji是一条完整的消息。
编码器编码器需要实现MessageToByteEncoder类,该类用途就是读取数据长度,分两次写入缓冲区,一次数据长度,一次消息体。
public class MyCustomMessageEncoder extends MessageToByteEncoder {@Overrideprotected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {// 要发送的数据// 这里如果是自定义的类型,msg即为自定义的类型,需要转为byte[]byte[] body = ((ByteBuf) msg).array();// 数据长度int dataLength = body.length;// 缓冲区先写入数据长度out.writeInt(dataLength);// 再写入数据out.writeBytes(body);}} 解码器 public class MyCustomMessageDecoder extends ByteToMessageDecoder {// 消息头:发送端写的是一个int,占用4字节。private final static int HEAD_LENGTH = 4;@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {//if (in.readableBytes() System.out.println("ERROR!");ctx.close();}//读到的消息体长度如果小于我们传送过来的消息长度,则resetReaderIndex.// 这个配合markReaderIndex使用的。// 把readIndex重置到mark的地方if (in.readableBytes() return new String(body, 0, body.length);}}这里需要着重注意点是ByteBuf的markReaderIndex和resetReaderIndex的配合使用。
markReaderIndex:标记一下当前读的位置resetReaderIndex:重置读取位置到标记的位置 从代码中也可以看到,如果可读的字节数小于数据长度dataLength,需要等待下一个包到来继续读取。 服务端代码服务端最重要的是配置channelpipeline。 完整代码如下:
客户端代码最重要也是配置channelpipeline。
完整代码:
package com.example;import io.netty.bootstrap.Bootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.*;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import java.util.List;import java.util.concurrent.TimeUnit;/** * 自定义消息解码器 */public class MyNettyClient4 {public static void main(String[] args) {//客户端NIO线程组EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new MyCustomMessageEncoder()).addLast(new MyCustomMessageDecoder()).addLast(new TimeClientHandler4());}});//异步链接服务器ChannelFuture future = bootstrap.connect("127.0.0.1", 8080).sync();//等等客户端链接关闭future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {//优雅停机group.shutdownGracefully();}}}//客户端业务逻辑处理类class TimeClientHandler4 extends ChannelInboundHandlerAdapter {private int count = 0;/** * 客户端与服务器TCP链路链接成功后调用该方法 * * @param ctx */@Overridepublic void channelActive(ChannelHandlerContext ctx) {byte[] req = ("QUERY TIME ORDER").getBytes();for (int i = 0; i String body = (String) msg;System.out.println("第 " + count++ + " 次受到服务端返回:" + body);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {System.out.println("Unexpected exception from downstream : " + cause.getMessage());ctx.close();}} 运行结果server端结果:
客户端结果: 结果正常。
注:更多netty相关文章请访问博主专栏: netty专栏
https://www.shan-machinery.com
查看网友的精彩评论