netty-demo

netty-demo

起男 78 2024-03-13

netty-demo

服务端

        //创建BossGroup和WorkerGroup
        //bossGroup只处理连接请求,真正与客户端业务处理会交给workerGroup
        //这两个线程组都是无限循环
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);//指定线程数1
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();//默认线程数cpu核心数x2
        //创建服务器端的启动对象,配置启动参数
        ServerBootstrap bootstrap = new ServerBootstrap();
        //设置
        bootstrap
                .group(bossGroup,workerGroup)//设置两个线程组
                .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel作为通道实现
                .option(ChannelOption.SO_BACKLOG,128)//设置线程队列得到连接个数
                .childOption(ChannelOption.SO_KEEPALIVE,true)//设置保持活动连接状态
                .childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象
                    //给pipeline设置处理器
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline()
                                .addLast(new NettyServerHandler());//自定义处理器
                    }
                });//给workerGroup的EventLoop对应的管道设置处理器
        System.out.println("服务器准备好了...");
        //启动服务器, 绑定端口,并设置同步
        ChannelFuture cf = bootstrap.bind(6668).sync();
        //对关闭通道进行监听
        cf.channel().closeFuture().sync();

        //关闭
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();

自定义处理器

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 读取数据事件(这里可以读取客户端发送的消息)
     * @param ctx 上下文对象,含有管道pipeline,通道channel,地址
     * @param msg 客户端发送的数据
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server ctx = "+ctx);
        //将msg转成一个ByteBuf(netty提供的)
        ByteBuf buf = (ByteBuf)msg;
        System.out.println("客户端发送消息:"+buf.toString(CharsetUtil.UTF_8));
        System.out.println("客户端地址:"+ctx.channel().remoteAddress());
    }

    /**
     * 数据读取完毕
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //将数据写入缓冲区并刷新(write+flush)
        //Unpooled.copiedBuffer进行编码
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello...", CharsetUtil.UTF_8));
    }

    /**
     * 异常处理
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //关闭通道
        ctx.channel().close();
    }
}

客户端

        //客户端需要一个事件循环组
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        //创建客户端启动对象
        //注意:客户端使用的不是ServerBootstrap是Bootstrap
        Bootstrap bootstrap = new Bootstrap();
        //相关设置
        bootstrap
                .group(eventExecutors)//设置线程组
                .channel(NioSocketChannel.class)//设置客户端通道实现类
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new NettyClientHandler());//加入自定义处理器
                    }
                });
        System.out.println("客户端 ok...");
        //启动客户端去连接服务器端
        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
        //对关闭通道进行监听
        channelFuture.channel().closeFuture().sync();
        //关闭
        eventExecutors.shutdownGracefully();

自定义处理器

public class NettyClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * 当通道就绪会触发该方法
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("client:"+ctx);
        ctx.writeAndFlush(Unpooled.copiedBuffer("hello 服务端", CharsetUtil.UTF_8));
    }

    /**
     * 当通道有读取事件时会触发
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("服务器回复:"+buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务器地址:"+ctx.channel().remoteAddress());
    }

    /**
     * 发生异常
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();//打印异常
        ctx.close();//关闭
    }
}