Skip to content

天楚锐齿

人工智能 云计算 大数据 物联网 IT 通信 嵌入式

天楚锐齿

  • 下载
  • 物联网
  • 云计算
  • 大数据
  • 人工智能
  • Linux&Android
  • 网络
  • 通信
  • 嵌入式
  • 杂七杂八

使用Netty作为TCP的粘包分包处理

2021-12-30

平常用tcp作为消息传输的载体时,需要做消息的粘包和拆包的处理,个人建议使用SCTP协议更好(SCTP有类似UDP以消息为单位对外发送、类似TCP有消息不丢失的优点,最适合传输消息类数据)。

先看server端:

  • 建立一个server:
public void start(){
        // 用来接收进来的连接
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 用来处理已经被接收的连接,一旦bossGroup接收到连接,就会把连接信息注册到workerGroup上
        EventLoopGroup workGroup = new NioEventLoopGroup();
        // nio服务的启动类
        ServerBootstrap server = new ServerBootstrap().group(bossGroup,workGroup)
                .channel(NioServerSocketChannel.class) // 说明一个新的Channel如何接收进来的连接
                .option(ChannelOption.SO_BACKLOG, 128) // tcp最大缓存链接个数
                .childOption(ChannelOption.SO_KEEPALIVE, true) //保持连接
                .handler(new LoggingHandler(LogLevel.INFO)) // 打印日志级别
                .childHandler(new TcpServerChannelInitializer());
        try {
            ChannelFuture future = server.bind(port).sync(); // 绑定端口,开始接受链接
            future.channel().closeFuture().sync();// 等待服务端口的关闭;在这个例子中不会发生,但你可以优雅实现;关闭你的服务
        } catch (InterruptedException e) {
            Log.e(TAG,”server start fail”,e);
        }finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
  • 再继承建立一个通道初始化类:
public class TcpServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        /*
        为了解决网络数据流的拆包粘包问题,Netty 为我们内置了如下的解码器:
        ByteToMessageDecoder:如果想实现自己的半包解码器,实现该类;
        MessageToMessageDecoder:一般作为二次解码器,当我们在 ByteToMessageDecoder 将一个 bytes 数组转换成一个 java 对象的时候,我们可能还需要将这个对象进行二次解码成其他对象,我们就可以继承这个类;
        LineBasedFrameDecoder:通过在包尾添加回车换行符 \r\n 来区分整包消息;
        StringDecoder:字符串解码器;
        DelimiterBasedFrameDecoder:特殊字符作为分隔符来区分整包消息;
        FixedLengthFrameDecoder:报文大小固定长度,不够空格补全;
        ProtoBufVarint32FrameDecoder:通过 Protobuf 解码器来区分整包消息;
        ProtobufDecoder: Protobuf 解码器;
        LengthFieldBasedFrameDecoder:指定长度来标识整包消息,通过在包头指定整包长度来约定包长。
        Netty 还内置了如下的编码器:
        ProtobufEncoder:Protobuf 编码器;
        MessageToByteEncoder:将 Java 对象编码成 ByteBuf;
        MessageToMessageEncoder:如果不想将 Java 对象编码成 ByteBuf,而是自定义类就继承这个;
        LengthFieldPrepender:LengthFieldPrepender 是一个非常实用的工具类,如果我们在发送消息的时候采用的是:消息长度字段+原始消息的形式,那么我们就可以使用 LengthFieldPrepender。这是因为 LengthFieldPrepender 可以将待发送消息的长度(二进制字节长度)写到 ByteBuf 的前两个字节。
        */
        // 这里相当于过滤器,可以配置多个
        //处理粘包,要和客户器端发送消息对应,消息格式应该为:4字节消息内容长度+消息内容
        pipeline.addLast(new LengthFieldBasedFrameDecoder(
                1024, // 帧的最大长度,即每个数据包最大限度
                0, // 长度字段偏移量
                4, // 长度字段所占的字节数
                0, // 长度的修正值,如果长度字段没有包含尾部所有长度,则这里为正,如果长度字段包含了长度自身和长度偏移量,则这里为负
                4) // 需要忽略的字节数,从消息帧的第一字节开始算
        );
        // 自己的逻辑Handler
        pipeline.addLast(“handler”, new TcpServerHandler());
    }
}
  • 最后继承实现一个通道处理类:
