netty-任务队列

netty-任务队列

起男 52 2024-03-14

netty-任务队列

使用场景

  • 用户程序自定义的普通任务
  • 用户自定义定时任务
  • 非当前reactor线程调用channel的各种方法

用户程序自定义的普通任务

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 读取数据事件(这里可以读取客户端发送的消息)
     * @param ctx 上下文对象,含有管道pipeline,通道channel,地址
     * @param msg 客户端发送的数据
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //向taskQueue中添加一个任务
        ctx.channel().eventLoop().execute(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                //此处有一个耗时任务
                Thread.sleep(10*1000);
                ctx.writeAndFlush(Unpooled.copiedBuffer("hello,abc",CharsetUtil.UTF_8));
            }
        });
        //再向taskQueue中添加一个任务
        //和上一个是在同一个线程中串行执行的,第二个回应会在30s之后返回
        ctx.channel().eventLoop().execute(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                //此处有一个耗时任务
                Thread.sleep(20*1000);
                ctx.writeAndFlush(Unpooled.copiedBuffer("hello,123",CharsetUtil.UTF_8));
            }
        });
        System.out.println("ok...");
    }
}

用户自定义定时任务

public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        
        //将任务提交到scheduleTaskQueue中
        ctx.channel().eventLoop().schedule(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                //此处有一个耗时任务
                Thread.sleep(5*1000);
                ctx.writeAndFlush(Unpooled.copiedBuffer("hello,dqn",CharsetUtil.UTF_8));
            }
        },5, TimeUnit.SECONDS);//延迟5秒执行

        System.out.println("ok...");
    }
}

非当前reactor线程调用channel的各种方法

例如在推送系统的业务线程里,根据用户的标识,找到对应的channel引用,然后调用write方法向该用户推送消息,就会进入到这种场景,最终的write会提交到任务队列中最后被异步消费