RSocket

RSocket

起男 21 2025-06-06

RSocket

工作模式

模式 数据方向 背压支持 典型场景
Request-Response 1请求 → 1响应 ✔️ 替代 REST API
Fire-and-Forget 1请求 → 无响应 非关键性操作(如日志)
Request-Stream 1请求 → N响应流 ✔️ 实时数据推送(如股票行情)
Channel M请求流 ↔ N响应流 ✔️ 全双工交互(如聊天、游戏)

优势

  1. 多模式适配:覆盖从简单查询到实时流的所有场景。
  2. 背压原生支持:避免消费者被压垮(基于 Reactive Streams)。
  3. 协议无关性:可运行在 TCP、WebSocket、HTTP/2 等传输层上。
  4. 高性能:二进制编码、多路复用减少连接开销。

原生开发

导入依赖

implementation 'org.springframework.boot:spring-boot-starter-rsocket'

处理器

@Slf4j
public class MessageRSocketHandler implements RSocket {

    /**
     * 无响应,常用于如日志等
     * @param payload Request payload. 附加信息
     * @return
     */
    @Override
    public Mono<Void> fireAndForget(Payload payload) {
        String message = payload.getDataUtf8();
        log.info("fireAndForget message={}", message);
        return Mono.empty();//返回以供空消息
    }

    /**
     * 传统模式
     * @param payload Request payload.
     * @return
     */
    @Override
    public Mono<Payload> requestResponse(Payload payload) {
        String message = payload.getDataUtf8();
        log.info("requestResponse message={}", message);
        return Mono.just(DefaultPayload.create("[echo]"+message));
    }

    /**
     * 返回流数据
     * @param payload Request payload.
     * @return
     */
    @Override
    public Flux<Payload> requestStream(Payload payload) {
        String message = payload.getDataUtf8();
        log.info("requestStream message={}", message);
        return Flux
                .fromStream(message.chars()
                        .mapToObj(c->Character.toUpperCase((char) c)))
                .map(Object::toString)
                .map(DefaultPayload::create);
    }

    /**
     * 双向流
     * @param payloads Stream of request payloads.
     * @return
     */
    @Override
    public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
        return Flux.from(payloads)
                .map(Payload::getDataUtf8)
                .map(msg->{
                    log.info("requestStream message={}",msg);
                    return msg;
                })
                .map(DefaultPayload::create);
    }
}

连接器

public class MessageRSocketAcceptor implements SocketAcceptor {
    @Override
    public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
        return Mono.just(new MessageRSocketHandler());//配置处理类
    }
}

配置服务

public class MessageServer {

    //用于释放任务
    private static Disposable disposable;

    public static void start(){
        RSocketServer rSocketServer = RSocketServer.create();
        rSocketServer.acceptor(new MessageRSocketAcceptor());
        //采用零拷贝
        rSocketServer.payloadDecoder(PayloadDecoder.ZERO_COPY);
        disposable = rSocketServer
                .bind(TcpServerTransport.create(6565))//使用6565端口
                .subscribe();
    }

    public static void stop(){
        //释放
        disposable.dispose();
    }
}

测试

@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class MessagesTests {

    private static RSocket rsocket;

    @BeforeAll
    public static void setUpClient(){
        //服务启动
        MessageServer.start();
        //客户端进行链接
        rsocket = RSocketConnector
                .connectWith(TcpClientTransport.create(6565))
                .block();
    }

    //测试
    @Test
    void testFireAndForget() {
        getRequestPayload()
                .flatMap(payload -> rsocket.fireAndForget(payload))
                .blockLast(Duration.ofMinutes(1));
    }
    @Test
    void testRequestAndResponse() {
        getRequestPayload()
                .flatMap(payload -> rsocket.requestResponse(payload))
                .doOnNext(resp -> System.out.println("接收响应:"+resp.getDataUtf8()))
                .blockLast(Duration.ofMinutes(1));
    }
    @Test
    void testRequestStream(){
        getRequestPayload()
                .flatMap(payload -> rsocket.requestStream(payload))
                .doOnNext(resp -> System.out.println("接收响应:"+resp.getDataUtf8()))
                .blockLast(Duration.ofMinutes(1));
    }
    @Test
    void testRequestChannel(){
        rsocket.requestChannel(getRequestPayload())
                        .doOnNext(resp -> System.out.println("接收响应:"+resp.getDataUtf8()))
                                .blockLast(Duration.ofMinutes(1));
    }
    //请求数据
    private static Flux<Payload> getRequestPayload() {
        return Flux.just("java","spring","rsocket")
                .delayElements(Duration.ofSeconds(1))
                .map(DefaultPayload::create);
    }

    @AfterAll
    public static void stopServer(){
        MessageServer.stop();
    }
}

整合SpringBoot

定义一个数据传输的实体类

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message {
    private String title;
    private String content;
}

服务端

业务类

@Service
public class MessageService {

    public List<Message> list(){
        return List.of(
                new Message("java","java hhh"),
                new Message("spring","spring xxxx"),
                new Message("rsocket","rsocket aaa")
        );
    }
    public Message get(String title){
        return new Message(title,title+"xixi");
    }
    public Message echo(Message message){
        message.setTitle("[echo]"+message.getTitle());
        message.setContent("[echo]"+message.getContent());
        return message;
    }
}

controller

@Controller
@Slf4j
public class MessageController {
    @Autowired
    private MessageService messageService;

    @MessageMapping("message.echo")
    public Mono<Message> echoMessage(Mono<Message> message) {
        return message
                .doOnNext(m->messageService.echo(m))// 响应处理
                .doOnNext(msg -> log.info("接收消息:{}",msg));
    }
    @MessageMapping("message.delete")
    public void deleteMessage(Mono<String> title) {
        title.doOnNext(msg->log.info("消息删除:{}",msg)).subscribe();
    }

