Skip to content

Commit 72a30db

Browse files
authored
Consider outstanding demand in ByteBufferStoringSubscriber before requesting more (#6549)
* Add limit on outstanding demand to ByteBufferStoringSubscriber * Fix int -> long * Use a sizehint instead of relying on a new parameter * Fix order of outsandingDemand and request in onSubscribe * Update changelog * Update changelog wording * Remove accidental commit of debug logging
1 parent 06d7285 commit 72a30db

File tree

5 files changed

+104
-3
lines changed

5 files changed

+104
-3
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Consider outstanding demand in ByteBufferStoringSubscriber before requesting more - fixes OutOfMemoryIssues that may occur when using AWS CRT-based S3 client to upload a large object."
6+
}

services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtRequestBodyStreamAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
*/
2828
@SdkInternalApi
2929
public final class S3CrtRequestBodyStreamAdapter implements HttpRequestBodyStream {
30-
private static final long MINIMUM_BYTES_BUFFERED = 1024 * 1024L;
30+
private static final long MINIMUM_BYTES_BUFFERED = 16 * 1024 * 1024L;
3131
private final SdkHttpContentPublisher bodyPublisher;
3232
private final ByteBufferStoringSubscriber requestBodySubscriber;
3333

services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtRequestBodyStreamAdapterTest.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@
2828
import org.junit.jupiter.api.Test;
2929
import org.reactivestreams.Publisher;
3030
import org.reactivestreams.Subscriber;
31+
import org.reactivestreams.Subscription;
3132
import software.amazon.awssdk.http.async.SdkHttpContentPublisher;
33+
import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber;
3234

3335
class S3CrtRequestBodyStreamAdapterTest {
3436

@@ -56,6 +58,29 @@ void getRequestData_fillsInputBuffer_publisherBuffersAreSmaller() {
5658
assertThat(inputBuffer.remaining()).isEqualTo(0);
5759
}
5860

61+
@Test
62+
void getRequestData_fillsInputBuffer_limitsOutstandingDemand() {
63+
int minBytesBuffered = 16 * 1024 * 1024;
64+
int inputBufferSize = 1024;
65+
66+
RequestTrackingPublisher requestTrackingPublisher = new RequestTrackingPublisher();
67+
SdkHttpContentPublisher requestBody = requestBody(requestTrackingPublisher, minBytesBuffered);
68+
69+
S3CrtRequestBodyStreamAdapter adapter = new S3CrtRequestBodyStreamAdapter(requestBody);
70+
71+
ByteBuffer inputBuffer = ByteBuffer.allocate(inputBufferSize);
72+
adapter.sendRequestBody(inputBuffer); // initiate the subscription, but no bytes available, makes 1 request
73+
74+
// release 1 request of minBytesBuffered bytes of data, calling onNext (satisfies one request, but then requests 1 more)
75+
requestTrackingPublisher.release(1, minBytesBuffered-100);
76+
assertThat(requestTrackingPublisher.requests()).isEqualTo(2);
77+
78+
// call sendRequestBody, outstandingDemand=1, sizeHint=16*1024*1024-100 + existing data buffered is > our min
79+
// so no more requests will be made
80+
adapter.sendRequestBody(inputBuffer);
81+
assertThat(requestTrackingPublisher.requests()).isEqualTo(2);
82+
}
83+
5984
private static SdkHttpContentPublisher requestBody(Publisher<ByteBuffer> delegate, long size) {
6085
return new SdkHttpContentPublisher() {
6186
@Override
@@ -114,4 +139,44 @@ public void getRequestData_publisherThrows_wrapsExceptionIfNotRuntimeException()
114139
.isInstanceOf(RuntimeException.class)
115140
.hasCauseInstanceOf(IOException.class);
116141
}
142+
143+
private static class RequestTrackingPublisher implements Publisher<ByteBuffer> {
144+
ByteBufferStoringSubscriber subscriber;
145+
RequestTrackingSubscription subscription = new RequestTrackingSubscription();
146+
147+
@Override
148+
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
149+
assertThat(subscriber).isInstanceOf(ByteBufferStoringSubscriber.class);
150+
this.subscriber = (ByteBufferStoringSubscriber) subscriber;
151+
this.subscriber.onSubscribe(subscription);
152+
}
153+
154+
// publish up to n requests
155+
public void release(int n, int size) {
156+
for (int i = 0; i < n; i++) {
157+
ByteBuffer buffer = ByteBuffer.allocate(size);
158+
subscriber.onNext(buffer);
159+
}
160+
}
161+
162+
public long requests() {
163+
return subscription.requests;
164+
}
165+
}
166+
167+
private static class RequestTrackingSubscription implements Subscription {
168+
169+
long requests = 0;
170+
171+
@Override
172+
public void request(long n) {
173+
requests += n;
174+
}
175+
176+
@Override
177+
public void cancel() {
178+
179+
}
180+
181+
}
117182
}

utils/src/main/java/software/amazon/awssdk/utils/async/ByteBufferStoringSubscriber.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Optional;
2222
import java.util.concurrent.CountDownLatch;
2323
import java.util.concurrent.Phaser;
24+
import java.util.concurrent.atomic.AtomicInteger;
2425
import java.util.concurrent.atomic.AtomicLong;
2526
import org.reactivestreams.Subscriber;
2627
import org.reactivestreams.Subscription;
@@ -56,13 +57,19 @@ public class ByteBufferStoringSubscriber implements Subscriber<ByteBuffer> {
5657

5758
private final Phaser phaser = new Phaser(1);
5859

60+
private final AtomicInteger outstandingDemand = new AtomicInteger(0);
61+
62+
private volatile long byteBufferSizeHint = 0L;
63+
5964
/**
6065
* The active subscription. Set when {@link #onSubscribe(Subscription)} is invoked.
6166
*/
6267
private Subscription subscription;
6368

6469
/**
65-
* Create a subscriber that stores at least {@code minimumBytesBuffered} in memory for retrieval.
70+
* Create a subscriber that stores at least {@code minimumBytesBuffered} in memory for retrieval. The subscriber will
71+
* only request more from the subscription when fewer bytes are buffered AND in flight requests from the subscription will
72+
* likely be under minimumBytesBuffered.
6673
*/
6774
public ByteBufferStoringSubscriber(long minimumBytesBuffered) {
6875
this.minimumBytesBuffered = Validate.isPositive(minimumBytesBuffered, "Data buffer minimum must be positive");
@@ -174,13 +181,19 @@ private int transfer(ByteBuffer in, ByteBuffer out) {
174181
public void onSubscribe(Subscription s) {
175182
storingSubscriber.onSubscribe(new DemandIgnoringSubscription(s));
176183
subscription = s;
184+
outstandingDemand.incrementAndGet();
177185
subscription.request(1);
178186
subscriptionLatch.countDown();
179187
}
180188

181189
@Override
182190
public void onNext(ByteBuffer byteBuffer) {
183191
int remaining = byteBuffer.remaining();
192+
outstandingDemand.decrementAndGet();
193+
// atomic update not required here, in a race it does not matter which thread sets this value since it is not being
194+
// incremented, just set.
195+
byteBufferSizeHint = byteBuffer.remaining();
196+
184197
storingSubscriber.onNext(byteBuffer.duplicate());
185198
addBufferedDataAmount(remaining);
186199
phaser.arrive();
@@ -204,7 +217,9 @@ private void addBufferedDataAmount(long amountToAdd) {
204217
}
205218

206219
private void maybeRequestMore(long currentDataBuffered) {
207-
if (currentDataBuffered < minimumBytesBuffered) {
220+
long dataBufferedAndInFlight = currentDataBuffered + (byteBufferSizeHint * outstandingDemand.get());
221+
if (dataBufferedAndInFlight < minimumBytesBuffered) {
222+
outstandingDemand.incrementAndGet();
208223
subscription.request(1);
209224
}
210225
}

utils/src/test/java/software/amazon/awssdk/utils/async/ByteBufferStoringSubscriberTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,21 @@ public void doesNotRequestMoreThanMaxBytes() {
7676
verifyNoMoreInteractions(subscription);
7777
}
7878

79+
@Test
80+
public void doesNotRequestMoreWhenInflightMoreThanMinBytes() {
81+
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(5);
82+
83+
subscriber.onSubscribe(subscription); // request 1, demand = 1
84+
subscriber.onNext(fullByteBufferOfSize(3)); // demand = 0, sizeHint=3
85+
subscriber.transferTo(emptyByteBufferOfSize(1)); // requests more, demand = 1
86+
subscriber.transferTo(emptyByteBufferOfSize(1)); // requests more, demand = 2
87+
verify(subscription, times(3)).request(1);
88+
89+
//sizeHint=3, demand=2, dataBufferedAndInFlight=6. 6 > 5, so no new request
90+
subscriber.transferTo(emptyByteBufferOfSize(1));
91+
verifyNoMoreInteractions(subscription);
92+
}
93+
7994
@Test
8095
public void canStoreMoreThanMaxBytesButWontAskForMoreUntilBelowMax() {
8196
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(3);

0 commit comments

Comments
 (0)