public class TcpServerHandler extends SimpleChannelInboundHandler {
    private static final String TAG = “TcpServerHandler”;
    private int counter;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Log.i(TAG, “server channelActive”);
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        //接收消息的处理
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, StandardCharsets.UTF_8);
        System.out.println(“—–start——\n”+ body + “\n——end——“);
        //发送消息到客户端
        String content = “receive” + ++counter;
        ByteBuf resp = Unpooled.buffer();
        resp.writeShort(content.getBytes(StandardCharsets.UTF_8).length);
        resp.writeBytes(content.getBytes(StandardCharsets.UTF_8));
        ctx.writeAndFlush(resp);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }
}

再看Client端

其实client端和server的处理类似。
  • 先连接到server端:
    public void start(){
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new TcpClientChannelInitializer());
        try {
            ChannelFuture future = bootstrap.connect(address,port).sync();// 客户端开启
            future.channel().writeAndFlush(“Hello world, i’m online”);
            future.channel().closeFuture().sync();// 等待直到连接中断
        } catch (Exception e) {
            Log.e(TAG, “client start fail”,e);
        }finally {
            group.shutdownGracefully();
        }
    }
  • 然后建立通道初始化类:
public class TcpClientChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        /*
        为了解决网络数据流的拆包粘包问题,Netty 为我们内置了如下的解码器:
        ByteToMessageDecoder:如果想实现自己的半包解码器,实现该类;
        MessageToMessageDecoder:一般作为二次解码器,当我们在 ByteToMessageDecoder 将一个 bytes 数组转换成一个 java 对象的时候,我们可能还需要将这个对象进行二次解码成其他对象,我们就可以继承这个类;
        LineBasedFrameDecoder:通过在包尾添加回车换行符 \r\n 来区分整包消息;
        StringDecoder:字符串解码器;
        DelimiterBasedFrameDecoder:特殊字符作为分隔符来区分整包消息;
        FixedLengthFrameDecoder:报文大小固定长度,不够空格补全;
        ProtoBufVarint32FrameDecoder:通过 Protobuf 解码器来区分整包消息;
        ProtobufDecoder: Protobuf 解码器;
        LengthFieldBasedFrameDecoder:指定长度来标识整包消息,通过在包头指定整包长度来约定包长。
        Netty 还内置了如下的编码器:
        ProtobufEncoder:Protobuf 编码器;
        MessageToByteEncoder:将 Java 对象编码成 ByteBuf;
        MessageToMessageEncoder:如果不想将 Java 对象编码成 ByteBuf,而是自定义类就继承这个;
        LengthFieldPrepender:LengthFieldPrepender 是一个非常实用的工具类,如果我们在发送消息的时候采用的是:消息长度字段+原始消息的形式,那么我们就可以使用 LengthFieldPrepender。这是因为 LengthFieldPrepender 可以将待发送消息的长度(二进制字节长度)写到 ByteBuf 的前两个字节。
        */
        //处理粘包,要和服务器端发送消息对应,消息格式应该为:2字节消息内容长度+消息内容
        pipeline.addLast(new LengthFieldBasedFrameDecoder(
                1024, // 帧的最大长度,即每个数据包最大限度
                0, // 长度字段偏移量
                2, // 长度字段所占的字节数
                0, // 长度的修正值,如果长度字段没有包含尾部所有长度,则这里为正,如果长度字段包含了长度自身和长度偏移量,则这里为负
                2) // 需要忽略的字节数,从消息帧的第一字节开始算
        );
        // 客户端的逻辑
        pipeline.addLast(“handler”, new TcpClientHandler());
    }
}
  • 最后实现自己通道的处理:
