netty-群聊系统
服务端
/**
 * Group chat server built on Netty.
 *
 * <p>Binds to a TCP port, installs a String decoder/encoder pair plus a
 * {@code GroupChatServerHandler} on every accepted connection, and then blocks
 * until the listening channel is closed.
 */
public class GroupChatServer {
    /** TCP port the server listens on. */
    private final int port;

    /**
     * Creates a server that will listen on the given port once {@link #run()} is called.
     *
     * @param port TCP port to bind
     */
    public GroupChatServer(int port) {
        this.port = port;
    }

    /**
     * Starts the server and blocks the calling thread until the server channel closes.
     *
     * <p>Both event loop groups are shut down in a {@code finally} block so their
     * threads are released even when binding fails or the wait is interrupted.
     * Checked exceptions are rethrown unchanged by {@code @SneakyThrows}.
     */
    @SneakyThrows
    public void run() {
        // Two thread groups: the first (one thread) is passed as the parent group,
        // the second as the child group that serves accepted connections.
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Inbound bytes -> String
                            pipeline.addLast("decoder", new StringDecoder());
                            // Outbound String -> bytes
                            pipeline.addLast("encoder", new StringEncoder());
                            // Business handler that relays chat messages
                            pipeline.addLast(new GroupChatServerHandler());
                        }
                    });
            System.out.println("服务器启动");
            ChannelFuture channelFuture = bootstrap.bind(port).sync();
            // Block until the listening channel is closed.
            channelFuture.channel().closeFuture().sync();
        } finally {
            // Always release the event loop threads, including on bind failure.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * Launches the chat server on port 7000.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        new GroupChatServer(7000).run();
    }
}
服务端自定义处理器
/**
 * Server-side business handler for the group chat.
 *
 * <p>Keeps every channel this handler has been added to in a shared
 * {@link ChannelGroup}, announces joins and leaves to the group, and relays each
 * received message to all members of the group.
 */
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
    /**
     * Channel group shared by all handler instances; it is constructed with the
     * global event executor.
     */
    private static final ChannelGroup channelGroup =
            new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * Announces the new client to the existing group members, then adds its
     * channel to the group. The announcement is written before the add, so the
     * joining client does not receive its own join message.
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        // Writes the message to every channel currently in the group.
        channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + "加入聊天");
        channelGroup.add(channel);
    }

    /**
     * Announces to the group that the client has left.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        // No explicit channelGroup.remove(channel): per the original author's note,
        // manual removal is not needed here.
        channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + "离开聊天");
    }

    /**
     * Logs to standard output that the channel became active.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " 上线");
    }

    /**
     * Logs to standard output that the channel became inactive.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println(ctx.channel().remoteAddress() + " 离线");
    }

    /**
     * Relays a received message to every channel in the group: other clients get
     * the message tagged with the sender's remote address, while the sender gets
     * an echo tagged as its own.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg decoded text message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel channel = ctx.channel();
        channelGroup.forEach(ch -> {
            if (ch != channel) {
                // Not the sender: forward the message.
                ch.writeAndFlush("[客户]" + channel.remoteAddress() + "发送了消息:" + msg);
            } else {
                // The sender itself: echo the message back.
                ch.writeAndFlush("[自己]发送了消息:" + msg);
            }
        });
    }

    /**
     * Reports the failure on standard error and closes the channel.
     *
     * @param ctx   context of the failing channel
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Report the cause instead of dropping it silently, so failures can be diagnosed.
        System.err.println(ctx.channel().remoteAddress() + " 发生异常: " + cause);
        ctx.close();
    }
}
客户端
/**
 * Console client for the group chat.
 *
 * <p>Connects to the chat server, then forwards every line read from standard
 * input to the server. Messages received from the server are handled by
 * {@code GroupChatClientHandler}.
 */
public class GroupChatClient {
    /** Host name or address of the chat server. */
    private final String host;
    /** TCP port of the chat server. */
    private final int port;

    /**
     * Creates a client for the given server address.
     *
     * @param host server host name or address
     * @param port server TCP port
     */
    GroupChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Connects to the server and sends each line typed on standard input until
     * input is exhausted.
     *
     * <p>The event loop group is shut down in a {@code finally} block so its
     * threads are released even when the connection attempt fails. Checked
     * exceptions are rethrown unchanged by {@code @SneakyThrows}.
     */
    @SneakyThrows
    public void run() {
        EventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(eventExecutors)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // String codec pair
                            pipeline.addLast("decoder", new StringDecoder());
                            pipeline.addLast("encoder", new StringEncoder());
                            // Business handler that prints incoming messages
                            pipeline.addLast(new GroupChatClientHandler());
                        }
                    });
            ChannelFuture cf = bootstrap.connect(host, port).sync();
            Channel channel = cf.channel();
            System.out.println("..." + channel.localAddress() + "...");
            // System.in is deliberately not closed: it belongs to the process.
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String line = scanner.nextLine();
                // Send the typed line to the server through the channel.
                channel.writeAndFlush(line);
            }
        } finally {
            // Always release the event loop threads, including on connect failure.
            eventExecutors.shutdownGracefully();
        }
    }

    /**
     * Launches a client that connects to 127.0.0.1:7000.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        new GroupChatClient("127.0.0.1", 7000).run();
    }
}
客户端自定义处理器
/**
 * Client-side business handler: prints every text message received on the
 * channel to standard output.
 */
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
    /**
     * Prints the received message with leading and trailing whitespace removed.
     *
     * @param ctx context of the channel the message arrived on (unused)
     * @param msg decoded text message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        final String text = msg.trim();
        System.out.println(text);
    }
}