netty-群聊系统

netty-群聊系统

起男 50 2024-03-26

netty-群聊系统

服务端

/**
 * Group chat server built on Netty.
 *
 * <p>Binds to the configured port, installs String codecs plus
 * {@code GroupChatServerHandler} on every accepted connection, and blocks
 * until the server channel is closed.
 */
public class GroupChatServer {
    /** TCP port the server listens on. */
    private final int port;

    /**
     * @param port TCP port to bind to
     */
    public GroupChatServer(int port){
        this.port = port;
    }

    /**
     * Starts the server and blocks the calling thread until the server
     * channel closes. Both event loop groups are always shut down on exit,
     * including when binding fails or the thread is interrupted.
     */
    @SneakyThrows
    public void run(){
        // One boss thread accepts connections; the worker group handles I/O
        // on the accepted channels.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap()
                    .group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,128)
                    .childOption(ChannelOption.SO_KEEPALIVE,true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Inbound bytes -> String
                            pipeline.addLast("decoder",new StringDecoder());
                            // Outbound String -> bytes
                            pipeline.addLast("encoder",new StringEncoder());
                            // NOTE(review): no frame decoder is installed, so messages
                            // may be split or merged on a TCP stream — confirm whether
                            // a line/delimiter-based framer is wanted.
                            // Business logic: broadcast chat messages.
                            pipeline.addLast(new GroupChatServerHandler());
                        }
                    });
            System.out.println("服务器启动");
            ChannelFuture channelFuture = bootstrap.bind(port).sync();
            // Block until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } finally {
            // Release event loop threads even if bind()/sync() threw; otherwise
            // the non-daemon loop threads would keep the JVM alive.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new GroupChatServer(7000).run();
    }
}

服务端自定义处理器（GroupChatServerHandler）

/**
 * Server-side handler for the group chat.
 *
 * <p>Tracks every connected channel in a shared {@link ChannelGroup},
 * announces joins and leaves to the group, and relays each received message
 * to all members (echoing it back to the sender with a different prefix).
 */
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {

    // Shared by all handler instances (one handler is created per connection),
    // hence static; final so the reference can never be swapped out.
    private static final ChannelGroup channelGroup =
            new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * Called when this handler is added to a channel's pipeline: announces
     * the newcomer to the existing members, then registers the channel.
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        // Broadcast before adding, so the newcomer does not receive its own
        // join notification.
        channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"加入聊天");
        channelGroup.add(channel);
    }

    /**
     * Called when this handler is removed from the pipeline: announces the
     * departure to the group.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        // No explicit channelGroup.remove(channel): per the Netty ChannelGroup
        // documentation, a closed channel is removed from the group automatically.
        channelGroup.writeAndFlush("[客户端]"+channel.remoteAddress()+"离开聊天");
    }

    /** Logs that the channel became active (client online). */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress()+" 上线");
    }

    /** Logs that the channel became inactive (client offline). */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress()+" 离线");
    }

    /**
     * Relays a received message to every channel in the group. Other clients
     * see the sender's address; the sender gets an echo marked as its own.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg decoded text message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.forEach(ch->{
            if (ch != channel){
                // Another member: forward with the sender's address.
                ch.writeAndFlush("[客户]"+channel.remoteAddress()+"发送了消息:"+msg);
            }else {
                // The sender itself: echo back.
                ch.writeAndFlush("[自己]发送了消息:"+msg);
            }
        });
    }

    /**
     * Reports the failure and closes the channel. The cause is written to
     * stderr so that errors are not silently discarded.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.err.println(ctx.channel().remoteAddress()+" 发生异常: "+cause);
        ctx.close();
    }
}

客户端

/**
 * Console client for the group chat.
 *
 * <p>Connects to the server, prints incoming messages through
 * {@code GroupChatClientHandler}, and sends each line read from standard
 * input to the server.
 */
public class GroupChatClient {

    private final String host;
    private final int port;

    /**
     * @param host server host name or address
     * @param port server TCP port
     */
    GroupChatClient(String host,int port){
        this.host = host;
        this.port = port;
    }

    /**
     * Connects to the server and forwards stdin lines until end of input.
     * The event loop group is always shut down on exit, including when the
     * connection attempt fails.
     */
    @SneakyThrows
    public void run(){
        EventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(eventExecutors)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // String codecs: bytes <-> String
                            pipeline.addLast("decoder",new StringDecoder());
                            pipeline.addLast("encoder",new StringEncoder());
                            // Prints messages received from the server.
                            pipeline.addLast(new GroupChatClientHandler());
                        }
                    });
            ChannelFuture cf = bootstrap.connect(host, port).sync();
            Channel channel = cf.channel();
            System.out.println("..."+channel.localAddress()+"...");
            // Read user input line by line and send each line to the server.
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()){
                String s = scanner.nextLine();
                channel.writeAndFlush(s);
            }
        } finally {
            // Without this, a failed connect() would leave the non-daemon
            // event loop threads running and the process would never exit.
            eventExecutors.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new GroupChatClient("127.0.0.1",7000).run();
    }
}

客户端自定义处理器（GroupChatClientHandler）

/**
 * Client-side handler that displays messages received from the server.
 */
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Prints the received message to standard output with leading and
     * trailing whitespace removed.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg decoded text message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        final String text = msg.trim();
        System.out.println(text);
    }
}