netty-protobuf传输多种类型

netty-protobuf传输多种类型

起男 86 2024-04-03

netty-protobuf传输多种类型

proto文件

syntax = "proto3";
option optimize_for = SPEED; //加快解析
option java_package = "com.dqn.netty"; //指定生成到哪个包下
option java_outer_classname = "MyDataInfo"; //外部类名称

//protobuf 可以使用message管理其他的message
message MyMessage{
    //定义一个枚举类型
    enum DataType {
        StudentType = 0; //在proto3要求enum的属性编号从0开始
        WorkerType = 1;
    }

    DataType data_type = 1;//用data_type来标识传的是哪一个枚举类型
    //表示每次枚举类型最多只能出现其中的一个,节省空间
    oneof dataBody{
        Student student = 2;
        Worker worker = 3;
    }
}

message Student{
    int32 id = 1; //Student类的属性
    string name = 2;
}

message Worker{
    string name = 1;
    int32 age = 2;
}

服务端

        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);//指定线程数1
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();//默认线程数cpu核心数x2
        //创建服务器端的启动对象,配置启动参数
        ServerBootstrap bootstrap = new ServerBootstrap();
        //设置
        bootstrap
                .group(bossGroup,workerGroup)//设置两个线程组
                .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为通道实现
                .option(ChannelOption.SO_BACKLOG,128)//设置线程队列得到连接个数
                .childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
                .childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象
                    //给pipeline设置处理器
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline()
                                .addLast(new ProtobufDecoder(MyDataInfo.MyMessage.getDefaultInstance()))
                                .addLast(new NettyServerHandler());//自定义处理器
                    }
                });//给workerGroup的EventLoop对应的管道设置处理器
        //启动服务器, 绑定端口,并设置同步
        ChannelFuture cf = bootstrap.bind(6668).sync();
        //对关闭通道进行监听
        cf.channel().closeFuture().sync();

        //关闭
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();

处理器

public class NettyServerHandler extends SimpleChannelInboundHandler<MyDataInfo.MyMessage> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyDataInfo.MyMessage msg) throws Exception {
        //根据dataType来显示不同的信息
        MyDataInfo.MyMessage.DataType datType = msg.getDataType();
        if (datType == MyDataInfo.MyMessage.DataType.StudentType){
            MyDataInfo.Student student = msg.getStudent();
            System.out.println("id="+student.getId()+" name="+student.getName());
        }else {
            MyDataInfo.Worker worker = msg.getWorker();
            System.out.println("name="+worker.getName()+" age="+worker.getAge());
        }
    }
}

客户端

        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        //创建客户端启动对象
        //注意:客户端使用的不是ServerBootstrap是Bootstrap
        Bootstrap bootstrap = new Bootstrap();
        //相关设置
        bootstrap
                .group(eventExecutors)//设置线程组
                .channel(NioSocketChannel.class)//设置客户端通道实现类
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline()
                                .addLast(new ProtobufEncoder())
                                .addLast(new NettyClientHandler());//加入自定义处理器
                    }
                });
        //启动客户端去连接服务器端
        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
        //对关闭通道进行监听
        channelFuture.channel().closeFuture().sync();
        //关闭
        eventExecutors.shutdownGracefully();

处理器

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * 当通道就绪会触发该方法
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //随机发送随机对象
        int random = new Random().nextInt(3);
        MyDataInfo.MyMessage myMessage = null;

        if (0 == random){
            myMessage = MyDataInfo.MyMessage.newBuilder()
                    .setDataType(MyDataInfo.MyMessage.DataType.StudentType)
                    .setStudent(MyDataInfo.Student.newBuilder()
                            .setId(3)
                            .setName("zhangsan")
                            .build())
                    .build();
        }else {
            myMessage = MyDataInfo.MyMessage.newBuilder()
                    .setDataType(MyDataInfo.MyMessage.DataType.WorkerType)
                    .setWorker(MyDataInfo.Worker.newBuilder()
                            .setAge(20)
                            .setName("lisi")
                            .build())
                    .build();
        }
        //发送
        ctx.writeAndFlush(myMessage);
    }

}