
AWS v2.30 SDK InputStream behavior changes #5859

Open
@rphilippen

Description

Describe the bug

Starting from AWS SDK version 2.30.0, the multipart upload calls ContentStreamProvider::newStream() twice on the provider passed to RequestBody::fromContentProvider(ContentStreamProvider provider, long contentLength, String mimeType). This leads to two InputStream instances being created, but only the second one gets closed. If code using the SDK creates an InputStream that is tied to an OS handle (like reading from a file), the OS handle leaks.

For quite a few streams in Java, it doesn't really matter whether you close them. Typically this goes for "in-memory" streams, such as ByteArrayInputStream; after all, there is no underlying handle to release. However, if you provide an InputStream that has a native OS handle attached to it (such as a file handle), the container may run out of file handles and crash.
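
For illustration, a hypothetical file-backed provider along these lines (the class is mine, not from our code base) acquires one OS file descriptor per newStream() call; if the SDK opens two streams but closes only one, a descriptor leaks on every part upload:

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

import software.amazon.awssdk.http.ContentStreamProvider;

// Hypothetical provider: every newStream() call opens a fresh OS file handle.
// Each stream the SDK requests but never closes keeps a file descriptor alive.
class FileBackedProvider implements ContentStreamProvider {
  private final Path path;

  FileBackedProvider(Path path) {
    this.path = path;
  }

  @Override
  public InputStream newStream() {
    try {
      return Files.newInputStream(path); // acquires an OS file descriptor
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}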

For our use case, this issue is blocking us from upgrading to version 2.30, simply because it breaks our exports feature. We unload data (chunks) onto S3. We then load these chunks (one by one, in order) from our service to run them through a zip stream, and upload the result as parts. This process has been running fine for years with the AWS SDK, up to and including version 2.29.

Regression Issue

  • Select this option if this issue appears to be a regression.

Expected Behavior

The AWS SDK should close every InputStream it requests, and preferably close each stream exactly once.
Preferably, the SDK should also request the InputStream only once, as producing the stream may be an expensive operation.

Current Behavior

The SDK invokes newStream() on the provided ContentStreamProvider twice, but only closes the second stream. This has been observed by adding logging in our application, which produced this output (heavily simplified):

Requested newStream() -> InputStream@49022cf9 (active streams: 1)
Requested newStream() -> InputStream@79fe40f (active streams: 2)
Closed InputStream@79fe40f (active streams: 1)

(No more logs related to this)

I'm pretty confident this issue was first introduced in #4492. Specifically, the AwsChunkedV4PayloadSigner::beforeSigning method calls newStream(), but does not close it.

    public void beforeSigning(SdkHttpRequest.Builder request, ContentStreamProvider payload) {
        long encodedContentLength = 0;
        long contentLength = moveContentLength(request, payload != null ? payload.newStream() : new StringInputStream(""));

That stream is never closed, because the SignerUtils::moveContentLength method it is handed to takes an InputStream but never closes it.
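
A minimal sketch of what closing that stream could look like, assuming SignerUtils::moveContentLength keeps its current signature (this is not the actual SDK fix, and the exception handling is only illustrative):

    public void beforeSigning(SdkHttpRequest.Builder request, ContentStreamProvider payload) {
        long encodedContentLength = 0;
        long contentLength;
        // Scope the stream that is only used to measure the content length, so it is
        // closed even if moveContentLength throws.
        try (InputStream lengthStream = payload != null ? payload.newStream() : new StringInputStream("")) {
            contentLength = moveContentLength(request, lengthStream);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // ... remainder of the method unchanged
    }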

Reproduction Steps

Writing out a realistic example for a multipart upload is quite involved, especially with the requirement that every non-last part be larger than 5 MB. In short, to trace the issue: start a multipart upload (we use the synchronous S3Client) and upload a part using a ContentStreamProvider that has been patched to track "open" (ContentStreamProvider::newStream()) and "close" (InputStream::close()) actions. We provide the content length (which we know), as well as a MIME type.

Below is an extremely simplified example of the issue.

package example;

import java.io.*;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.http.ContentStreamProvider;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.*;

public class MultiPartUploadIssueExample {
  /**
   * Tiny wrapper around {@link ByteArrayInputStream}, adding logic on {@link #close()}: calls the
   * provided callback and resets the reference to the shared buffer.
   */
  private static class ByteArrayInputStreamWithCallback extends ByteArrayInputStream {
    private final Runnable closeCallback;

    public ByteArrayInputStreamWithCallback(byte[] buf, Runnable closeCallback) {
      super(buf);
      this.closeCallback = Objects.requireNonNull(closeCallback);
    }

    @Override
    public synchronized void close() throws IOException {
      super.close();
      closeCallback.run();
    }
  }

  private static class CountingContentProvider implements ContentStreamProvider {
    private final byte[] data;
    private final AtomicInteger count;

    public CountingContentProvider(byte[] data) {
      this.data = Objects.requireNonNull(data);
      this.count = new AtomicInteger();
    }

    @Override
    public InputStream newStream() {
      count.incrementAndGet();
      return new ByteArrayInputStreamWithCallback(data, count::decrementAndGet);
    }

    public long contentLength() {
      return data.length;
    }

    public int getOpenCount() {
      return count.get();
    }
  }

  private final S3Client s3Client;

  public MultiPartUploadIssueExample(S3Client s3Client) {
    this.s3Client = s3Client;
  }

  public CompleteMultipartUploadResponse executeExample(String bucketName, String targetFile, String singlePart) {
    CreateMultipartUploadRequest createMultipartUploadRequest =
        CreateMultipartUploadRequest.builder()
            .bucket(bucketName)
            .key(targetFile)
            .build();

    final CreateMultipartUploadResponse pendingMultipartUpload =
        s3Client.createMultipartUpload(createMultipartUploadRequest);

    try {
      List<CompletedPart> uploadedParts = new ArrayList<>(1);

      // This would normally be in a loop
      int partNumber = 1;
      CountingContentProvider provider = new CountingContentProvider(singlePart.getBytes());

      RequestBody requestBody = RequestBody.fromContentProvider(provider, provider.contentLength(), "application/octet-stream");

      UploadPartRequest.Builder uploadPartRequestBuilder =
          UploadPartRequest.builder()
              .bucket(bucketName)
              .key(targetFile)
              .uploadId(pendingMultipartUpload.uploadId())
              .partNumber(partNumber);
      requestBody.optionalContentLength().ifPresent(uploadPartRequestBuilder::contentLength);

      UploadPartResponse uploadPartResponse =
          s3Client.uploadPart(uploadPartRequestBuilder.build(), requestBody);

      // Theoretically, this should report 0; but in practice it reports 1 open stream.
      System.out.println("Open stream handles after upload: " + provider.getOpenCount());

      uploadedParts.add(CompletedPart.builder().partNumber(partNumber).eTag(uploadPartResponse.eTag()).build());
      // End of hypothetical loop

      CompletedMultipartUpload completedMultipartUpload =
          CompletedMultipartUpload.builder().parts(uploadedParts).build();
      CompleteMultipartUploadRequest completeMultipartUploadRequest =
          CompleteMultipartUploadRequest.builder()
              .bucket(bucketName)
              .key(targetFile)
              .uploadId(pendingMultipartUpload.uploadId())
              .multipartUpload(completedMultipartUpload)
              .build();

      return s3Client.completeMultipartUpload(completeMultipartUploadRequest);
    } catch (Exception e) {
      s3Client.abortMultipartUpload(
          AbortMultipartUploadRequest.builder()
              .bucket(bucketName)
              .key(targetFile)
              .uploadId(pendingMultipartUpload.uploadId())
              .build());
      throw e;
    }
  }
}

Possible Solution

Obviously: close all user-provided streams the SDK asks for. There is some nuance to this though: the SDK should also be mindful of memory usage when working with data to upload.

The use case we are using the multipart upload with is basically:

  • An external database unloads data into S3 (often spread over multiple files).
  • We iterate the chunks sequentially, reading them from S3 (one by one) and streaming the data into a zip stream.
  • The output of the zip stream is uploaded in parts, so we are more resilient against network issues (if a part fails, we only have to redo that part, not the whole large file). A rough sketch of this pipeline follows below.
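
Roughly, and heavily simplified (hypothetical names like StreamingExportSketch and partSink; this is not our production code), the streaming part of the pipeline looks like this:

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;

// Hypothetical sketch: stream each unloaded chunk from S3 straight into one zip
// entry, never holding a whole file in memory. The zip output lands in partSink,
// which is cut into parts of at least 5 MB and uploaded via the multipart API.
class StreamingExportSketch {
  static void zipChunks(S3Client s3Client, String bucketName, List<String> chunkKeys,
                        OutputStream partSink) throws IOException {
    try (ZipOutputStream zip = new ZipOutputStream(partSink)) {
      for (String chunkKey : chunkKeys) {
        zip.putNextEntry(new ZipEntry(chunkKey));
        try (InputStream chunk = s3Client.getObject(
            GetObjectRequest.builder().bucket(bucketName).key(chunkKey).build())) {
          chunk.transferTo(zip); // streams directly from the S3 response into the zip entry
        }
        zip.closeEntry();
      }
    }
  }
}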

We used to have a naïve process that kept all data in memory. It is probably no surprise that our service tended to run Out-of-Memory on larger exports (think 1 GB range). About two years ago, we rewrote the implementation to be fully streaming: the data is streamed directly from S3 (the getObject response) into the zip output stream.

The zip output stream writes into a custom byte buffer that follows, and enforces, an "exclusive write, multiple read" pattern: to write to the buffer, no outstanding readers may be active, but reading from the same buffer multiple times is allowed. With this set-up, an export takes about 20 MB of RAM to process, regardless of how big the actual export is. There are some read buffers, plus a 16 MB chunk buffer that is written to and read from repeatedly.
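
A rough, hypothetical simplification of that buffer (not our actual class) illustrates the sanity check that caught the leak:

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical simplification of an "exclusive write, multiple read" buffer:
// writing is only allowed while no readers are open; reads may overlap freely.
class ExclusiveWriteSharedBuffer {
  private final byte[] buffer = new byte[16 * 1024 * 1024]; // 16 MB chunk buffer
  private final AtomicInteger openReaders = new AtomicInteger();
  private int length;

  void write(byte[] data, int len) {
    if (openReaders.get() != 0) {
      // This is the kind of check that flagged the stream leaked by SDK 2.30.0.
      throw new IllegalStateException(openReaders.get() + " reader(s) still open");
    }
    System.arraycopy(data, 0, buffer, 0, len);
    length = len;
  }

  InputStream newReader() {
    openReaders.incrementAndGet();
    return new ByteArrayInputStream(buffer, 0, length) {
      @Override
      public void close() {
        openReaders.decrementAndGet();
      }
    };
  }
}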

It was the sanity check of this shared buffer class that alerted us to the issue in SDK versions 2.30.0 and upwards. Once the first part was done uploading to S3, the streaming zip operation requested write access to the buffer, and that request flagged (via an exception) that there was still a reader open.

I want to pinpoint a specific unfortunate abstraction. Look at StreamManagingStage::ClosingStreamProvider and ask yourself: what happens if newStream() is called multiple times before closeCurrentStream() is invoked? The answer is, of course: every non-latest currentStream value is overwritten, and those streams are leaked.
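
Paraphrased from my reading of the code (a simplified illustration, not the exact SDK source), the pattern is roughly:

import java.io.IOException;
import java.io.InputStream;

import software.amazon.awssdk.http.ContentStreamProvider;

// Simplified illustration of the problematic pattern: a second newStream() call
// overwrites currentStream, so the first stream is never closed by closeCurrentStream().
class ClosingStreamProviderSketch implements ContentStreamProvider {
  private final ContentStreamProvider delegate;
  private InputStream currentStream;

  ClosingStreamProviderSketch(ContentStreamProvider delegate) {
    this.delegate = delegate;
  }

  @Override
  public InputStream newStream() {
    currentStream = delegate.newStream(); // any previous stream is silently dropped, i.e. leaked
    return currentStream;
  }

  void closeCurrentStream() throws IOException {
    if (currentStream != null) {
      currentStream.close(); // only the most recently created stream gets closed
      currentStream = null;
    }
  }
}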

Additional Information/Context

To add some unsolicited advice:
I think it's best to take a good look at the SDK's usage of streams, and revisit how input data is handled. This summary may be too simple (I may be missing something), but to my knowledge, the SDK needs to know:

  1. The content-length of the body about to be uploaded.
  2. A checksum of the body (requires iterating the content).
  3. The body itself, for the http request.

I want to clarify that I grasp the complexity of all the possible use cases the SDK has to support. If your input is just an InputStream, you have to consume the stream at least twice: once to determine the length and checksum, and once more to actually stream the content to the HTTP client during upload. I think the SDK could do a better job of abstracting the common issues away. But at its core, the problem is also fuzzy ownership (and Java doesn't help there). Once you take ownership of an InputStream, you need to close it. Sharing an InputStream between multiple readers is dangerous and requires sanity checks to ensure the stream is not read concurrently by multiple parties.

What I think the SDK should do, to be both memory efficient and clear in its usage pattern, is either:

  1. Only support byte[] inputs (and clarify that ownership of the byte[] is transferred to the SDK, i.e. the caller must not write to it anymore).
  2. Be smart about InputStream: if the stream supports mark/reset, reset it when the content needs to be read multiple times, and only copy the stream's contents to an in-memory buffer when it does not support resetting (a rough sketch follows below).
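
A rough sketch of option 2 (all names are made up; this is not SDK API):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Sketch: hand out a stream that can be replayed. Resettable streams are reused
// in place; only non-resettable streams are buffered into memory once.
final class ReplayableBody {
  static InputStream replayable(InputStream in, int maxBuffer) throws IOException {
    if (in.markSupported()) {
      in.mark(maxBuffer); // remember the start so the content can be re-read
      return in;
    }
    // No mark/reset support: buffer once; ByteArrayInputStream resets to the start.
    return new ByteArrayInputStream(in.readAllBytes());
  }

  // After the first pass (e.g. length/checksum), rewind before the actual upload.
  static void rewind(InputStream in) throws IOException {
    in.reset();
  }
}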

I notice attempts to avoid shared-ownership issues were made in #5837 and #5841, by simply copying the data. Blatantly copying all data is an anti-pattern, though. As I described under "Possible Solution", we've worked hard to make our export process memory efficient. With the AWS SDK just copying data "to be safe", you've basically taken away any control end users have to avoid going Out-of-Memory on large uploads. I can't resist pointing out that it didn't take long for that to happen, as evidenced by issue #5850.

AWS Java SDK version used

2.30.0

JDK version used

openjdk 21.0.1 2023-10-17 LTS

Operating System and version

Windows 11 24H2, build 26100.2894

Labels

bug (This issue is a bug.), p2 (This is a standard priority issue)
