Skip to content

Commit 0fc8bec

Browse files
artembilangaryrussell
authored andcommitted
GH-2464: WebFlux: Get rid of Mono.block() (#2465)
* GH-2464: WebFlux: Get rid of Mono.block() Fixes #2464 The `WebFluxInboundEndpoint` resolves a `Principal` via `Mono.block()` operation. This is prohibited situation in the non-blocking thread, like Reactor Netty * Defer `Mono<Principal>` resolution to the message header via deferring the whole `Message` creating via `flatMap()` operation on the main `doHandle()` `Mono` **Cherry-pick to 5.0.x** * * Fix Checkstyle for the Reactive Spring Security testing utils static imports * Add `defaultIfEmpty()`, when `exchange.getPrincipal()` is an empty `Mono`
1 parent 4bb1a61 commit 0fc8bec

File tree

3 files changed

+64
-19
lines changed

3 files changed

+64
-19
lines changed

spring-integration-webflux/src/main/java/org/springframework/integration/webflux/inbound/WebFluxInboundEndpoint.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ private Mono<Void> doHandle(ServerWebExchange exchange) {
152152
.doOnSubscribe(s -> this.activeCount.incrementAndGet())
153153
.switchIfEmpty(Mono.just(exchange.getRequest().getQueryParams()))
154154
.map(body -> new HttpEntity<>(body, exchange.getRequest().getHeaders()))
155-
.map(entity -> buildMessage(entity, exchange))
155+
.flatMap(entity -> buildMessage(entity, exchange))
156156
.flatMap(requestMessage -> {
157157
if (this.expectReply) {
158158
return sendAndReceiveMessageReactive(requestMessage)
@@ -235,7 +235,7 @@ else if (MediaType.MULTIPART_FORM_DATA.isCompatibleWith(contentType)) {
235235
}
236236

237237
@SuppressWarnings("unchecked")
238-
private Message<?> buildMessage(HttpEntity<?> httpEntity, ServerWebExchange exchange) {
238+
private Mono<Message<?>> buildMessage(HttpEntity<?> httpEntity, ServerWebExchange exchange) {
239239
ServerHttpRequest request = exchange.getRequest();
240240
HttpHeaders requestHeaders = request.getHeaders();
241241
Map<String, Object> exchangeAttributes = exchange.getAttributes();
@@ -289,12 +289,12 @@ private Message<?> buildMessage(HttpEntity<?> httpEntity, ServerWebExchange exch
289289
payload = requestParams;
290290
}
291291

292-
AbstractIntegrationMessageBuilder<?> messageBuilder;
292+
AbstractIntegrationMessageBuilder<Object> messageBuilder;
293293

294294
if (payload instanceof Message<?>) {
295295
messageBuilder =
296296
getMessageBuilderFactory()
297-
.fromMessage((Message<?>) payload)
297+
.fromMessage((Message<Object>) payload)
298298
.copyHeadersIfAbsent(headers);
299299
}
300300
else {
@@ -304,11 +304,17 @@ private Message<?> buildMessage(HttpEntity<?> httpEntity, ServerWebExchange exch
304304
.copyHeaders(headers);
305305
}
306306

307-
return messageBuilder
307+
messageBuilder
308308
.setHeader(org.springframework.integration.http.HttpHeaders.REQUEST_URL, request.getURI().toString())
309-
.setHeader(org.springframework.integration.http.HttpHeaders.REQUEST_METHOD, request.getMethod().toString())
310-
.setHeader(org.springframework.integration.http.HttpHeaders.USER_PRINCIPAL, exchange.getPrincipal().block())
311-
.build();
309+
.setHeader(org.springframework.integration.http.HttpHeaders.REQUEST_METHOD,
310+
request.getMethod().toString());
311+
312+
return exchange.getPrincipal()
313+
.map(principal ->
314+
messageBuilder
315+
.setHeader(org.springframework.integration.http.HttpHeaders.USER_PRINCIPAL, principal))
316+
.defaultIfEmpty(messageBuilder)
317+
.map(AbstractIntegrationMessageBuilder::build);
312318
}
313319

314320
private Mono<Void> populateResponse(ServerWebExchange exchange, Message<?> replyMessage) {

spring-integration-webflux/src/test/java/org/springframework/integration/webflux/dsl/WebFluxDslTests.java

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@
1717
package org.springframework.integration.webflux.dsl;
1818

1919
import static org.hamcrest.Matchers.instanceOf;
20+
import static org.hamcrest.Matchers.is;
2021
import static org.junit.Assert.assertNotNull;
2122
import static org.junit.Assert.assertThat;
2223
import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic;
23-
import static org.springframework.security.test.web.servlet.setup.SecurityMockMvcConfigurers.springSecurity;
2424
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
2525
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content;
26+
import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.Credentials.basicAuthenticationCredentials;
27+
import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;
2628

29+
import java.security.Principal;
2730
import java.util.Collections;
2831

2932
import javax.annotation.Resource;
@@ -48,6 +51,7 @@
4851
import org.springframework.integration.config.EnableIntegration;
4952
import org.springframework.integration.dsl.IntegrationFlow;
5053
import org.springframework.integration.dsl.IntegrationFlows;
54+
import org.springframework.integration.http.HttpHeaders;
5155
import org.springframework.integration.http.dsl.Http;
5256
import org.springframework.integration.support.MessageBuilder;
5357
import org.springframework.integration.webflux.outbound.WebFluxRequestExecutingMessageHandler;
@@ -60,10 +64,18 @@
6064
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
6165
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
6266
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
67+
import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
68+
import org.springframework.security.config.web.server.ServerHttpSecurity;
69+
import org.springframework.security.core.userdetails.MapReactiveUserDetailsService;
70+
import org.springframework.security.core.userdetails.ReactiveUserDetailsService;
6371
import org.springframework.security.core.userdetails.User;
72+
import org.springframework.security.core.userdetails.UserDetails;
6473
import org.springframework.security.core.userdetails.UserDetailsService;
6574
import org.springframework.security.crypto.factory.PasswordEncoderFactories;
6675
import org.springframework.security.provisioning.InMemoryUserDetailsManager;
76+
import org.springframework.security.test.web.reactive.server.SecurityMockServerConfigurers;
77+
import org.springframework.security.test.web.servlet.setup.SecurityMockMvcConfigurers;
78+
import org.springframework.security.web.server.SecurityWebFilterChain;
6779
import org.springframework.test.annotation.DirtiesContext;
6880
import org.springframework.test.context.junit4.SpringRunner;
6981
import org.springframework.test.context.web.WebAppConfiguration;
@@ -116,11 +128,14 @@ public class WebFluxDslTests {
116128
public void setup() {
117129
this.mockMvc =
118130
MockMvcBuilders.webAppContextSetup(this.wac)
119-
.apply(springSecurity())
131+
.apply(SecurityMockMvcConfigurers.springSecurity())
120132
.build();
121133

122134
this.webTestClient =
123135
WebTestClient.bindToApplicationContext(this.wac)
136+
.apply(SecurityMockServerConfigurers.springSecurity())
137+
.configureClient()
138+
.filter(basicAuthentication())
124139
.build();
125140
}
126141

@@ -198,6 +213,7 @@ public void testHttpReactiveProxyFlow() throws Exception {
198213
@SuppressWarnings("unchecked")
199214
public void testHttpReactivePost() {
200215
this.webTestClient.post().uri("/reactivePost")
216+
.attributes(basicAuthenticationCredentials("guest", "guest"))
201217
.body(Mono.just("foo\nbar\nbaz"), String.class)
202218
.exchange()
203219
.expectStatus().isAccepted();
@@ -206,6 +222,8 @@ public void testHttpReactivePost() {
206222
assertNotNull(store);
207223
assertThat(store.getPayload(), instanceOf(Flux.class));
208224

225+
assertThat(store.getHeaders().get(HttpHeaders.USER_PRINCIPAL, Principal.class).getName(), is("guest"));
226+
209227
StepVerifier
210228
.create((Publisher<String>) store.getPayload())
211229
.expectNext("foo", "bar", "baz")
@@ -217,6 +235,7 @@ public void testHttpReactivePost() {
217235
public void testSse() {
218236
Flux<String> responseBody =
219237
this.webTestClient.get().uri("/sse")
238+
.attributes(basicAuthenticationCredentials("guest", "guest"))
220239
.exchange()
221240
.returnResult(String.class)
222241
.getResponseBody();
@@ -230,22 +249,40 @@ public void testSse() {
230249
@Configuration
231250
@EnableWebFlux
232251
@EnableWebSecurity
252+
@EnableWebFluxSecurity
233253
@EnableIntegration
234254
public static class ContextConfiguration extends WebSecurityConfigurerAdapter {
235255

256+
@Bean
257+
public UserDetails userDetails() {
258+
return User.withUsername("guest")
259+
.passwordEncoder(PasswordEncoderFactories.createDelegatingPasswordEncoder()::encode)
260+
.password("guest")
261+
.roles("ADMIN")
262+
.build();
263+
}
264+
236265
@Override
237266
@Bean
238267
public UserDetailsService userDetailsService() {
239-
InMemoryUserDetailsManager manager = new InMemoryUserDetailsManager();
268+
return new InMemoryUserDetailsManager(userDetails());
269+
}
240270

241-
manager.createUser(
242-
User.withUsername("guest")
243-
.passwordEncoder(PasswordEncoderFactories.createDelegatingPasswordEncoder()::encode)
244-
.password("guest")
245-
.roles("ADMIN")
246-
.build());
271+
@Bean
272+
public ReactiveUserDetailsService reactiveUserDetailsService() {
273+
return new MapReactiveUserDetailsService(userDetails());
274+
}
247275

248-
return manager;
276+
277+
@Bean
278+
public SecurityWebFilterChain reactiveSpringSecurityFilterChain(ServerHttpSecurity http) {
279+
return http.authorizeExchange()
280+
.anyExchange().hasRole("ADMIN")
281+
.and()
282+
.httpBasic()
283+
.and()
284+
.csrf().disable()
285+
.build();
249286
}
250287

251288
@Override

src/checkstyle/checkstyle.xml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,9 @@
104104
org.springframework.test.web.servlet.request.MockMvcRequestBuilders.*,
105105
org.springframework.test.web.servlet.result.MockMvcResultMatchers.*,
106106
org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.*,
107-
org.springframework.security.test.web.servlet.setup.SecurityMockMvcConfigurers.*" />
107+
org.springframework.security.test.web.servlet.setup.SecurityMockMvcConfigurers.*,
108+
org.springframework.web.reactive.function.client.ExchangeFilterFunctions.*,
109+
org.springframework.web.reactive.function.client.ExchangeFilterFunctions.Credentials.*" />
108110
</module>
109111
<module name="IllegalImport" />
110112
<module name="RedundantImport" />

0 commit comments

Comments
 (0)