Adding Spring security to WebFlux flow fails with blocking errors #2464

Closed
@ekazakas

Hello, I have a very simple flow:

@Configuration
public class LiveMessagesFlow {
	@Bean
	public IntegrationFlow metricGamingInboundFlow() {
		return IntegrationFlows
				.from(
						WebFlux.inboundGateway("/api/live/message")
								.requestMapping(mapping -> mapping
										.consumes(MediaType.APPLICATION_JSON_VALUE)
										.produces(MediaType.APPLICATION_JSON_VALUE)
								)
				)
				.log()
				.handle((p, h) -> Mono.just("{\"status\":\"OK\"}"))
				.get();
	}
}

Without adding Spring Security, this works properly, but after adding the following:

@EnableWebFluxSecurity
@Configuration
public class SecurityConfiguration {
	private static final String ROLE_USER = "USER";

	@Bean
	public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {
		http.csrf()
				.disable()
				.authorizeExchange()
				.matchers(EndpointRequest.toAnyEndpoint())
				.permitAll()
				.anyExchange()
				.hasRole(ROLE_USER)
				.and()
				.httpBasic();

		return http.build();
	}

	@Bean
	public ReactiveUserDetailsService userDetailsService(SecurityProperties properties) {
		return new MapReactiveUserDetailsService(Arrays.asList(buildUserDetails(properties.getUser())));
	}

	private UserDetails buildUserDetails(SecurityProperties.User user) {
		return User.withUsername(user.getName())
				.password(user.getPassword())
				.roles(user.getRoles().stream().toArray(String[]::new))
				.build();
	}
}

I get the following exception when trying to call the endpoint:

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-server-epoll-7
	at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:77) ~[reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE]
	at reactor.core.publisher.Mono.block(Mono.java:1175) ~[reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE]
	at org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint.buildMessage(WebFluxInboundEndpoint.java:310) ~[spring-integration-webflux-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint.lambda$doHandle$3(WebFluxInboundEndpoint.java:155) ~[spring-integration-webflux-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:100) [reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:108) [reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE]
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) ~[reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE]
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:198) ~[reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115) ~[reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:115) ~[reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE]
	at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onNext(FluxFilterFuseable.java:104) ~[reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1080) ~[reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE]
	at reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onComplete(MonoCollectList.java:117) ~[reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:130) [reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:245) ~[reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:130) [reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE]
	at reactor.ipc.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:377) ~[reactor-netty-0.7.7.RELEASE.jar:0.7.7.RELEASE]
	at reactor.ipc.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:202) ~[reactor-netty-0.7.7.RELEASE.jar:0.7.7.RELEASE]
	at reactor.ipc.netty.channel.FluxReceive.request(FluxReceive.java:110) ~[reactor-netty-0.7.7.RELEASE.jar:0.7.7.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:149) [reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:130) ~[reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:149) [reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE]
	at reactor.core.publisher.MonoCollectList$MonoBufferAllSubscriber.onSubscribe(MonoCollectList.java:90) ~[reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:86) [reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE]
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:163) ~[reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE]
	at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:86) [reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE]
	at reactor.ipc.netty.channel.FluxReceive.startReceiver(FluxReceive.java:273) ~[reactor-netty-0.7.7.RELEASE.jar:0.7.7.RELEASE]
	at reactor.ipc.netty.channel.FluxReceive.lambda$subscribe$2(FluxReceive.java:127) ~[reactor-netty-0.7.7.RELEASE.jar:0.7.7.RELEASE]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:163) ~[netty-common-4.1.24.Final.jar:4.1.24.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java) ~[netty-common-4.1.24.Final.jar:4.1.24.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) ~[netty-common-4.1.24.Final.jar:4.1.24.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:309) ~[netty-transport-native-epoll-4.1.24.Final-linux-x86_64.jar:4.1.24.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884) ~[netty-common-4.1.24.Final.jar:4.1.24.Final]
	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_171]

Spring dependencies that I currently use:

org.springframework.boot:spring-boot-starter-integration:2.0.2.RELEASE
org.springframework.boot:spring-boot-starter-webflux:2.0.2.RELEASE
org.springframework.boot:spring-boot-starter-actuator:2.0.2.RELEASE
org.springframework.boot:spring-boot-starter-security:2.0.2.RELEASE
org.springframework.boot:spring-boot-starter-amqp:2.0.2.RELEASE
org.springframework.integration:spring-integration-webflux:5.0.5.RELEASE
org.springframework.integration:spring-integration-amqp:5.0.5.RELEASE

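Judging by the stack trace, the problem seems to be the following line in WebFluxInboundEndpoint.buildMessage, which calls block() on the principal Mono while running on a Netty event-loop thread: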

....setHeader(org.springframework.integration.http.HttpHeaders.USER_PRINCIPAL, exchange.getPrincipal().block())
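
To illustrate what I mean by blocking versus composing, here is a minimal sketch. It deliberately uses the JDK's CompletableFuture rather than Reactor so that it runs without extra dependencies. The class name, the resolvePrincipal helper and the header key are assumptions made up for the example; none of this is the actual WebFluxInboundEndpoint code. In Reactor terms, the analogous shape would presumably be to continue the chain from exchange.getPrincipal() with an operator such as map or flatMap instead of calling block().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class NonBlockingPrincipalSketch {

    // Hypothetical header key, standing in for HttpHeaders.USER_PRINCIPAL.
    public static final String USER_PRINCIPAL = "http_userPrincipal";

    // Hypothetical stand-in for exchange.getPrincipal(): the value arrives later
    // and may be null when the request is not authenticated.
    public static CompletableFuture<String> resolvePrincipal(String name) {
        return CompletableFuture.supplyAsync(() -> name);
    }

    // Builds the headers as a continuation of the asynchronous principal,
    // so the calling thread never waits for the value.
    public static CompletableFuture<Map<String, Object>> buildHeaders(
            CompletableFuture<String> principal) {
        return principal.thenApply(p -> {
            Map<String, Object> headers = new HashMap<>();
            if (p != null) {
                headers.put(USER_PRINCIPAL, p);
            }
            return headers;
        });
    }

    public static void main(String[] args) {
        // join() is acceptable here only because main is not an event-loop thread.
        Map<String, Object> headers = buildHeaders(resolvePrincipal("user")).join();
        System.out.println(headers); // prints {http_userPrincipal=user}
    }
}
```

The point of the sketch is only that the header is set inside the continuation, so nothing on the request thread has to wait for the principal.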

Can someone provide some insight on this?
