RSocket
工作模式
模式 | 数据方向 | 背压支持 | 典型场景 |
---|---|---|---|
Request-Response | 1请求 → 1响应 | ✔️ | 替代 REST API |
Fire-and-Forget | 1请求 → 无响应 | ❌ | 非关键性操作(如日志) |
Request-Stream | 1请求 → N响应流 | ✔️ | 实时数据推送(如股票行情) |
Channel | M请求流 ↔ N响应流 | ✔️ | 全双工交互(如聊天、游戏) |
优势
- 多模式适配:覆盖从简单查询到实时流的所有场景。
- 背压原生支持:避免消费者被压垮(基于 Reactive Streams)。
- 协议无关性:可运行在 TCP、WebSocket、HTTP/2 等传输层上。
- 高性能:二进制编码、多路复用减少连接开销。
原生开发
导入依赖
implementation 'org.springframework.boot:spring-boot-starter-rsocket'
处理器
@Slf4j
public class MessageRSocketHandler implements RSocket {
/**
* 无响应,常用于如日志等
* @param payload Request payload. 附加信息
* @return
*/
@Override
public Mono<Void> fireAndForget(Payload payload) {
String message = payload.getDataUtf8();
log.info("fireAndForget message={}", message);
return Mono.empty();//返回以供空消息
}
/**
* 传统模式
* @param payload Request payload.
* @return
*/
@Override
public Mono<Payload> requestResponse(Payload payload) {
String message = payload.getDataUtf8();
log.info("requestResponse message={}", message);
return Mono.just(DefaultPayload.create("[echo]"+message));
}
/**
* 返回流数据
* @param payload Request payload.
* @return
*/
@Override
public Flux<Payload> requestStream(Payload payload) {
String message = payload.getDataUtf8();
log.info("requestStream message={}", message);
return Flux
.fromStream(message.chars()
.mapToObj(c->Character.toUpperCase((char) c)))
.map(Object::toString)
.map(DefaultPayload::create);
}
/**
* 双向流
* @param payloads Stream of request payloads.
* @return
*/
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return Flux.from(payloads)
.map(Payload::getDataUtf8)
.map(msg->{
log.info("requestStream message={}",msg);
return msg;
})
.map(DefaultPayload::create);
}
}
连接器
public class MessageRSocketAcceptor implements SocketAcceptor {
@Override
public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
return Mono.just(new MessageRSocketHandler());//配置处理类
}
}
配置服务
public class MessageServer {
//用于释放任务
private static Disposable disposable;
public static void start(){
RSocketServer rSocketServer = RSocketServer.create();
rSocketServer.acceptor(new MessageRSocketAcceptor());
//采用零拷贝
rSocketServer.payloadDecoder(PayloadDecoder.ZERO_COPY);
disposable = rSocketServer
.bind(TcpServerTransport.create(6565))//使用6565端口
.subscribe();
}
public static void stop(){
//释放
disposable.dispose();
}
}
测试
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class MessagesTests {
private static RSocket rsocket;
@BeforeAll
public static void setUpClient(){
//服务启动
MessageServer.start();
//客户端进行链接
rsocket = RSocketConnector
.connectWith(TcpClientTransport.create(6565))
.block();
}
//测试
@Test
void testFireAndForget() {
getRequestPayload()
.flatMap(payload -> rsocket.fireAndForget(payload))
.blockLast(Duration.ofMinutes(1));
}
@Test
void testRequestAndResponse() {
getRequestPayload()
.flatMap(payload -> rsocket.requestResponse(payload))
.doOnNext(resp -> System.out.println("接收响应:"+resp.getDataUtf8()))
.blockLast(Duration.ofMinutes(1));
}
@Test
void testRequestStream(){
getRequestPayload()
.flatMap(payload -> rsocket.requestStream(payload))
.doOnNext(resp -> System.out.println("接收响应:"+resp.getDataUtf8()))
.blockLast(Duration.ofMinutes(1));
}
@Test
void testRequestChannel(){
rsocket.requestChannel(getRequestPayload())
.doOnNext(resp -> System.out.println("接收响应:"+resp.getDataUtf8()))
.blockLast(Duration.ofMinutes(1));
}
//请求数据
private static Flux<Payload> getRequestPayload() {
return Flux.just("java","spring","rsocket")
.delayElements(Duration.ofSeconds(1))
.map(DefaultPayload::create);
}
@AfterAll
public static void stopServer(){
MessageServer.stop();
}
}
整合SpringBoot
定义一个数据传输的实体类
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Message {
private String title;
private String content;
}
服务端
业务类
@Service
public class MessageService {
public List<Message> list(){
return List.of(
new Message("java","java hhh"),
new Message("spring","spring xxxx"),
new Message("rsocket","rsocket aaa")
);
}
public Message get(String title){
return new Message(title,title+"xixi");
}
public Message echo(Message message){
message.setTitle("[echo]"+message.getTitle());
message.setContent("[echo]"+message.getContent());
return message;
}
}
controller
@Controller
@Slf4j
public class MessageController {
@Autowired
private MessageService messageService;
@MessageMapping("message.echo")
public Mono<Message> echoMessage(Mono<Message> message) {
return message
.doOnNext(m->messageService.echo(m))// 响应处理
.doOnNext(msg -> log.info("接收消息:{}",msg));
}
@MessageMapping("message.delete")
public void deleteMessage(Mono<String> title) {
title.doOnNext(msg->log.info("消息删除:{}",msg)).subscribe();
}
@MessageMapping("message.list")
public Flux<Message> listMessages() {
return Flux.fromStream(messageService.list().stream());
}
@MessageMapping("message.get")
public Flux<Message> getMessages(Flux<String> title) {
return title.doOnNext(t->log.info("消息查询:{}",t))
.map(String::toUpperCase)
.map(messageService::get)
.delayElements(Duration.ofSeconds(1));
}
}
配置文件
spring:
rsocket:
server:
port: 6869 #rsocket端口
客户端
配置类
@Configuration
public class RSocketConfig {
//注册
@Bean
public RSocketStrategies rSocketStrategies(){
return RSocketStrategies.builder()
//编码器
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
//解码器
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
}
@Bean
public Mono<RSocketRequester> rSocketRequester(RSocketRequester.Builder builder){
return Mono.just(builder.rsocketConnector(connector ->
connector.reconnect(Retry.fixedDelay(2, Duration.ofSeconds(2))))//失败处理
.dataMimeType(MediaType.APPLICATION_CBOR)//数据的传输类型
.transport(TcpClientTransport.create(6869)));//设置链接端口
}
}
测试
@SpringBootTest
public class SocketTests {
@Autowired
private Mono<RSocketRequester> requester;
@Test
public void testEcho(){
requester.map(r->r.route("message.echo")//地址
.data(new Message("java","java haha")))//请求数据
.flatMap(r->r.retrieveMono(Message.class))
.doOnNext(System.out::println).block();
}
@Test
public void testDelete(){
requester.map(r->r.route("message.delete")
.data("rsocket..."))
.flatMap(RSocketRequester.RetrieveSpec::send)
.block();
}
@Test
public void testList(){
requester.map(r->r.route("message.list"))
.flatMapMany(r->r.retrieveFlux(Message.class))
.doOnNext(System.out::println).blockLast();
}
@Test
public void testGet(){
Flux<String> title = Flux.just("java","spring","rsocket");
requester.map(r->r.route("message.get")
.data(title))
.flatMapMany(r->r.retrieveFlux(Message.class))
.doOnNext(System.out::println).blockLast();
}
}
文件上传
配置类
@Configuration
public class RSocketConfig {
//策略
@Bean
public RSocketStrategies rSocketStrategies(){
return RSocketStrategies.builder()
//编码器
.encoders(e->e.add(new Jackson2CborEncoder()))
//解码器
.decoders(e->e.add(new Jackson2CborDecoder()))
//元数据注册
.metadataExtractorRegistry(registry -> {
registry.metadataToExtract(
MimeType.valueOf("message/x.upload.file.name"),
String.class,
"file.name"
);
registry.metadataToExtract(
MimeType.valueOf("message/x.upload.file.extension"),
String.class,
"file.ext"
);
})
.build();
}
}
controller
@Controller
@Slf4j
public class UploadController {
@Value("upload")
private Path path;
@SneakyThrows
@MessageMapping("message.upload")
public Flux<String> upload(
@Headers Map<String,Object> headers, //元数据
@Payload Flux<DataBuffer> payload
){
log.info("文件上传 path:{}",path);
//获取文件名称
var fileName = headers.get("file.name");
//获取文件后缀
var fileExt = headers.get("file.ext");
var filePath = Paths.get(fileName+"."+fileExt);
log.info("filePath:{}",filePath);
//异步文件通道
AsynchronousFileChannel channel = AsynchronousFileChannel.open(
path.resolve(filePath),//解析文件路径
StandardOpenOption.CREATE,//文件创建
StandardOpenOption.WRITE//文件写入
);
return Flux.concat(DataBufferUtils.write(payload,channel)
.map(s->"处理中"), Mono.just("处理成功"))
.doOnComplete(()-> {
try {
//完成后关闭通道
channel.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}).onErrorReturn("失败了");//失败处理
}
}
客户端测试
@SpringBootTest
public class SocketTests {
@Autowired
private Mono<RSocketRequester> requester;
@Value("classpath:t1.png")
private Resource resource;
@Test
public void testUpload(){
Flux<DataBuffer> flux = DataBufferUtils.read(
resource,
new DefaultDataBufferFactory(),
1024
).doOnNext(s-> System.out.println("文件上传"+s));
Flux<String> uploadFlux = requester
.map(r->r.route("message.upload")
.metadata(metadataSpec -> {
System.out.println("上传文件名");
//设置文件名称
metadataSpec.metadata("t1", MimeType.valueOf("message/x.upload.file.name"));
//设置文件后缀
metadataSpec.metadata("png",MimeType.valueOf("message/x.upload.file.extension"));
}).data(flux))//文件上传数据
.flatMapMany(r->r.retrieveFlux(String.class))
.doOnNext(s -> System.out.println("上传进度"+s));
uploadFlux.blockLast();//阻塞等待
}
}
使用WebSocket
spring:
rsocket:
server:
port: 6969 #监听端口
transport: websocket #处理协议
mapping-path: /ws #映射路径
rsocket支持tcp和websocket两种协议,默认tcp