From 975e1f20441fbc5a16d830481f47de8e75da526d Mon Sep 17 00:00:00 2001 From: Alex Woods Date: Wed, 5 Nov 2025 14:21:19 -0800 Subject: [PATCH 1/7] Add limit on outstanding demand to ByteBufferStoringSubscriber --- .../next-release/bugfix-AmazonS3-b003027.json | 6 ++ .../crt/S3CrtRequestBodyStreamAdapter.java | 4 +- .../S3CrtRequestBodyStreamAdapterTest.java | 72 +++++++++++++++++++ .../async/ByteBufferStoringSubscriber.java | 26 +++++++ .../ByteBufferStoringSubscriberTest.java | 17 +++++ 5 files changed, 124 insertions(+), 1 deletion(-) create mode 100644 .changes/next-release/bugfix-AmazonS3-b003027.json diff --git a/.changes/next-release/bugfix-AmazonS3-b003027.json b/.changes/next-release/bugfix-AmazonS3-b003027.json new file mode 100644 index 000000000000..25148938f06f --- /dev/null +++ b/.changes/next-release/bugfix-AmazonS3-b003027.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "Amazon S3", + "contributor": "", + "description": "Fix OutOfMemory issues when using S3CrtRequestBodyStreamAdapter on streams that produce data faster than they can be consumed." +} diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtRequestBodyStreamAdapter.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtRequestBodyStreamAdapter.java index 7960af250325..707c2b680d5a 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtRequestBodyStreamAdapter.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtRequestBodyStreamAdapter.java @@ -28,6 +28,8 @@ @SdkInternalApi public final class S3CrtRequestBodyStreamAdapter implements HttpRequestBodyStream { private static final long MINIMUM_BYTES_BUFFERED = 1024 * 1024L; + // for 16 kb chunks, this limits to about 16 MB (2x the standard crt provided buffer size) + private static final int MAXIMUM_OUTSTANDING_DEMAND = 1024; private final SdkHttpContentPublisher bodyPublisher; private final ByteBufferStoringSubscriber requestBodySubscriber; @@ -35,7 +37,7 @@ public final class S3CrtRequestBodyStreamAdapter implements HttpRequestBodyStrea public S3CrtRequestBodyStreamAdapter(SdkHttpContentPublisher bodyPublisher) { this.bodyPublisher = bodyPublisher; - this.requestBodySubscriber = new ByteBufferStoringSubscriber(MINIMUM_BYTES_BUFFERED); + this.requestBodySubscriber = new ByteBufferStoringSubscriber(MINIMUM_BYTES_BUFFERED, MAXIMUM_OUTSTANDING_DEMAND); } @Override diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtRequestBodyStreamAdapterTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtRequestBodyStreamAdapterTest.java index e6ac41cfa551..287a2364286a 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtRequestBodyStreamAdapterTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtRequestBodyStreamAdapterTest.java @@ -28,7 +28,9 @@ import org.junit.jupiter.api.Test; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import software.amazon.awssdk.http.async.SdkHttpContentPublisher; +import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber; class S3CrtRequestBodyStreamAdapterTest { @@ -56,6 +58,36 @@ void getRequestData_fillsInputBuffer_publisherBuffersAreSmaller() { assertThat(inputBuffer.remaining()).isEqualTo(0); } + @Test + void getRequestData_fillsInputBuffer_limitsOutstandingDemand() { + int inputBufferSize = 2000; + int maximumOutstandingDemand = 1024; + + RequestTrackingPublisher requestTrackingPublisher = new RequestTrackingPublisher(); + SdkHttpContentPublisher requestBody = requestBody(requestTrackingPublisher, 42L); + + S3CrtRequestBodyStreamAdapter adapter = new S3CrtRequestBodyStreamAdapter(requestBody); + + ByteBuffer inputBuffer = ByteBuffer.allocate(inputBufferSize); + for (int i = 0; i < maximumOutstandingDemand; i++) { + // we are under the minimum buffer size, so each request here increases outstanding demand by 1 + adapter.sendRequestBody(inputBuffer); + // release 1 byte of data, calling onNext (satisfies one request, but then requests 1 more) + requestTrackingPublisher.release(1); + } + // we should have 2x requests + assertThat(requestTrackingPublisher.requests()).isEqualTo(maximumOutstandingDemand * 2); + // but the total released bytes is only maximumOutstandingDemand + assertThat(inputBuffer.remaining()).isEqualTo(inputBufferSize - maximumOutstandingDemand + 1); + + // now that we have reached maximum outstanding demand, new requests won't be sent + adapter.sendRequestBody(inputBuffer); + assertThat(requestTrackingPublisher.requests()).isEqualTo(maximumOutstandingDemand * 2); + + + + } + private static SdkHttpContentPublisher requestBody(Publisher delegate, long size) { return new SdkHttpContentPublisher() { @Override @@ -114,4 +146,44 @@ public void getRequestData_publisherThrows_wrapsExceptionIfNotRuntimeException() .isInstanceOf(RuntimeException.class) .hasCauseInstanceOf(IOException.class); } + + private static class RequestTrackingPublisher implements Publisher { + ByteBufferStoringSubscriber subscriber; + RequestTrackingSubscription subscription = new RequestTrackingSubscription(); + + @Override + public void subscribe(Subscriber subscriber) { + assertThat(subscriber).isInstanceOf(ByteBufferStoringSubscriber.class); + this.subscriber = (ByteBufferStoringSubscriber) subscriber; + this.subscriber.onSubscribe(subscription); + } + + // publish up to n requests + public void release(int n) { + for (int i = 0; i < n; i++) { + ByteBuffer buffer = ByteBuffer.allocate(1); + subscriber.onNext(buffer); + } + } + + public int requests() { + return subscription.requests; + } + } + + private static class RequestTrackingSubscription implements Subscription { + + int requests = 0; + + @Override + public void request(long n) { + requests += n; + } + + @Override + public void cancel() { + + } + + } } diff --git a/utils/src/main/java/software/amazon/awssdk/utils/async/ByteBufferStoringSubscriber.java b/utils/src/main/java/software/amazon/awssdk/utils/async/ByteBufferStoringSubscriber.java index 11ffb3718f65..20c98ee2f525 100644 --- a/utils/src/main/java/software/amazon/awssdk/utils/async/ByteBufferStoringSubscriber.java +++ b/utils/src/main/java/software/amazon/awssdk/utils/async/ByteBufferStoringSubscriber.java @@ -21,6 +21,7 @@ import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -56,6 +57,10 @@ public class ByteBufferStoringSubscriber implements Subscriber { private final Phaser phaser = new Phaser(1); + private final AtomicInteger outstandingDemand = new AtomicInteger(0); + + private final Optional maximumOutstandingDemand; + /** * The active subscription. Set when {@link #onSubscribe(Subscription)} is invoked. */ @@ -67,6 +72,19 @@ public class ByteBufferStoringSubscriber implements Subscriber { public ByteBufferStoringSubscriber(long minimumBytesBuffered) { this.minimumBytesBuffered = Validate.isPositive(minimumBytesBuffered, "Data buffer minimum must be positive"); this.storingSubscriber = new StoringSubscriber<>(Integer.MAX_VALUE); + this.maximumOutstandingDemand = Optional.empty(); + } + + /** + * Create a subscriber that stores at least {@code minimumBytesBuffered} in memory for retrieval and which limits the + * maximum outstanding demand (requests) to its subscription. + */ + public ByteBufferStoringSubscriber(long minimumBytesBuffered, int maximumOutstandingDemand) { + this.minimumBytesBuffered = Validate.isPositive(minimumBytesBuffered, "Data buffer minimum must be positive"); + this.maximumOutstandingDemand = Optional.of(Validate.isPositive(maximumOutstandingDemand, + "maximumOutstandingDemand must be positive")); + + this.storingSubscriber = new StoringSubscriber<>(Integer.MAX_VALUE); } /** @@ -175,12 +193,14 @@ public void onSubscribe(Subscription s) { storingSubscriber.onSubscribe(new DemandIgnoringSubscription(s)); subscription = s; subscription.request(1); + outstandingDemand.incrementAndGet(); subscriptionLatch.countDown(); } @Override public void onNext(ByteBuffer byteBuffer) { int remaining = byteBuffer.remaining(); + outstandingDemand.decrementAndGet(); storingSubscriber.onNext(byteBuffer.duplicate()); addBufferedDataAmount(remaining); phaser.arrive(); @@ -204,7 +224,13 @@ private void addBufferedDataAmount(long amountToAdd) { } private void maybeRequestMore(long currentDataBuffered) { + // if we have too many outstanding requests, no need to make more requests + if (maximumOutstandingDemand.isPresent() && outstandingDemand.get() >= maximumOutstandingDemand.get()) { + return; + } + if (currentDataBuffered < minimumBytesBuffered) { + outstandingDemand.incrementAndGet(); subscription.request(1); } } diff --git a/utils/src/test/java/software/amazon/awssdk/utils/async/ByteBufferStoringSubscriberTest.java b/utils/src/test/java/software/amazon/awssdk/utils/async/ByteBufferStoringSubscriberTest.java index a6c5e4a920f5..8a3a537a747c 100644 --- a/utils/src/test/java/software/amazon/awssdk/utils/async/ByteBufferStoringSubscriberTest.java +++ b/utils/src/test/java/software/amazon/awssdk/utils/async/ByteBufferStoringSubscriberTest.java @@ -53,10 +53,13 @@ public class ByteBufferStoringSubscriberTest { public void constructorCalled_withNonPositiveSize_throwsException() { assertThatCode(() -> new ByteBufferStoringSubscriber(1)).doesNotThrowAnyException(); assertThatCode(() -> new ByteBufferStoringSubscriber(Integer.MAX_VALUE)).doesNotThrowAnyException(); + assertThatCode(() -> new ByteBufferStoringSubscriber(1, 1)).doesNotThrowAnyException(); assertThatThrownBy(() -> new ByteBufferStoringSubscriber(0)).isInstanceOf(IllegalArgumentException.class); assertThatThrownBy(() -> new ByteBufferStoringSubscriber(-1)).isInstanceOf(IllegalArgumentException.class); assertThatThrownBy(() -> new ByteBufferStoringSubscriber(Integer.MIN_VALUE)).isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> new ByteBufferStoringSubscriber(1, 0)).isInstanceOf(IllegalArgumentException.class); + assertThatThrownBy(() -> new ByteBufferStoringSubscriber(1, -1)).isInstanceOf(IllegalArgumentException.class); } @Test @@ -76,6 +79,20 @@ public void doesNotRequestMoreThanMaxBytes() { verifyNoMoreInteractions(subscription); } + @Test + public void doesNotRequestMoreThanMaxDemand() { + ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(5, 2); + + subscriber.onSubscribe(subscription); // request 1, demand = 1 + subscriber.onNext(fullByteBufferOfSize(3)); // demand = 0 + subscriber.transferTo(emptyByteBufferOfSize(1)); // requests more, demand = 1 + subscriber.transferTo(emptyByteBufferOfSize(1)); // requests more, demand = 2 + verify(subscription, times(3)).request(1); + + subscriber.transferTo(emptyByteBufferOfSize(1)); // demand already maximum, no request + verifyNoMoreInteractions(subscription); + } + @Test public void canStoreMoreThanMaxBytesButWontAskForMoreUntilBelowMax() { ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(3); From 326e615849c4680145d44ac86c42966bb0238113 Mon Sep 17 00:00:00 2001 From: Alex Woods Date: Wed, 5 Nov 2025 14:42:45 -0800 Subject: [PATCH 2/7] Fix int -> long --- .../s3/internal/crt/S3CrtRequestBodyStreamAdapterTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtRequestBodyStreamAdapterTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtRequestBodyStreamAdapterTest.java index 287a2364286a..3b23a8aa0c10 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtRequestBodyStreamAdapterTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtRequestBodyStreamAdapterTest.java @@ -166,14 +166,14 @@ public void release(int n) { } } - public int requests() { + public long requests() { return subscription.requests; } } private static class RequestTrackingSubscription implements Subscription { - int requests = 0; + long requests = 0; @Override public void request(long n) { From 3c8d84b4336bc487f0a14440f44dec8c513b3a08 Mon Sep 17 00:00:00 2001 From: Alex Woods Date: Thu, 6 Nov 2025 09:20:39 -0800 Subject: [PATCH 3/7] Use a sizehint instead of relying on a new parameter --- .../crt/S3CrtRequestBodyStreamAdapter.java | 6 ++-- .../S3CrtRequestBodyStreamAdapterTest.java | 33 ++++++++----------- .../async/ByteBufferStoringSubscriber.java | 31 ++++++----------- .../ByteBufferStoringSubscriberTest.java | 12 +++---- 4 files changed, 30 insertions(+), 52 deletions(-) diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtRequestBodyStreamAdapter.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtRequestBodyStreamAdapter.java index 707c2b680d5a..e91736ea4901 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtRequestBodyStreamAdapter.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtRequestBodyStreamAdapter.java @@ -27,9 +27,7 @@ */ @SdkInternalApi public final class S3CrtRequestBodyStreamAdapter implements HttpRequestBodyStream { - private static final long MINIMUM_BYTES_BUFFERED = 1024 * 1024L; - // for 16 kb chunks, this limits to about 16 MB (2x the standard crt provided buffer size) - private static final int MAXIMUM_OUTSTANDING_DEMAND = 1024; + private static final long MINIMUM_BYTES_BUFFERED = 16 * 1024 * 1024L; private final SdkHttpContentPublisher bodyPublisher; private final ByteBufferStoringSubscriber requestBodySubscriber; @@ -37,7 +35,7 @@ public final class S3CrtRequestBodyStreamAdapter implements HttpRequestBodyStrea public S3CrtRequestBodyStreamAdapter(SdkHttpContentPublisher bodyPublisher) { this.bodyPublisher = bodyPublisher; - this.requestBodySubscriber = new ByteBufferStoringSubscriber(MINIMUM_BYTES_BUFFERED, MAXIMUM_OUTSTANDING_DEMAND); + this.requestBodySubscriber = new ByteBufferStoringSubscriber(MINIMUM_BYTES_BUFFERED); } @Override diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtRequestBodyStreamAdapterTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtRequestBodyStreamAdapterTest.java index 3b23a8aa0c10..2d1c2cd7845f 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtRequestBodyStreamAdapterTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtRequestBodyStreamAdapterTest.java @@ -60,32 +60,25 @@ void getRequestData_fillsInputBuffer_publisherBuffersAreSmaller() { @Test void getRequestData_fillsInputBuffer_limitsOutstandingDemand() { - int inputBufferSize = 2000; - int maximumOutstandingDemand = 1024; + int minBytesBuffered = 16 * 1024 * 1024; + int inputBufferSize = 1024; RequestTrackingPublisher requestTrackingPublisher = new RequestTrackingPublisher(); - SdkHttpContentPublisher requestBody = requestBody(requestTrackingPublisher, 42L); + SdkHttpContentPublisher requestBody = requestBody(requestTrackingPublisher, minBytesBuffered); S3CrtRequestBodyStreamAdapter adapter = new S3CrtRequestBodyStreamAdapter(requestBody); ByteBuffer inputBuffer = ByteBuffer.allocate(inputBufferSize); - for (int i = 0; i < maximumOutstandingDemand; i++) { - // we are under the minimum buffer size, so each request here increases outstanding demand by 1 - adapter.sendRequestBody(inputBuffer); - // release 1 byte of data, calling onNext (satisfies one request, but then requests 1 more) - requestTrackingPublisher.release(1); - } - // we should have 2x requests - assertThat(requestTrackingPublisher.requests()).isEqualTo(maximumOutstandingDemand * 2); - // but the total released bytes is only maximumOutstandingDemand - assertThat(inputBuffer.remaining()).isEqualTo(inputBufferSize - maximumOutstandingDemand + 1); - - // now that we have reached maximum outstanding demand, new requests won't be sent - adapter.sendRequestBody(inputBuffer); - assertThat(requestTrackingPublisher.requests()).isEqualTo(maximumOutstandingDemand * 2); - + adapter.sendRequestBody(inputBuffer); // initiate the subscription, but no bytes available, makes 1 request + // release 1 request of minBytesBuffered bytes of data, calling onNext (satisfies one request, but then requests 1 more) + requestTrackingPublisher.release(1, minBytesBuffered-100); + assertThat(requestTrackingPublisher.requests()).isEqualTo(2); + // call sendRequestBody, outstandingDemand=1, sizeHint=16*1024*1024-100 + existing data buffered is > our min + // so no more requests will be made + adapter.sendRequestBody(inputBuffer); + assertThat(requestTrackingPublisher.requests()).isEqualTo(2); } private static SdkHttpContentPublisher requestBody(Publisher delegate, long size) { @@ -159,9 +152,9 @@ public void subscribe(Subscriber subscriber) { } // publish up to n requests - public void release(int n) { + public void release(int n, int size) { for (int i = 0; i < n; i++) { - ByteBuffer buffer = ByteBuffer.allocate(1); + ByteBuffer buffer = ByteBuffer.allocate(size); subscriber.onNext(buffer); } } diff --git a/utils/src/main/java/software/amazon/awssdk/utils/async/ByteBufferStoringSubscriber.java b/utils/src/main/java/software/amazon/awssdk/utils/async/ByteBufferStoringSubscriber.java index 20c98ee2f525..5eb073bb7f86 100644 --- a/utils/src/main/java/software/amazon/awssdk/utils/async/ByteBufferStoringSubscriber.java +++ b/utils/src/main/java/software/amazon/awssdk/utils/async/ByteBufferStoringSubscriber.java @@ -59,7 +59,7 @@ public class ByteBufferStoringSubscriber implements Subscriber { private final AtomicInteger outstandingDemand = new AtomicInteger(0); - private final Optional maximumOutstandingDemand; + private volatile long byteBufferSizeHint = 0L; /** * The active subscription. Set when {@link #onSubscribe(Subscription)} is invoked. @@ -67,24 +67,13 @@ public class ByteBufferStoringSubscriber implements Subscriber { private Subscription subscription; /** - * Create a subscriber that stores at least {@code minimumBytesBuffered} in memory for retrieval. + * Create a subscriber that stores at least {@code minimumBytesBuffered} in memory for retrieval. The subscriber will + * only request more from the subscription when fewer bytes are buffered AND in flight requests from the subscription will + * likely be under minimumBytesBuffered. */ public ByteBufferStoringSubscriber(long minimumBytesBuffered) { this.minimumBytesBuffered = Validate.isPositive(minimumBytesBuffered, "Data buffer minimum must be positive"); this.storingSubscriber = new StoringSubscriber<>(Integer.MAX_VALUE); - this.maximumOutstandingDemand = Optional.empty(); - } - - /** - * Create a subscriber that stores at least {@code minimumBytesBuffered} in memory for retrieval and which limits the - * maximum outstanding demand (requests) to its subscription. - */ - public ByteBufferStoringSubscriber(long minimumBytesBuffered, int maximumOutstandingDemand) { - this.minimumBytesBuffered = Validate.isPositive(minimumBytesBuffered, "Data buffer minimum must be positive"); - this.maximumOutstandingDemand = Optional.of(Validate.isPositive(maximumOutstandingDemand, - "maximumOutstandingDemand must be positive")); - - this.storingSubscriber = new StoringSubscriber<>(Integer.MAX_VALUE); } /** @@ -201,6 +190,10 @@ public void onSubscribe(Subscription s) { public void onNext(ByteBuffer byteBuffer) { int remaining = byteBuffer.remaining(); outstandingDemand.decrementAndGet(); + // atomic update not required here, in a race it does not matter which thread sets this value since it is not being + // incremented, just set. + byteBufferSizeHint = byteBuffer.remaining(); + storingSubscriber.onNext(byteBuffer.duplicate()); addBufferedDataAmount(remaining); phaser.arrive(); @@ -224,12 +217,8 @@ private void addBufferedDataAmount(long amountToAdd) { } private void maybeRequestMore(long currentDataBuffered) { - // if we have too many outstanding requests, no need to make more requests - if (maximumOutstandingDemand.isPresent() && outstandingDemand.get() >= maximumOutstandingDemand.get()) { - return; - } - - if (currentDataBuffered < minimumBytesBuffered) { + long dataBufferedAndInFlight = currentDataBuffered + (byteBufferSizeHint * outstandingDemand.get()); + if (dataBufferedAndInFlight < minimumBytesBuffered) { outstandingDemand.incrementAndGet(); subscription.request(1); } diff --git a/utils/src/test/java/software/amazon/awssdk/utils/async/ByteBufferStoringSubscriberTest.java b/utils/src/test/java/software/amazon/awssdk/utils/async/ByteBufferStoringSubscriberTest.java index 8a3a537a747c..90cd9425eb9d 100644 --- a/utils/src/test/java/software/amazon/awssdk/utils/async/ByteBufferStoringSubscriberTest.java +++ b/utils/src/test/java/software/amazon/awssdk/utils/async/ByteBufferStoringSubscriberTest.java @@ -53,13 +53,10 @@ public class ByteBufferStoringSubscriberTest { public void constructorCalled_withNonPositiveSize_throwsException() { assertThatCode(() -> new ByteBufferStoringSubscriber(1)).doesNotThrowAnyException(); assertThatCode(() -> new ByteBufferStoringSubscriber(Integer.MAX_VALUE)).doesNotThrowAnyException(); - assertThatCode(() -> new ByteBufferStoringSubscriber(1, 1)).doesNotThrowAnyException(); assertThatThrownBy(() -> new ByteBufferStoringSubscriber(0)).isInstanceOf(IllegalArgumentException.class); assertThatThrownBy(() -> new ByteBufferStoringSubscriber(-1)).isInstanceOf(IllegalArgumentException.class); assertThatThrownBy(() -> new ByteBufferStoringSubscriber(Integer.MIN_VALUE)).isInstanceOf(IllegalArgumentException.class); - assertThatThrownBy(() -> new ByteBufferStoringSubscriber(1, 0)).isInstanceOf(IllegalArgumentException.class); - assertThatThrownBy(() -> new ByteBufferStoringSubscriber(1, -1)).isInstanceOf(IllegalArgumentException.class); } @Test @@ -80,16 +77,17 @@ public void doesNotRequestMoreThanMaxBytes() { } @Test - public void doesNotRequestMoreThanMaxDemand() { - ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(5, 2); + public void doesNotRequestMoreWhenInflightMoreThanMinBytes() { + ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(5); subscriber.onSubscribe(subscription); // request 1, demand = 1 - subscriber.onNext(fullByteBufferOfSize(3)); // demand = 0 + subscriber.onNext(fullByteBufferOfSize(3)); // demand = 0, sizeHint=3 subscriber.transferTo(emptyByteBufferOfSize(1)); // requests more, demand = 1 subscriber.transferTo(emptyByteBufferOfSize(1)); // requests more, demand = 2 verify(subscription, times(3)).request(1); - subscriber.transferTo(emptyByteBufferOfSize(1)); // demand already maximum, no request + //sizeHint=3, demand=2, dataBufferedAndInFlight=6. 6 > 5, so no new request + subscriber.transferTo(emptyByteBufferOfSize(1)); verifyNoMoreInteractions(subscription); } From 75eb29197e19852ea747c67e13a0e2007a6bf82d Mon Sep 17 00:00:00 2001 From: Alex Woods Date: Thu, 6 Nov 2025 09:28:29 -0800 Subject: [PATCH 4/7] Fix order of outsandingDemand and request in onSubscribe --- .../amazon/awssdk/utils/async/ByteBufferStoringSubscriber.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/src/main/java/software/amazon/awssdk/utils/async/ByteBufferStoringSubscriber.java b/utils/src/main/java/software/amazon/awssdk/utils/async/ByteBufferStoringSubscriber.java index 5eb073bb7f86..3174e581e24c 100644 --- a/utils/src/main/java/software/amazon/awssdk/utils/async/ByteBufferStoringSubscriber.java +++ b/utils/src/main/java/software/amazon/awssdk/utils/async/ByteBufferStoringSubscriber.java @@ -181,8 +181,8 @@ private int transfer(ByteBuffer in, ByteBuffer out) { public void onSubscribe(Subscription s) { storingSubscriber.onSubscribe(new DemandIgnoringSubscription(s)); subscription = s; - subscription.request(1); outstandingDemand.incrementAndGet(); + subscription.request(1); subscriptionLatch.countDown(); } From 43044d692d98361e59badad4242f1bfaf7332b5b Mon Sep 17 00:00:00 2001 From: Alex Woods Date: Thu, 6 Nov 2025 09:53:48 -0800 Subject: [PATCH 5/7] Update changelog --- .changes/next-release/bugfix-AmazonS3-b003027.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.changes/next-release/bugfix-AmazonS3-b003027.json b/.changes/next-release/bugfix-AmazonS3-b003027.json index 25148938f06f..28cd1f7e90e9 100644 --- a/.changes/next-release/bugfix-AmazonS3-b003027.json +++ b/.changes/next-release/bugfix-AmazonS3-b003027.json @@ -1,6 +1,6 @@ { "type": "bugfix", - "category": "Amazon S3", + "category": "AWS SDK for Java v2", "contributor": "", - "description": "Fix OutOfMemory issues when using S3CrtRequestBodyStreamAdapter on streams that produce data faster than they can be consumed." + "description": "Consider outstanding demand in ByteBufferStoringSubscriber before requesting more - fixes OutOfMemoryIssues in S3CrtRequestBodyStreamAdapter" } From 997f254f72b67ff38cf2f6f91d5bf37f55b42a09 Mon Sep 17 00:00:00 2001 From: Alex Woods Date: Thu, 6 Nov 2025 11:26:43 -0800 Subject: [PATCH 6/7] Update changelog wording --- .changes/next-release/bugfix-AmazonS3-b003027.json | 2 +- .../awssdk/http/crt/internal/request/CrtRequestBodyAdapter.java | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/.changes/next-release/bugfix-AmazonS3-b003027.json b/.changes/next-release/bugfix-AmazonS3-b003027.json index 28cd1f7e90e9..bec18c74bd71 100644 --- a/.changes/next-release/bugfix-AmazonS3-b003027.json +++ b/.changes/next-release/bugfix-AmazonS3-b003027.json @@ -2,5 +2,5 @@ "type": "bugfix", "category": "AWS SDK for Java v2", "contributor": "", - "description": "Consider outstanding demand in ByteBufferStoringSubscriber before requesting more - fixes OutOfMemoryIssues in S3CrtRequestBodyStreamAdapter" + "description": "Consider outstanding demand in ByteBufferStoringSubscriber before requesting more - fixes OutOfMemoryIssues that may occur when using AWS CRT-based S3 client to upload a large object." } diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestBodyAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestBodyAdapter.java index 6fa64d8a011d..851f547507b5 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestBodyAdapter.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestBodyAdapter.java @@ -36,9 +36,11 @@ final class CrtRequestBodyAdapter implements HttpRequestBodyStream { @Override public boolean sendRequestBody(ByteBuffer bodyBytesOut) { + System.out.println("ByteBufer size: " + bodyBytesOut.remaining()); if (subscribed.compareAndSet(false, true)) { requestPublisher.subscribe(requestBodySubscriber); } + System.out.println("ByteBufer size: " + bodyBytesOut.remaining()); return requestBodySubscriber.transferTo(bodyBytesOut) == TransferResult.END_OF_STREAM; } From 4510e2fcc1e159d685b3ce144d973f10700f7f4a Mon Sep 17 00:00:00 2001 From: Alex Woods Date: Thu, 6 Nov 2025 11:46:43 -0800 Subject: [PATCH 7/7] Remove accidental commit of debug logging --- .../awssdk/http/crt/internal/request/CrtRequestBodyAdapter.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestBodyAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestBodyAdapter.java index 851f547507b5..6fa64d8a011d 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestBodyAdapter.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestBodyAdapter.java @@ -36,11 +36,9 @@ final class CrtRequestBodyAdapter implements HttpRequestBodyStream { @Override public boolean sendRequestBody(ByteBuffer bodyBytesOut) { - System.out.println("ByteBufer size: " + bodyBytesOut.remaining()); if (subscribed.compareAndSet(false, true)) { requestPublisher.subscribe(requestBodySubscriber); } - System.out.println("ByteBufer size: " + bodyBytesOut.remaining()); return requestBodySubscriber.transferTo(bodyBytesOut) == TransferResult.END_OF_STREAM; }