    @MessageMapping("message.list")
    public Flux<Message> listMessages() {
        return Flux.fromStream(messageService.list().stream());
    }
    @MessageMapping("message.get")
    public Flux<Message> getMessages(Flux<String> title) {
        return title.doOnNext(t->log.info("消息查询:{}",t))
                .map(String::toUpperCase)
                .map(messageService::get)
                .delayElements(Duration.ofSeconds(1));
    }
}

配置文件

spring:
  rsocket:
    server:
      port: 6869 #rsocket端口

客户端

配置类

@Configuration
public class RSocketConfig {

    //注册
    @Bean
    public RSocketStrategies rSocketStrategies(){
        return RSocketStrategies.builder()
                //编码器
                .encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
                //解码器
                .decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
                .build();
    }
    @Bean
    public Mono<RSocketRequester> rSocketRequester(RSocketRequester.Builder builder){
        return Mono.just(builder.rsocketConnector(connector ->
                connector.reconnect(Retry.fixedDelay(2, Duration.ofSeconds(2))))//失败处理
                .dataMimeType(MediaType.APPLICATION_CBOR)//数据的传输类型
                .transport(TcpClientTransport.create(6869)));//设置链接端口

    }
}

测试

@SpringBootTest
public class SocketTests {

    @Autowired
    private Mono<RSocketRequester> requester;

    @Test
    public void testEcho(){
        requester.map(r->r.route("message.echo")//地址
                .data(new Message("java","java haha")))//请求数据
                .flatMap(r->r.retrieveMono(Message.class))
                .doOnNext(System.out::println).block();
    }

    @Test
    public void testDelete(){
        requester.map(r->r.route("message.delete")
                        .data("rsocket..."))
                .flatMap(RSocketRequester.RetrieveSpec::send)
                .block();
    }
    @Test
    public void testList(){
        requester.map(r->r.route("message.list"))
                .flatMapMany(r->r.retrieveFlux(Message.class))
                .doOnNext(System.out::println).blockLast();
    }

    @Test
    public void testGet(){
        Flux<String> title = Flux.just("java","spring","rsocket");
        requester.map(r->r.route("message.get")
                        .data(title))
                .flatMapMany(r->r.retrieveFlux(Message.class))
                .doOnNext(System.out::println).blockLast();
    }
}

文件上传

配置类

@Configuration
public class RSocketConfig {

    //策略
    @Bean
    public RSocketStrategies rSocketStrategies(){
        return RSocketStrategies.builder()
                //编码器
                .encoders(e->e.add(new Jackson2CborEncoder()))
                //解码器
                .decoders(e->e.add(new Jackson2CborDecoder()))
                //元数据注册
                .metadataExtractorRegistry(registry -> {
                    registry.metadataToExtract(
                            MimeType.valueOf("message/x.upload.file.name"),
                            String.class,
                            "file.name"
                    );
                    registry.metadataToExtract(
                            MimeType.valueOf("message/x.upload.file.extension"),
                            String.class,
                            "file.ext"
                    );
                })
                .build();
    }
}

controller

@Controller
@Slf4j
public class UploadController {

    @Value("upload")
    private Path path;

    @SneakyThrows
    @MessageMapping("message.upload")
    public Flux<String> upload(
            @Headers Map<String,Object> headers, //元数据
            @Payload Flux<DataBuffer> payload
            ){
        log.info("文件上传 path:{}",path);
        //获取文件名称
        var fileName = headers.get("file.name");
        //获取文件后缀
        var fileExt = headers.get("file.ext");
        var filePath = Paths.get(fileName+"."+fileExt);
        log.info("filePath:{}",filePath);
        //异步文件通道
        AsynchronousFileChannel channel = AsynchronousFileChannel.open(
                path.resolve(filePath),//解析文件路径
                StandardOpenOption.CREATE,//文件创建
                StandardOpenOption.WRITE//文件写入
        );
        return Flux.concat(DataBufferUtils.write(payload,channel)
                .map(s->"处理中"), Mono.just("处理成功"))
                .doOnComplete(()-> {
                    try {
                        //完成后关闭通道
                        channel.close();
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                }).onErrorReturn("失败了");//失败处理
    }
}

客户端测试

@SpringBootTest
public class SocketTests {

    @Autowired
    private Mono<RSocketRequester> requester;

    @Value("classpath:t1.png")
    private Resource resource;

    @Test
    public void testUpload(){
        Flux<DataBuffer> flux = DataBufferUtils.read(
                resource,
                new DefaultDataBufferFactory(),
                1024
        ).doOnNext(s-> System.out.println("文件上传"+s));
        Flux<String> uploadFlux = requester
                .map(r->r.route("message.upload")
                        .metadata(metadataSpec -> {
                            System.out.println("上传文件名");
                            //设置文件名称
                            metadataSpec.metadata("t1", MimeType.valueOf("message/x.upload.file.name"));
                            //设置文件后缀
                            metadataSpec.metadata("png",MimeType.valueOf("message/x.upload.file.extension"));
                        }).data(flux))//文件上传数据
                .flatMapMany(r->r.retrieveFlux(String.class))
                .doOnNext(s -> System.out.println("上传进度"+s));
        uploadFlux.blockLast();//阻塞等待

    }
}

使用WebSocket

spring:
  rsocket:
    server:
      port: 6969 #监听端口
      transport: websocket #处理协议
      mapping-path: /ws #映射路径

rsocket支持tcp和websocket两种协议,默认tcp