public class TcpClientHandler extends SimpleChannelInboundHandler {
    private static final String TAG = “TcpClientHandler”;
    private int counter;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        //接收消息的处理
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, StandardCharsets.UTF_8);
        //System.out.println(body + ” count:” + ++counter + “—-end—-\n”);
        Log.w(TAG, body + ” count:” + ++counter + “—-end—-\n”);
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Log.i(TAG, “client channelActive”);
        //发送消息到服务端
        for (int i = 0; i < 100; i++) {
            byte[] req = (“我是一条测试消息,快来读我吧,啦啦啦” + i).getBytes();
            ByteBuf message = Unpooled.buffer(req.length);
            message.writeInt(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);// 发送客户端的请求
        }
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Log.i(TAG, “Client is close”);
    }
}
379次阅读

Post navigation

前一篇:

使用LVGL画一个表盘圆弧随指针变动的仪表

后一篇:

QT qml下DBus的使用例子

发表评论 取消回复

邮箱地址不会被公开。 必填项已用*标注

个人介绍

需要么,有事情这里找联系方式:关于天楚锐齿

=== 美女同欣赏,好酒共品尝 ===

微信扫描二维码赞赏该文章:

扫描二维码分享该文章:

分类目录

  • Linux&Android (79)
  • Uncategorized (1)
  • 下载 (28)
  • 云计算 (37)
  • 人工智能 (8)
  • 大数据 (24)
  • 嵌入式 (34)
  • 杂七杂八 (34)
  • 物联网 (59)
  • 网络 (23)
  • 通信 (21)

文章归档

近期文章

  • 使用Python渲染OpenGL的.obj和.mtl文件
  • 用LVGL图形库绘制二维码
  • Android使用Messenger和SharedMemory实现跨app的海量数据传输
  • CAN信号的c语言解析代码
  • QT qml下DBus的使用例子

近期评论

  • 硕发表在《使用Android的HIDL+AIDL方式编写从HAL层到APP层的程序》
  • maxshu发表在《使用Android的HIDL+AIDL方式编写从HAL层到APP层的程序》
  • Ambition发表在《使用Android的HIDL+AIDL方式编写从HAL层到APP层的程序》
  • Ambition发表在《使用Android的HIDL+AIDL方式编写从HAL层到APP层的程序》
  • maxshu发表在《Android9下用ethernet 的Tether模式来做路由器功能》

阅读量

  • 使用Android的HIDL+AIDL方式编写从HAL层到APP层的程序 - 16,804次阅读
  • 卸载深信服Ingress、SecurityDesktop客户端 - 12,078次阅读
  • 车机技术之Android Automotive - 6,661次阅读
  • 车机技术之车规级Linux-Automotive Grade Linux(AGL) - 5,860次阅读
  • Linux策略路由及iptables mangle、ip rule、ip route关系及一种Network is unreachable错误 - 5,709次阅读
  • 在Android9下用ndk编译vSomeIP和CommonAPI以及使用例子 - 5,658次阅读
  • linux下的unbound DNS服务器设置详解 - 5,601次阅读
  • linux的tee命令导致ssh客户端下的shell卡住不动 - 4,998次阅读
  • 车机技术之360°全景影像(环视)系统 - 4,897次阅读
  • libwebp(处理webp图像)的安装和使用 - 4,749次阅读

功能

  • 文章RSS
  • 评论RSS

联系方式

地址
深圳市科技园

时间
周一至周五:  9:00~12:00,14:00~18:00
周六和周日:10:00~12:00

标签

android AT命令 centos Hadoop hdfs ip ipv6 kickstart linux mapreduce mini6410 modem OAuth openstack os python socket ssh uboot 内核 协议 安装 嵌入式 性能 报表 授权 操作系统 数据 数据库 月报 模型 汽车 测试 深信服 深度学习 源代码 神经网络 统计 编译 网络 脚本 虚拟机 调制解调器 车机 金融
© 2023 天楚锐齿