Skip to content

Add support for typed-key arrays, refactor and add tests #125

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import jakarta.json.stream.JsonParser;
import jakarta.json.stream.JsonParsingException;

import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

Expand Down Expand Up @@ -57,13 +59,13 @@ public Deserializer(Map<String, JsonpDeserializer<? extends Member>> deserialize
/**
* Deserialize a union value, given its type.
*/
public Union deserialize(String type, JsonParser parser, JsonpMapper mapper) {
public Union deserialize(String type, JsonParser parser, JsonpMapper mapper, Event event) {
JsonpDeserializer<? extends Member> deserializer = deserializers.get(type);
if (deserializer == null) {
throw new JsonParsingException("Unknown variant type '" + type + "'", parser.getLocation());
}

return unionCtor.apply(type, deserializer.deserialize(parser, mapper));
return unionCtor.apply(type, deserializer.deserialize(parser, mapper, event));
}

/**
Expand Down Expand Up @@ -104,10 +106,44 @@ public void deserializeEntry(String key, JsonParser parser, JsonpMapper mapper,
String type = key.substring(0, hashPos);
String name = key.substring(hashPos + 1);

targetMap.put(name, deserializer.deserialize(type, parser, mapper));
targetMap.put(name, deserializer.deserialize(type, parser, mapper, parser.next()));
}
}

static <T extends TaggedUnion<?, ?>> JsonpDeserializer<Map<String, List<T>>> arrayMapDeserializer(
TypedKeysDeserializer<T> deserializer
) {
return JsonpDeserializer.of(
EnumSet.of(Event.START_OBJECT),
(parser, mapper, event) -> {
Map<String, List<T>> result = new HashMap<>();
while ((event = parser.next()) != Event.END_OBJECT) {
JsonpUtils.expectEvent(parser, event, Event.KEY_NAME);
// Split key and type
String key = parser.getString();
int hashPos = key.indexOf('#');
if (hashPos == -1) {
throw new JsonParsingException(
"Property name '" + key + "' is not in the 'type#name' format. Make sure the request has 'typed_keys' set.",
parser.getLocation()
);
}

String type = key.substring(0, hashPos);
String name = key.substring(hashPos + 1);

List<T> list = new ArrayList<>();
JsonpUtils.expectNextEvent(parser, Event.START_ARRAY);
while ((event = parser.next()) != Event.END_ARRAY) {
list.add(deserializer.deserializer.deserialize(type, parser, mapper, event));
}
result.put(name, list);
}
return result;
}
);
}

/**
* Serialize an externally tagged union using the typed keys encoding.
*/
Expand All @@ -119,6 +155,26 @@ public void deserializeEntry(String key, JsonParser parser, JsonpMapper mapper,
generator.writeEnd();
}

static <T extends JsonpSerializable & TaggedUnion<? extends JsonEnum, ?>> void serializeTypedKeysArray(
Map<String, List<T>> map, JsonGenerator generator, JsonpMapper mapper
) {
generator.writeStartObject();
for (Map.Entry<String, List<T>> entry: map.entrySet()) {
List<T> list = entry.getValue();
if (list.isEmpty()) {
continue; // We can't know the kind, skip this entry
}

generator.writeKey(list.get(0)._kind().jsonValue() + "#" + entry.getKey());
generator.writeStartArray();
for (T value: list) {
value.serialize(generator, mapper);
}
generator.writeEnd();
}
generator.writeEnd();
}

/**
* Serialize an externally tagged union using the typed keys encoding, without the enclosing start/end object.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package co.elastic.clients.elasticsearch;

import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.json.jsonb.JsonbJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.testcontainers.elasticsearch.ElasticsearchContainer;

import java.time.Duration;

public class ElasticsearchTestServer implements AutoCloseable {

private volatile ElasticsearchContainer container;
private int port;
private final JsonpMapper mapper = new JsonbJsonpMapper();
private RestClient restClient;
private ElasticsearchTransport transport;
private ElasticsearchClient client;

private static ElasticsearchTestServer global;

public static synchronized ElasticsearchTestServer global() {
if (global == null) {
System.out.println("Starting global ES test server.");
global = new ElasticsearchTestServer();
global.setup();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Stopping global ES test server.");
global.close();
}));
}
return global;
}

private synchronized void setup() {
container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.16.2")
.withEnv("ES_JAVA_OPTS", "-Xms256m -Xmx256m")
.withEnv("path.repo", "/tmp") // for snapshots
.withStartupTimeout(Duration.ofSeconds(30))
.withPassword("changeme");
container.start();
port = container.getMappedPort(9200);

BasicCredentialsProvider credsProv = new BasicCredentialsProvider();
credsProv.setCredentials(
AuthScope.ANY, new UsernamePasswordCredentials("elastic", "changeme")
);
restClient = RestClient.builder(new HttpHost("localhost", port))
.setHttpClientConfigCallback(hc -> hc.setDefaultCredentialsProvider(credsProv))
.build();
transport = new RestClientTransport(restClient, mapper);
client = new ElasticsearchClient(transport);
}

@Override
public void close() {
if (this == global) {
// Closed with a shutdown hook
return;
}

if (container != null) {
container.stop();
}
container = null;
}

public int port() {
return port;
}

public RestClient restClient() {
return restClient;
}

public ElasticsearchTransport transport() {
return transport;
}

public JsonpMapper mapper() {
return mapper;
}

public ElasticsearchClient client() {
return client;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.ElasticsearchTestServer;
import co.elastic.clients.elasticsearch._types.ElasticsearchException;
import co.elastic.clients.elasticsearch._types.Refresh;
import co.elastic.clients.elasticsearch._types.aggregations.HistogramAggregate;
Expand All @@ -42,26 +43,12 @@
import co.elastic.clients.elasticsearch.indices.GetMappingResponse;
import co.elastic.clients.elasticsearch.indices.IndexState;
import co.elastic.clients.elasticsearch.model.ModelTestCase;
import co.elastic.clients.elasticsearch.snapshot.CreateRepositoryResponse;
import co.elastic.clients.elasticsearch.snapshot.CreateSnapshotResponse;
import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.json.jsonb.JsonbJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.elasticsearch.ElasticsearchContainer;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -70,38 +57,11 @@

public class RequestTest extends Assert {

private static ElasticsearchContainer container;
private static final JsonpMapper mapper = new JsonbJsonpMapper();
private static RestClient restClient;
private static ElasticsearchTransport transport;
private static ElasticsearchClient client;
static ElasticsearchClient client;

@BeforeClass
public static void setup() {
container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.16.2")
.withEnv("ES_JAVA_OPTS", "-Xms256m -Xmx256m")
.withEnv("path.repo", "/tmp") // for snapshots
.withStartupTimeout(Duration.ofSeconds(30))
.withPassword("changeme");
container.start();
int port = container.getMappedPort(9200);

BasicCredentialsProvider credsProv = new BasicCredentialsProvider();
credsProv.setCredentials(
AuthScope.ANY, new UsernamePasswordCredentials("elastic", "changeme")
);
restClient = RestClient.builder(new HttpHost("localhost", port))
.setHttpClientConfigCallback(hc -> hc.setDefaultCredentialsProvider(credsProv))
.build();
transport = new RestClientTransport(restClient, mapper);
client = new ElasticsearchClient(transport);
}

@AfterClass
public static void tearDown() {
if (container != null) {
container.stop();
}
client = ElasticsearchTestServer.global().client();
}

@Test
Expand All @@ -112,7 +72,7 @@ public void testCount() throws Exception {

@Test
public void testIndexCreation() throws Exception {
ElasticsearchAsyncClient asyncClient = new ElasticsearchAsyncClient(transport);
ElasticsearchAsyncClient asyncClient = new ElasticsearchAsyncClient(client._transport());

// Ping the server
assertTrue(client.ping().value());
Expand Down Expand Up @@ -222,7 +182,7 @@ public void testDataIngestion() throws Exception {
public void testCatRequest() throws IOException {
// Cat requests should have the "format=json" added by the transport
NodesResponse nodes = client.cat().nodes(_0 -> _0);
System.out.println(ModelTestCase.toJson(nodes, mapper));
System.out.println(ModelTestCase.toJson(nodes, client._transport().jsonpMapper()));

assertEquals(1, nodes.valueBody().size());
assertEquals("*", nodes.valueBody().get(0).master());
Expand All @@ -247,15 +207,25 @@ public void testBulkRequest() throws IOException {
.id("def")
.document(appData)
))
.operations(_1 -> _1
.update(_2 -> _2
.index("foo")
.id("gh")
.action(_3 -> _3
.docAsUpsert(true)
.doc(appData))
)
)
);

assertFalse(bulk.errors());
assertEquals(2, bulk.items().size());
assertEquals(3, bulk.items().size());
assertEquals(OperationType.Create, bulk.items().get(0).operationType());
assertEquals("foo", bulk.items().get(0).index());
assertEquals(1L, bulk.items().get(0).version().longValue());
assertEquals("foo", bulk.items().get(1).index());
assertEquals(1L, bulk.items().get(1).version().longValue());
assertEquals(42, client.get(b -> b.index("foo").id("gh"), AppData.class).source().intValue);
}

@Test
Expand Down Expand Up @@ -291,7 +261,7 @@ public void testRefresh() throws IOException {


ExecutionException ee = assertThrows(ExecutionException.class, () -> {
ElasticsearchAsyncClient aClient = new ElasticsearchAsyncClient(transport);
ElasticsearchAsyncClient aClient = new ElasticsearchAsyncClient(client._transport());
GetResponse<String> response = aClient.get(
_0 -> _0.index("doesnotexist").id("reallynot"), String.class
).get();
Expand Down Expand Up @@ -398,30 +368,6 @@ public void testDefaultIndexSettings() throws IOException {
assertNull(settings.get(index).defaults());
}

@Test
public void testSnapshotCreation() throws IOException {
// https://github.com/elastic/elasticsearch-java/issues/74
// https://github.com/elastic/elasticsearch/issues/82358

CreateRepositoryResponse repo = client.snapshot().createRepository(b1 -> b1
.name("test")
.type("fs")
.settings(b2 -> b2
.location("/tmp/test-repo")
)
);

assertTrue(repo.acknowledged());

CreateSnapshotResponse snapshot = client.snapshot().create(b -> b
.repository("test")
.snapshot("1")
.waitForCompletion(true)
);

assertNotNull(snapshot.snapshot());
}

@Test
public void testValueBodyResponse() throws Exception {
DiskUsageResponse resp = client.indices().diskUsage(b -> b
Expand Down
Loading