netty-整合protobuf
依赖
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.6.1</version>
</dependency>
proto文件
syntax = "proto3"; //版本
option java_outer_classname = "StudentPOJO"; //生成的外部类名,同时也是文件名
//protobuf 使用message管理数据
message Student{ //会在StudentPOJO外部类生成一个内部类,是真正发送的pojo对象
int32 id = 1;//Student类中有一个属性 名称为id,类型为int32(protobuf类型) 1为属性序号,不是值
string name = 2;
}
服务端
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);//指定线程数1
NioEventLoopGroup workerGroup = new NioEventLoopGroup();//默认线程数cpu核心数x2
//创建服务器端的启动对象,配置启动参数
ServerBootstrap bootstrap = new ServerBootstrap();
//设置
bootstrap
.group(bossGroup,workerGroup)//设置两个线程组
.channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为通道实现
.option(ChannelOption.SO_BACKLOG,128)//设置线程队列得到连接个数
.childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象
//给pipeline设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
//protobuf解码器
.addLast(new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()))
.addLast(new NettyServerHandler());//自定义处理器
}
});//给workerGroup的EventLoop对应的管道设置处理器
//启动服务器, 绑定端口,并设置同步
ChannelFuture cf = bootstrap.bind(6668).sync();
//对关闭通道进行监听
cf.channel().closeFuture().sync();
//关闭
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
处理器
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 读取数据事件(这里可以读取客户端发送的消息)
* @param ctx 上下文对象,含有管道pipeline,通道channel,地址
* @param msg 客户端发送的数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//转为具体类型,也可以使用泛型的方式
StudentPOJO.Student student = (StudentPOJO.Student) msg;
System.out.println("服务器:id="+student.getId()+",name="+student.getName());
}
}
客户端
NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
//创建客户端启动对象
//注意:客户端使用的不是ServerBootstrap是Bootstrap
Bootstrap bootstrap = new Bootstrap();
//相关设置
bootstrap
.group(eventExecutors)//设置线程组
.channel(NioSocketChannel.class)//设置客户端通道实现类
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
//protobuf编码器
.addLast(new ProtobufEncoder())
.addLast(new NettyClientHandler());//加入自定义处理器
}
});
//启动客户端去连接服务器端
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
//对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
//关闭
eventExecutors.shutdownGracefully();
处理器
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 当通道就绪会触发该方法
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
StudentPOJO.Student student = StudentPOJO.Student.newBuilder()
.setId(2)
.setName("zhangsan")
.build();
ctx.channel().writeAndFlush(student);
}
}