netty-整合protobuf

netty-整合protobuf

起男 425 2024-04-02

netty-整合protobuf

依赖

        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.6.1</version>
        </dependency>

proto文件

  syntax = "proto3"; //版本
  option java_outer_classname = "StudentPOJO"; //生成的外部类名,同时也是文件名
  //protobuf 使用message管理数据
  message Student{ //会在StudentPOJO外部类生成一个内部类,是真正发送的pojo对象
      int32 id = 1;//Student类中有一个属性 名称为id,类型为int32(protobuf类型) 1为属性序号,不是值
      string name = 2;
}

服务端

NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);//指定线程数1
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();//默认线程数cpu核心数x2
        //创建服务器端的启动对象,配置启动参数
        ServerBootstrap bootstrap = new ServerBootstrap();
        //设置
        bootstrap
                .group(bossGroup,workerGroup)//设置两个线程组
                .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为通道实现
                .option(ChannelOption.SO_BACKLOG,128)//设置线程队列得到连接个数
                .childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
                .childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象
                    //给pipeline设置处理器
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline()
                            //protobuf解码器
                                .addLast(new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()))
                                .addLast(new NettyServerHandler());//自定义处理器
                    }
                });//给workerGroup的EventLoop对应的管道设置处理器
        //启动服务器, 绑定端口,并设置同步
        ChannelFuture cf = bootstrap.bind(6668).sync();
        //对关闭通道进行监听
        cf.channel().closeFuture().sync();

        //关闭
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();

处理器

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 读取数据事件(这里可以读取客户端发送的消息)
     * @param ctx 上下文对象,含有管道pipeline,通道channel,地址
     * @param msg 客户端发送的数据
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		//转为具体类型,也可以使用泛型的方式
        StudentPOJO.Student student = (StudentPOJO.Student) msg;
        System.out.println("服务器:id="+student.getId()+",name="+student.getName());
    }
}

客户端

        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        //创建客户端启动对象
        //注意:客户端使用的不是ServerBootstrap是Bootstrap
        Bootstrap bootstrap = new Bootstrap();
        //相关设置
        bootstrap
                .group(eventExecutors)//设置线程组
                .channel(NioSocketChannel.class)//设置客户端通道实现类
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline()
                            //protobuf编码器
                                .addLast(new ProtobufEncoder())
                                .addLast(new NettyClientHandler());//加入自定义处理器
                    }
                });
        //启动客户端去连接服务器端
        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
        //对关闭通道进行监听
        channelFuture.channel().closeFuture().sync();
        //关闭
        eventExecutors.shutdownGracefully();

处理器

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * 当通道就绪会触发该方法
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        StudentPOJO.Student student = StudentPOJO.Student.newBuilder()
            .setId(2)
            .setName("zhangsan")
            .build();
        ctx.channel().writeAndFlush(student);
    }

}