Description
Describe the bug
Starting from AWS SDK version 2.30.0, the multipart upload calls ContentStreamProvider::newStream() twice on the provider passed to RequestBody::fromContentProvider(ContentStreamProvider provider, long contentLength, String mimeType). This leads to two InputStream instances being created, but only the second one gets closed. If code using the SDK creates an InputStream that is tied to an OS handle (like reading from a file), the OS handle leaks.
For quite a few streams in Java, it doesn't really matter whether you close them or not. Typically this goes for "in-memory" streams, such as ByteArrayInputStream; after all, there is no underlying handle to release. However, if you provide an InputStream that has a native OS handle attached to it (such as a file handle), the container may run out of file handles and crash.
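To illustrate why that matters, here is a hedged, hypothetical sketch (not our actual code) of a provider whose streams are backed by file handles; every stream the SDK requests via newStream() but never closes is a leaked OS handle:

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import software.amazon.awssdk.http.ContentStreamProvider;

// Hypothetical provider for illustration only: every newStream() call opens a fresh
// OS file handle, so every stream the SDK requests but never closes is a leaked handle.
class FileBackedContentStreamProvider implements ContentStreamProvider {
    private final Path path;

    FileBackedContentStreamProvider(Path path) {
        this.path = path;
    }

    @Override
    public InputStream newStream() {
        try {
            return new FileInputStream(path.toFile());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}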
For our use case, this issue is blocking us from upgrading to version 2.30, simply because it breaks our exports feature. We unload data (chunks) onto S3. We then load these chunks (one by one, in order) from our service to run them through a zip stream, and upload the result as parts. This process has been running fine for years with the AWS SDK up to version 2.29.
Regression Issue
- Select this option if this issue appears to be a regression.
Expected Behavior
The AWS SDK should close every InputStream it asks for, preferably exactly once.
Preferably, the SDK should also only ask for the InputStream once, as producing the stream may be an expensive operation.
Current Behavior
The SDK invokes newStream() on the provided ContentStreamProvider twice, but only closes the second stream. This has been observed by adding logging in our application, which produced the following output (heavily simplified):
Requested newStream() -> InputStream@49022cf9 (active streams: 1)
Requested newStream() -> InputStream@79fe40f (active streams: 2)
Closed InputStream@79fe40f (active streams: 1)
(No more logs related to this)
I'm pretty confident this issue was first introduced in #4492. Specifically, the AwsChunkedV4PayloadSigner::beforeSigning method calls newStream(), but does not close the resulting stream:
public void beforeSigning(SdkHttpRequest.Builder request, ContentStreamProvider payload) {
    long encodedContentLength = 0;
    long contentLength = moveContentLength(request, payload != null ? payload.newStream() : new StringInputStream(""));
The stream is never closed there, because the SignerUtils::moveContentLength method it is passed to takes an InputStream but never closes it either.
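As an illustration of the shape of fix I would expect at that call site (my own sketch, continuing the snippet quoted above; not a tested patch against the SDK):

// Sketch only: ensure the stream obtained from the provider is closed once it has
// served its purpose, even when the consuming method throws.
InputStream lengthStream = payload != null ? payload.newStream() : new StringInputStream("");
long contentLength;
try {
    contentLength = moveContentLength(request, lengthStream);
} finally {
    try {
        lengthStream.close(); // close the stream we asked the provider for
    } catch (IOException ignored) {
        // closing is best-effort; the request can still proceed
    }
}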
Reproduction Steps
Writing out a realistic example for a multipart upload is quite complicated, especially with the requirement that every non-last part must be larger than 5 MB. In short, to trace the issue, start a multipart upload (we use the synchronous S3Client), and upload a part using a ContentStreamProvider that has been patched to track "open" (ContentStreamProvider::newStream()) and "close" (InputStream::close()) actions. We provide the content length (as we know it), as well as a mime type.
Expand for an extremely simplified example of the issue.
package example;
import java.io.*;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.http.ContentStreamProvider;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.*;
public class MultiPartUploadIssueExample {

    /**
     * Tiny wrapper around {@link ByteArrayInputStream}, adding logic on {@link #close()}: calls the
     * provided callback and resets the reference to the shared buffer.
     */
    private static class ByteArrayInputStreamWithCallback extends ByteArrayInputStream {
        private final Runnable closeCallback;

        public ByteArrayInputStreamWithCallback(byte[] buf, Runnable closeCallback) {
            super(buf);
            this.closeCallback = Objects.requireNonNull(closeCallback);
        }

        @Override
        public synchronized void close() throws IOException {
            super.close();
            closeCallback.run();
        }
    }

    private static class CountingContentProvider implements ContentStreamProvider {
        private final byte[] data;
        private final AtomicInteger count;

        public CountingContentProvider(byte[] data) {
            this.data = Objects.requireNonNull(data);
            this.count = new AtomicInteger();
        }

        @Override
        public InputStream newStream() {
            count.incrementAndGet();
            return new ByteArrayInputStreamWithCallback(data, count::decrementAndGet);
        }

        public long contentLength() {
            return data.length;
        }

        public int getOpenCount() {
            return count.get();
        }
    }

    private final S3Client s3Client;

    public MultiPartUploadIssueExample(S3Client s3Client) {
        this.s3Client = s3Client;
    }

    public CompleteMultipartUploadResponse executeExample(String bucketName, String targetFile, String singlePart) {
        CreateMultipartUploadRequest createMultipartUploadRequest =
                CreateMultipartUploadRequest.builder()
                        .bucket(bucketName)
                        .key(targetFile)
                        .build();
        final CreateMultipartUploadResponse pendingMultipartUpload =
                s3Client.createMultipartUpload(createMultipartUploadRequest);
        try {
            List<CompletedPart> uploadedParts = new ArrayList<>(1);

            // This would normally be in a loop
            int partNumber = 1;
            CountingContentProvider provider = new CountingContentProvider(singlePart.getBytes());
            RequestBody requestBody = RequestBody.fromContentProvider(provider, provider.contentLength(), "application/octet-stream");
            UploadPartRequest.Builder uploadPartRequestBuilder =
                    UploadPartRequest.builder()
                            .bucket(bucketName)
                            .key(targetFile)
                            .uploadId(pendingMultipartUpload.uploadId())
                            .partNumber(partNumber);
            requestBody.optionalContentLength().ifPresent(uploadPartRequestBuilder::contentLength);
            UploadPartResponse uploadPartResponse =
                    s3Client.uploadPart(uploadPartRequestBuilder.build(), requestBody);
            // Theoretically, this should report 0; but in practice it reports 1 open stream.
            System.out.println("Open stream handles after upload: " + provider.getOpenCount());
            uploadedParts.add(CompletedPart.builder().partNumber(partNumber).eTag(uploadPartResponse.eTag()).build());
            // End of hypothetical loop

            CompletedMultipartUpload completedMultipartUpload =
                    CompletedMultipartUpload.builder().parts(uploadedParts).build();
            CompleteMultipartUploadRequest completeMultipartUploadRequest =
                    CompleteMultipartUploadRequest.builder()
                            .bucket(bucketName)
                            .key(targetFile)
                            .uploadId(pendingMultipartUpload.uploadId())
                            .multipartUpload(completedMultipartUpload)
                            .build();
            return s3Client.completeMultipartUpload(completeMultipartUploadRequest);
        } catch (Exception e) {
            s3Client.abortMultipartUpload(
                    AbortMultipartUploadRequest.builder()
                            .bucket(bucketName)
                            .key(targetFile)
                            .uploadId(pendingMultipartUpload.uploadId())
                            .build());
            throw e;
        }
    }
}
Possible Solution
Obviously: close all user-provided streams the SDK asks for. There is some nuance to this though: the SDK should also be mindful of memory usage when working with data to upload.
The use case we use the multipart upload for is basically the following (a simplified sketch follows after this list):
- An external database unloads data into S3 (often spread over multiple files).
- We iterate the chunks sequentially, reading them from S3 (one by one), streaming the data into a zip stream.
- The output of the zip stream is uploaded in chunks, so we are more resilient against network issues (if a part fails, we only have to redo that part, not the whole large file).
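Heavily simplified, and with hypothetical names (partBuffer stands in for our shared buffer; part-size bookkeeping and error handling omitted), the pipeline looks roughly like this:

import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;

// Hypothetical sketch of the export pipeline: stream each chunk from S3 through a
// zip stream; the zipped output accumulates in partBuffer and is uploaded as parts.
class ExportPipelineSketch {
    void export(S3Client s3Client, String bucket, List<String> chunkKeys, OutputStream partBuffer) throws Exception {
        try (ZipOutputStream zip = new ZipOutputStream(partBuffer)) {
            zip.putNextEntry(new ZipEntry("export.csv"));
            for (String chunkKey : chunkKeys) {
                GetObjectRequest request = GetObjectRequest.builder().bucket(bucket).key(chunkKey).build();
                try (InputStream chunk = s3Client.getObject(request)) {
                    chunk.transferTo(zip); // stream chunk data straight into the zip, never fully in memory
                }
                // Whenever partBuffer has accumulated enough zipped data, that data is
                // uploaded as the next part via uploadPart (not shown here).
            }
            zip.closeEntry();
        }
    }
}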
We used to have a naïve process that kept all data in memory. It is probably no surprise that our service tended to run out of memory on larger exports (think 1 GB range). About two years ago, we rewrote the implementation to be fully streaming. The data is streamed directly from S3 (the getObject response) into the zip output stream. The output stream is a custom byte buffer that follows an "exclusive write, multiple read" pattern, and it enforces this: to write to the buffer, no outstanding readers may be active, but it is allowed to read from the same buffer multiple times. With this set-up, an export takes about 20 MB of RAM to process, regardless of how big the actual export is. There are some read buffers, and we have a 16 MB chunk buffer that is written to and read from repeatedly.
It was the sanity check of this shared buffer class that alerted us to the issue in SDK version 2.30.0 and upwards. Once the first part was done uploading to S3, the streaming zip operation requested write access to the buffer, but that flagged (via an exception) that there was still a reader open.
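For context, a heavily simplified, hypothetical sketch of such a guarded buffer (not our actual implementation); the relevant point is that a reader the SDK never closes makes the next write fail loudly:

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of an "exclusive write, multiple read" buffer. Readers must be
// closed before the next write; a leaked reader trips the sanity check.
class ExclusiveWriteBuffer {
    private byte[] data = new byte[0];
    private final AtomicInteger openReaders = new AtomicInteger();

    synchronized void write(byte[] newData) {
        if (openReaders.get() != 0) {
            // This is the kind of check that fired once the SDK stopped closing one of its streams.
            throw new IllegalStateException("Cannot write: " + openReaders.get() + " reader(s) still open");
        }
        data = newData.clone();
    }

    synchronized InputStream newReader() {
        openReaders.incrementAndGet();
        return new ByteArrayInputStream(data) {
            @Override
            public void close() {
                // Assumes close() is called at most once per reader in this sketch.
                openReaders.decrementAndGet();
            }
        };
    }
}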
I want to pinpoint a specific unfortunate abstraction. Look at StreamManagingStage::ClosingStreamProvider and ask yourself: what happens if newStream() is called multiple times before closeCurrentStream() is invoked? The answer is, of course: any non-latest currentStream value is overwritten, and those streams are leaked.
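As a sketch of the general shape I would expect instead (my illustration, not a patch against StreamManagingStage): remember every stream handed out, and close them all.

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import software.amazon.awssdk.http.ContentStreamProvider;

// Sketch: a wrapping provider that remembers every stream it handed out,
// so closing "the current stream" cannot silently leak earlier ones.
class TrackingStreamProvider implements ContentStreamProvider {
    private final ContentStreamProvider delegate;
    private final List<InputStream> openedStreams = new ArrayList<>();

    TrackingStreamProvider(ContentStreamProvider delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized InputStream newStream() {
        InputStream stream = delegate.newStream();
        openedStreams.add(stream);
        return stream;
    }

    // Close everything ever requested; best-effort, ignoring individual failures.
    synchronized void closeAllStreams() {
        for (InputStream stream : openedStreams) {
            try {
                stream.close();
            } catch (IOException ignored) {
                // best effort
            }
        }
        openedStreams.clear();
    }
}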
Additional Information/Context
To add some unsolicited advice:
I think it's best to take a good look at the SDK's usage of streams, and revisit how input data is handled. This summary may be too simple (I may be missing something), but to my knowledge, the SDK needs to know:
- The content-length of the body about to be uploaded.
- A checksum of the body (requires iterating the content).
- The body itself, for the http request.
I want to clarify that I grasp the complexity of all the possible use-cases the SDK has to support. If your input is just an InputStream, you'll have to consume the stream at least twice: once to determine the length and checksum, and once more to actually stream the content to the http client (during upload). I think the SDK could do a better job of abstracting the common issues away. But at its core, the problem is also fuzzy ownership (and Java doesn't help there). Once you take ownership of an InputStream, you need to close it. Sharing an InputStream between multiple readers is dangerous, and requires sanity checks to ensure the stream is not read concurrently by multiple parties.
What I think the SDK should do, to be both memory efficient and clear in its usage pattern, is either:
- Only support byte[] inputs (and clarify that ownership of the byte[] is transferred to the SDK, e.g. no more writes should be performed), or
- Be smart about InputStream: if the stream supports resetting, use reset when the content needs to be read multiple times, and only copy a stream's contents to an in-memory buffer if it doesn't support resetting (a rough sketch of this follows below).
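A rough sketch of that second option, using only the standard mark/reset contract (my own illustration of the idea, not SDK code):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

final class ReplayableStreams {

    // Sketch: make a stream readable multiple times without always copying it.
    // If the stream supports mark/reset, rely on that; otherwise buffer it once.
    static InputStream makeReplayable(InputStream original, int maxExpectedSize) throws IOException {
        if (original.markSupported()) {
            original.mark(maxExpectedSize);
            return original;
        }
        // Fallback: copy once into memory (only for streams that cannot be reset).
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.transferTo(buffer);
        original.close();
        // ByteArrayInputStream.reset() rewinds to position 0 by default.
        return new ByteArrayInputStream(buffer.toByteArray());
    }

    // After a first pass (e.g. length/checksum calculation), rewind for the actual upload.
    static void rewind(InputStream replayable) throws IOException {
        replayable.reset();
    }
}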
I notice attempts to avoid shared ownership issues were made in #5837 and #5841, by simply copying the data. Blatantly copying all data is an anti-pattern though. As I described in "Possible Solution", we've worked hard to make our export process memory efficient. With the AWS SDK just copying data "to be safe", you've basically taken away any control end-users have to avoid going Out-of-Memory on large uploads. I can't resist pointing out it didn't take long for that to happen, as evidenced by issue #5850.
AWS Java SDK version used
2.30.0
JDK version used
openjdk 21.0.1 2023-10-17 LTS
Operating System and version
Windows 11 24H2, build 26100.2894