Skip to content

Support @RSocketExchange for responding #30936

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 60 additions & 2 deletions framework-docs/modules/ROOT/pages/rsocket.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,10 @@ The `spring-messaging` module contains the following:

* xref:rsocket.adoc#rsocket-requester[RSocketRequester] -- fluent API to make requests through an `io.rsocket.RSocket`
with data and metadata encoding/decoding.
* xref:rsocket.adoc#rsocket-annot-responders[Annotated Responders] -- `@MessageMapping` annotated handler methods for
responding.
* xref:rsocket.adoc#rsocket-interface[RSocket Interfaces] -- `@RSocketExchange` annotated
interfaces for making requests.
* xref:rsocket.adoc#rsocket-annot-responders[Annotated Responders] -- `@MessageMapping`
and `@RSocketExchange` annotated handler methods for responding.

The `spring-web` module contains `Encoder` and `Decoder` implementations such as Jackson
CBOR/JSON, and Protobuf that RSocket applications will likely need. It also contains the
Expand Down Expand Up @@ -862,6 +864,59 @@ interaction type(s):
|===


[[rsocket-annot-rsocketexchange]]
=== @RSocketExchange

While `@MessageMapping` is only supported for responding, `@RSocketExchange`
can be used both to create an annotated responder
and xref:rsocket.adoc#rsocket-interface[an RSocket Interface] that allows
making requests.

`@RSocketExchange` can be used as follows to create responder methods:

[tabs]
======
Java::
+
[source,java,indent=0,subs="verbatim,quotes",role="primary"]
----
@Controller
public class RadarsController {

@RSocketExchange("locate.radars.within")
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}
----

Kotlin::
+
[source,kotlin,indent=0,subs="verbatim,quotes",role="secondary"]
----
@Controller
class RadarsController {

@RSocketExchange("locate.radars.within")
fun radars(request: MapRequest): Flow<AirportLocation> {
// ...
}
}
----
======

`@RSocketExhange` supports a very similar method signature to `@MessageMapping`,
however, since it needs to be suitable both for requester and responder use,
there are slight differences. Notably, while `@MessageMapping` accepts
a `String` array as its `value` parameter, only a single `String` can be passed
as the `value` of `@RSocketExchange`.

When it comes to possible return values and the way we establish supported
RSocket interaction types, it works in the same way as with `@MessageMapping`.

Similarly to `@MessageMapping`, `@RSocketExchange` can also be used at class
level to specify a common prefix for all the method routes within the class.


[[rsocket-annot-connectmapping]]
=== @ConnectMapping
Expand Down Expand Up @@ -1026,6 +1081,9 @@ Two, create a proxy that will perform the declared RSocket exchanges:
RepositoryService service = factory.createClient(RadarService.class);
----

NOTE: Apart from RSocket interface services, `@RSocketExchange` can also
be used to create xref:rsocket.adoc#rsocket-annot-rsocketexchange[annotated responders].


[[rsocket-interface-method-parameters]]
=== Method Parameters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
import org.springframework.messaging.rsocket.service.RSocketExchange;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
Expand All @@ -60,8 +61,9 @@

/**
* Extension of {@link MessageMappingMessageHandler} for handling RSocket
* requests with {@link ConnectMapping @ConnectMapping} and
* {@link MessageMapping @MessageMapping} methods.
* requests with {@link ConnectMapping @ConnectMapping},
* {@link MessageMapping @MessageMapping}
* and {@link RSocketExchange @RSocketExchange} methods.
*
* <p>For server scenarios this class can be declared as a bean in Spring
* configuration and that would detect {@code @MessageMapping} methods in
Expand All @@ -77,13 +79,14 @@
* {@link org.springframework.messaging.rsocket.RSocketRequester.Builder#rsocketConnector
* RSocketRequester.Builder}.
*
* <p>For {@code @MessageMapping} methods, this class automatically determines
* the RSocket interaction type based on the input and output cardinality of the
* method. See the
* <p>For {@code @MessageMapping} and {@code @RSocketExchange} methods,
* this class automatically determines the RSocket interaction type
* based on the input and output cardinality of the method. See the
* <a href="https://docs.spring.io/spring/docs/current/spring-framework-reference/web-reactive.html#rsocket-annot-responders">
* "Annotated Responders"</a> section of the Spring Framework reference for more details.
*
* @author Rossen Stoyanchev
* @author Olga Maciaszek-Sharma
* @since 5.2
*/
public class RSocketMessageHandler extends MessageMappingMessageHandler {
Expand Down Expand Up @@ -322,6 +325,15 @@ protected CompositeMessageCondition getCondition(AnnotatedElement element) {
RSocketFrameTypeMessageCondition.CONNECT_CONDITION,
new DestinationPatternsMessageCondition(patterns, obtainRouteMatcher()));
}
RSocketExchange ann3 = AnnotatedElementUtils.findMergedAnnotation(element, RSocketExchange.class);
if (ann3 != null && StringUtils.hasText(ann3.value())) {
String[] destinations = new String[]{ann3.value()};
return new CompositeMessageCondition(
RSocketFrameTypeMessageCondition.EMPTY_CONDITION,
new DestinationPatternsMessageCondition(processDestinations(destinations),
obtainRouteMatcher())
);
}
return null;
}

Expand Down Expand Up @@ -402,7 +414,8 @@ protected void handleNoMatch(@Nullable RouteMatcher.Route destination, Message<?
* connection. Such a method can also start requests to the client but that
* must be done decoupled from handling and from the current thread.
* <p>Subsequent requests on the connection can be handled with
* {@link MessageMapping MessageMapping} methods.
* {@link MessageMapping MessageMapping}
* and {@link RSocketExchange RSocketExchange} methods.
*/
public SocketAcceptor responder() {
return (setupPayload, sendingRSocket) -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -33,7 +33,6 @@
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
Expand All @@ -45,6 +44,7 @@
* Integration tests with RSocket Service client.
*
* @author Rossen Stoyanchev
* @author Olga Maciaszek-Sharma
*/
class RSocketServiceIntegrationTests {

Expand All @@ -57,7 +57,7 @@ class RSocketServiceIntegrationTests {

@BeforeAll
@SuppressWarnings("ConstantConditions")
static void setupOnce() throws Exception {
static void setupOnce() {

MimeType metadataMimeType = MimeTypeUtils.parseMimeType(
WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString());
Expand Down Expand Up @@ -112,28 +112,26 @@ void echoStream() {
}


@Controller
@RSocketExchange("echo")
interface Service {

@RSocketExchange("echo-async")
@RSocketExchange("async")
Mono<String> echoAsync(String payload);

@RSocketExchange("echo-stream")
@RSocketExchange("stream")
Flux<String> echoStream(String payload);

}


@Controller
static class ServerController {
static class ServerController implements Service {

@MessageMapping("echo-async")
Mono<String> echoAsync(String payload) {
public Mono<String> echoAsync(String payload) {
return Mono.delay(Duration.ofMillis(10)).map(aLong -> payload + " async");
}

@MessageMapping("echo-stream")
Flux<String> echoStream(String payload) {
public Flux<String> echoStream(String payload) {
return Flux.interval(Duration.ofMillis(10)).map(aLong -> payload + " " + aLong);
}

Expand Down