Skip to content

Added option of using an explicit ExecutorService in FileAsyncResponseTransformer #3875

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AWSSDKforJavav2-7fea588.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "AWS SDK for Java v2",
"contributor": "mpdn",
"description": "Added option of using an explicit `ExecutorService` in `FileAsyncResponseTransformer`"
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@

package software.amazon.awssdk.core;

import java.nio.channels.AsynchronousChannelGroup;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Path;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.utils.Validate;
Expand All @@ -36,10 +40,12 @@ public final class FileTransformerConfiguration implements ToCopyableBuilder<Fil
FileTransformerConfiguration> {
private final FileWriteOption fileWriteOption;
private final FailureBehavior failureBehavior;
private final ExecutorService executorService;

private FileTransformerConfiguration(DefaultBuilder builder) {
this.fileWriteOption = Validate.paramNotNull(builder.fileWriteOption, "fileWriteOption");
this.failureBehavior = Validate.paramNotNull(builder.failureBehavior, "failureBehavior");
this.executorService = builder.executorService;
}

/**
Expand All @@ -56,6 +62,16 @@ public FailureBehavior failureBehavior() {
return failureBehavior;
}

/**
* The configured {@link ExecutorService} the writes should be executed on.
* <p>
* If not set, the default thread pool defined by the underlying {@link java.nio.file.spi.FileSystemProvider} will be used.
* This will typically be the thread pool defined by the {@link AsynchronousChannelGroup}.
*/
public Optional<ExecutorService> executorService() {
return Optional.ofNullable(executorService);
}

/**
* Create a {@link Builder}, used to create a {@link FileTransformerConfiguration}.
*/
Expand Down Expand Up @@ -118,13 +134,17 @@ public boolean equals(Object o) {
if (fileWriteOption != that.fileWriteOption) {
return false;
}
return failureBehavior == that.failureBehavior;
if (failureBehavior != that.failureBehavior) {
return false;
}
return Objects.equals(executorService, that.executorService);
}

@Override
public int hashCode() {
int result = fileWriteOption != null ? fileWriteOption.hashCode() : 0;
result = 31 * result + (failureBehavior != null ? failureBehavior.hashCode() : 0);
result = 31 * result + (executorService != null ? executorService.hashCode() : 0);
return result;
}

Expand Down Expand Up @@ -181,18 +201,28 @@ public interface Builder extends CopyableBuilder<Builder, FileTransformerConfigu
* @return This object for method chaining.
*/
Builder failureBehavior(FailureBehavior failureBehavior);

/**
* Configures the {@link ExecutorService} the writes should be executed on.
*
* @param executorService the executor service to use, or null if using the default thread pool.
* @return This object for method chaining.
*/
Builder executorService(ExecutorService executorService);
}

private static class DefaultBuilder implements Builder {
private static final class DefaultBuilder implements Builder {
private FileWriteOption fileWriteOption;
private FailureBehavior failureBehavior;
private ExecutorService executorService;

private DefaultBuilder() {
}

private DefaultBuilder(FileTransformerConfiguration fileTransformerConfiguration) {
this.fileWriteOption = fileTransformerConfiguration.fileWriteOption;
this.failureBehavior = fileTransformerConfiguration.failureBehavior;
this.executorService = fileTransformerConfiguration.executorService;
}

@Override
Expand All @@ -207,6 +237,12 @@ public Builder failureBehavior(FailureBehavior failureBehavior) {
return this;
}

@Override
public Builder executorService(ExecutorService executorService) {
this.executorService = executorService;
return this;
}

@Override
public FileTransformerConfiguration build() {
return new FileTransformerConfiguration(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,14 @@
import java.nio.channels.CompletionHandler;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.reactivestreams.Subscriber;
Expand Down Expand Up @@ -78,17 +83,24 @@ private long determineFilePositionToWrite(Path path) {
}

private AsynchronousFileChannel createChannel(Path path) throws IOException {
Set<OpenOption> options = new HashSet<>();
switch (configuration.fileWriteOption()) {
case CREATE_OR_APPEND_TO_EXISTING:
return AsynchronousFileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
Collections.addAll(options, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
break;
case CREATE_OR_REPLACE_EXISTING:
return AsynchronousFileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING);
Collections.addAll(options, StandardOpenOption.WRITE, StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING);
break;
case CREATE_NEW:
return AsynchronousFileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
Collections.addAll(options, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
break;
default:
throw new IllegalArgumentException("Unsupported file write option: " + configuration.fileWriteOption());
}

ExecutorService executorService = configuration.executorService().orElse(null);
return AsynchronousFileChannel.open(path, options, executorService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,16 @@
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -213,6 +220,32 @@ private static List<FileTransformerConfiguration> configurations() {
.failureBehavior(LEAVE).build());
}

@Test
void explicitExecutor_shouldUseExecutor() throws Exception {
Path testPath = testFs.getPath("test_file.txt");
assertThat(testPath).doesNotExist();
String newContent = RandomStringUtils.randomAlphanumeric(2000);

ExecutorService executor = Executors.newSingleThreadExecutor();
try {
SpyingExecutorService spyingExecutorService = new SpyingExecutorService(executor);
FileTransformerConfiguration configuration = FileTransformerConfiguration
.builder()
.fileWriteOption(FileWriteOption.CREATE_NEW)
.failureBehavior(DELETE)
.executorService(spyingExecutorService)
.build();
FileAsyncResponseTransformer<String> transformer = new FileAsyncResponseTransformer<>(testPath, configuration);

stubSuccessfulStreaming(newContent, transformer);
assertThat(testPath).hasContent(newContent);
assertThat(spyingExecutorService.hasReceivedTasks()).isTrue();
} finally {
executor.shutdown();
assertThat(executor.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
}
}

private static void stubSuccessfulStreaming(String newContent, FileAsyncResponseTransformer<String> transformer) throws Exception {
CompletableFuture<String> future = transformer.prepare();
transformer.onResponse("foobar");
Expand Down Expand Up @@ -240,4 +273,90 @@ private static void stubException(String newContent, FileAsyncResponseTransforme
private static SdkPublisher<ByteBuffer> testPublisher(String content) {
return SdkPublisher.adapt(Flowable.just(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8))));
}

private static final class SpyingExecutorService implements ExecutorService {
private final ExecutorService executorService;
private boolean receivedTasks = false;

private SpyingExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}

public boolean hasReceivedTasks() {
return receivedTasks;
}

@Override
public void shutdown() {
executorService.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
return executorService.shutdownNow();
}

@Override
public boolean isShutdown() {
return executorService.isShutdown();
}

@Override
public boolean isTerminated() {
return executorService.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return executorService.awaitTermination(timeout, unit);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
receivedTasks = true;
return executorService.submit(task);
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
receivedTasks = true;
return executorService.submit(task, result);
}

@Override
public Future<?> submit(Runnable task) {
receivedTasks = true;
return executorService.submit(task);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
receivedTasks = true;
return executorService.invokeAll(tasks);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
receivedTasks = true;
return executorService.invokeAll(tasks, timeout, unit);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
receivedTasks = true;
return executorService.invokeAny(tasks);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
receivedTasks = true;
return executorService.invokeAny(tasks, timeout, unit);
}

@Override
public void execute(Runnable command) {
receivedTasks = true;
executorService.execute(command);
}
}
}