Skip to content

Commit 3ca345b

Browse files
committed
DefaultReactiveElasticsearchClient handle 5xx error with empty body
Original Pull Request #1713 Closes #1712 (cherry picked from commit 6634d00) Test adapted
1 parent 82d9cb4 commit 3ca345b

File tree

2 files changed

+54
-22
lines changed

2 files changed

+54
-22
lines changed

src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -809,6 +809,9 @@ private <T> Publisher<? extends T> handleServerError(Request request, ClientResp
809809
String mediaType = response.headers().contentType().map(MediaType::toString).orElse(XContentType.JSON.mediaType());
810810

811811
return response.body(BodyExtractors.toMono(byte[].class)) //
812+
.switchIfEmpty(Mono.error(
813+
new ElasticsearchStatusException(String.format("%s request to %s returned error code %s and no body.",
814+
request.getMethod(), request.getEndpoint(), statusCode), status)))
812815
.map(bytes -> new String(bytes, StandardCharsets.UTF_8)) //
813816
.flatMap(content -> contentOrError(content, mediaType, status))
814817
.flatMap(unused -> Mono
@@ -834,7 +837,7 @@ private <T> Publisher<? extends T> handleClientError(String logId, Request reque
834837
/**
835838
* checks if the given content body contains an {@link ElasticsearchException}, if yes it is returned in a Mono.error.
836839
* Otherwise the content is returned in the Mono
837-
*
840+
*
838841
* @param content the content to analyze
839842
* @param mediaType the returned media type
840843
* @param status the response status
@@ -855,7 +858,7 @@ private static Mono<String> contentOrError(String content, String mediaType, Res
855858

856859
/**
857860
* tries to parse an {@link ElasticsearchException} from the given body content
858-
*
861+
*
859862
* @param content the content to analyse
860863
* @param mediaType the type of the body content
861864
* @return an {@link ElasticsearchException} or {@literal null}.
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,23 +19,31 @@
1919
import static org.elasticsearch.search.internal.SearchContext.*;
2020
import static org.mockito.Mockito.*;
2121

22-
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
2322
import reactor.core.publisher.Mono;
23+
import reactor.test.StepVerifier;
2424

25+
import java.net.URI;
26+
import java.util.Optional;
2527
import java.util.function.Function;
2628

29+
import org.elasticsearch.ElasticsearchStatusException;
30+
import org.elasticsearch.action.get.GetRequest;
2731
import org.elasticsearch.action.search.SearchRequest;
2832
import org.elasticsearch.client.Request;
2933
import org.elasticsearch.index.query.QueryBuilders;
3034
import org.elasticsearch.search.builder.SearchSourceBuilder;
31-
import org.junit.jupiter.api.BeforeEach;
35+
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
36+
import org.junit.jupiter.api.DisplayName;
3237
import org.junit.jupiter.api.Test;
3338
import org.junit.jupiter.api.extension.ExtendWith;
3439
import org.mockito.ArgumentCaptor;
3540
import org.mockito.Mock;
41+
import org.mockito.Spy;
3642
import org.mockito.junit.jupiter.MockitoExtension;
43+
import org.springframework.http.HttpStatus;
3744
import org.springframework.web.reactive.function.client.ClientResponse;
38-
import reactor.test.StepVerifier;
45+
import org.springframework.web.reactive.function.client.WebClient;
46+
import org.springframework.web.util.UriBuilder;
3947

4048
/**
4149
* @author Peter-Josef Meisch
@@ -46,30 +54,24 @@ class DefaultReactiveElasticsearchClientTest {
4654
@Mock private HostProvider hostProvider;
4755

4856
@Mock private Function<SearchRequest, Request> searchRequestConverter;
57+
@Spy private RequestCreator requestCreator;
4958

50-
private DefaultReactiveElasticsearchClient client;
51-
52-
@BeforeEach
53-
void setUp() {
54-
client = new DefaultReactiveElasticsearchClient(hostProvider, new RequestCreator() {
55-
@Override
56-
public Function<SearchRequest, Request> search() {
57-
return searchRequestConverter;
58-
}
59-
}) {
60-
@Override
61-
public Mono<ClientResponse> execute(ReactiveElasticsearchClientCallback callback) {
62-
return Mono.empty();
63-
}
64-
};
65-
}
59+
@Mock private WebClient webClient;
6660

6761
@Test
6862
void shouldSetAppropriateRequestParametersOnCount() {
6963

64+
when(requestCreator.search()).thenReturn(searchRequestConverter);
7065
SearchRequest searchRequest = new SearchRequest("someindex") //
7166
.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()));
7267

68+
ReactiveElasticsearchClient client = new DefaultReactiveElasticsearchClient(hostProvider, requestCreator) {
69+
@Override
70+
public Mono<ClientResponse> execute(ReactiveElasticsearchClientCallback callback) {
71+
return Mono.empty();
72+
}
73+
};
74+
7375
client.count(searchRequest).as(StepVerifier::create).verifyComplete();
7476

7577
ArgumentCaptor<SearchRequest> captor = ArgumentCaptor.forClass(SearchRequest.class);
@@ -79,4 +81,31 @@ void shouldSetAppropriateRequestParametersOnCount() {
7981
assertThat(source.trackTotalHitsUpTo()).isEqualTo(TRACK_TOTAL_HITS_ACCURATE);
8082
assertThat(source.fetchSource()).isEqualTo(FetchSourceContext.DO_NOT_FETCH_SOURCE);
8183
}
84+
85+
@Test // #1712
86+
@DisplayName("should throw ElasticsearchStatusException on server 5xx with empty body")
87+
void shouldThrowElasticsearchStatusExceptionOnServer5xxWithEmptyBody() {
88+
89+
when(hostProvider.getActive(any())).thenReturn(Mono.just(webClient));
90+
WebClient.RequestBodyUriSpec requestBodyUriSpec = mock(WebClient.RequestBodyUriSpec.class);
91+
when(requestBodyUriSpec.uri((Function<UriBuilder, URI>) any())).thenReturn(requestBodyUriSpec);
92+
when(requestBodyUriSpec.attribute(any(), any())).thenReturn(requestBodyUriSpec);
93+
when(requestBodyUriSpec.headers(any())).thenReturn(requestBodyUriSpec);
94+
when(webClient.method(any())).thenReturn(requestBodyUriSpec);
95+
96+
ClientResponse clientResponse = mock(ClientResponse.class);
97+
when(clientResponse.statusCode()).thenReturn(HttpStatus.SERVICE_UNAVAILABLE);
98+
ClientResponse.Headers headers = mock(ClientResponse.Headers.class);
99+
when(headers.contentType()).thenReturn(Optional.empty());
100+
when(clientResponse.headers()).thenReturn(headers);
101+
when(clientResponse.body(any())).thenReturn(Mono.empty());
102+
when(requestBodyUriSpec.exchange()).thenReturn(Mono.just(clientResponse));
103+
104+
ReactiveElasticsearchClient client = new DefaultReactiveElasticsearchClient(hostProvider, requestCreator);
105+
106+
client.get(new GetRequest("42")) //
107+
.as(StepVerifier::create) //
108+
.expectError(ElasticsearchStatusException.class) //
109+
.verify(); //
110+
}
82111
}

0 commit comments

Comments
 (0)