Add streaming backpressure via bounded queue in gremlin-driver

Bound the ByteBufQueueInputStream queue and use Netty's setAutoRead(false) to apply TCP backpressure when the client cannot keep up with the server's streaming rate. When the queue is full, the handler pauses channel reads and blocks briefly to enqueue the current chunk. Once the consumer drains the queue below half capacity, it resumes reads through a callback dispatched to the event loop. The buffer size defaults to 256 chunks and is configurable via Cluster.Builder.streamBufferSize().
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java index 59b9d02..4d23aa3 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Channelizer.java
@@ -212,7 +212,7 @@ final GraphBinaryReader graphBinaryReader = ((GraphBinaryMessageSerializerV4) serializer).getMapper().getReader(); streamingResponseHandler = new HttpStreamingResponseHandler( - graphBinaryReader, pending, cluster.streamingReaderPool(), cluster.getMaxResponseContentLength()); + graphBinaryReader, pending, cluster.streamingReaderPool(), cluster.getMaxResponseContentLength(), cluster.getStreamBufferSize()); } else { useStreaming = false; gremlinResponseDecoder = new HttpGremlinResponseDecoder(serializer);
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java index d4a3967..40644df 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java
@@ -173,6 +173,7 @@ .reconnectInterval(settings.connectionPool.reconnectInterval) .resultIterationBatchSize(settings.connectionPool.resultIterationBatchSize) .maxResponseContentLength(settings.connectionPool.maxResponseContentLength) + .streamBufferSize(settings.connectionPool.streamBufferSize) .maxWaitForConnection(settings.connectionPool.maxWaitForConnection) .maxConnectionPoolSize(settings.connectionPool.maxSize) .connectionSetupTimeoutMillis(settings.connectionPool.connectionSetupTimeoutMillis) @@ -327,6 +328,13 @@ } /** + * Gets the size of the buffer used for streaming responses. + */ + public int getStreamBufferSize() { + return manager.connectionPoolSettings.streamBufferSize; + } + + /** * Get time in milliseconds that the driver will allow a channel to not receive read or writes before it automatically closes. */ public long getIdleConnectionTimeout() { @@ -495,6 +503,7 @@ private int maxWaitForConnection = Connection.MAX_WAIT_FOR_CONNECTION; private int maxWaitForClose = Connection.MAX_WAIT_FOR_CLOSE; private long maxResponseContentLength = Connection.MAX_RESPONSE_CONTENT_LENGTH; + private int streamBufferSize = Connection.DEFAULT_STREAM_BUFFER_SIZE; private int reconnectInterval = Connection.RECONNECT_INTERVAL; private int resultIterationBatchSize = Connection.RESULT_ITERATION_BATCH_SIZE; private boolean enableSsl = false; @@ -724,6 +733,15 @@ } /** + * The size of the buffer used for streaming responses. + */ + public Builder streamBufferSize(final int streamBufferSize) { + if (streamBufferSize < 1) throw new IllegalArgumentException("streamBufferSize must be at least 1"); + this.streamBufferSize = streamBufferSize; + return this; + } + + /** * Specify a valid Gremlin script that can be used to test remote operations. This script should be designed * to return quickly with the least amount of overhead possible. By default, the script sends an empty string. 
* If the graph does not support that sort of script because it requires all scripts to include a reference @@ -994,6 +1012,7 @@ connectionPoolSettings.maxWaitForConnection = builder.maxWaitForConnection; connectionPoolSettings.maxWaitForClose = builder.maxWaitForClose; connectionPoolSettings.maxResponseContentLength = builder.maxResponseContentLength; + connectionPoolSettings.streamBufferSize = builder.streamBufferSize; connectionPoolSettings.reconnectInterval = builder.reconnectInterval; connectionPoolSettings.resultIterationBatchSize = builder.resultIterationBatchSize; connectionPoolSettings.enableSsl = builder.enableSsl;
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java index 12a1de6..8228c5b 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java
@@ -47,6 +47,7 @@ public static final int MAX_WAIT_FOR_CONNECTION = 16000; public static final int MAX_WAIT_FOR_CLOSE = 3000; public static final long MAX_RESPONSE_CONTENT_LENGTH = Integer.MAX_VALUE; + public static final int DEFAULT_STREAM_BUFFER_SIZE = 256; public static final int RECONNECT_INTERVAL = 1000; public static final int RESULT_ITERATION_BATCH_SIZE = 64; public static final long CONNECTION_SETUP_TIMEOUT_MILLIS = 15000;
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java index 907909c..b204f81 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java
@@ -202,6 +202,9 @@ if (connectionPoolConf.containsKey("maxResponseContentLength")) cpSettings.maxResponseContentLength = connectionPoolConf.getInt("maxResponseContentLength"); + if (connectionPoolConf.containsKey("streamBufferSize")) + cpSettings.streamBufferSize = connectionPoolConf.getInt("streamBufferSize"); + if (connectionPoolConf.containsKey("reconnectInterval")) cpSettings.reconnectInterval = connectionPoolConf.getInt("reconnectInterval"); @@ -325,6 +328,11 @@ public long maxResponseContentLength = Connection.MAX_RESPONSE_CONTENT_LENGTH; /** + * The size of the buffer used for streaming responses. The default value is 256. + */ + public int streamBufferSize = Connection.DEFAULT_STREAM_BUFFER_SIZE; + + /** * The amount of time in milliseconds to wait before trying to reconnect to a dead host. The default value is * 1000. */
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java index 607fd6b..c2968b7 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandler.java
@@ -67,6 +67,7 @@ private final AtomicReference<ResultSet> pendingResultSet; private final ExecutorService readerPool; private final long maxResponseContentLength; + private final int streamBufferSize; // Mutable state below is accessed exclusively from the channel's event loop thread. private HttpResponseStatus responseStatus; @@ -78,11 +79,13 @@ public HttpStreamingResponseHandler(final GraphBinaryReader graphBinaryReader, final AtomicReference<ResultSet> pendingResultSet, final ExecutorService readerPool, - final long maxResponseContentLength) { + final long maxResponseContentLength, + final int streamBufferSize) { this.graphBinaryReader = graphBinaryReader; this.pendingResultSet = pendingResultSet; this.readerPool = readerPool; this.maxResponseContentLength = maxResponseContentLength; + this.streamBufferSize = streamBufferSize; } @Override @@ -97,7 +100,9 @@ responseStatus = resp.status(); contentType = resp.headers().get(HttpHeaderNames.CONTENT_TYPE); - queueInputStream = new ByteBufQueueInputStream(); + final io.netty.channel.Channel channel = ctx.channel(); + queueInputStream = new ByteBufQueueInputStream(streamBufferSize, () -> + channel.eventLoop().execute(() -> channel.config().setAutoRead(true))); // Spawn reader thread for GraphBinary responses if (isGraphBinaryResponse()) { @@ -147,7 +152,18 @@ } else if (content.readableBytes() > 0 && queueInputStream != null) { // Feed bytes to the reader thread // retain() because Netty releases the content ByteBuf after decode() returns - queueInputStream.offer(content.retain()); + final ByteBuf buf = content.retain(); + if (!queueInputStream.offer(buf)) { + ctx.channel().config().setAutoRead(false); + queueInputStream.markPaused(); + try { + queueInputStream.putBlocking(buf); + } catch (InterruptedException e) { + buf.release(); + ctx.channel().config().setAutoRead(true); + Thread.currentThread().interrupt(); + } + } } if (msg instanceof LastHttpContent) {
diff --git a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java index 757b282..a613c7b 100644 --- a/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java +++ b/gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/stream/ByteBufQueueInputStream.java
@@ -37,33 +37,70 @@ private static final ByteBuf END_OF_STREAM = Unpooled.buffer(0); private final BlockingQueue<ByteBuf> queue; + private final int capacity; + private final Runnable onSpaceAvailable; private ByteBuf current; private volatile boolean eof; + private volatile boolean readsPaused; public ByteBufQueueInputStream() { - this.queue = new LinkedBlockingQueue<>(); + this(Integer.MAX_VALUE, () -> {}); + } + + public ByteBufQueueInputStream(final int capacity, final Runnable onSpaceAvailable) { + this.queue = new LinkedBlockingQueue<>(capacity); + this.capacity = capacity; + this.onSpaceAvailable = onSpaceAvailable; } /** * Offer a ByteBuf to the queue. The caller must have already retained the ByteBuf if needed. * The ByteBuf will be released after it is fully read. If the stream is already closed, * the buffer is released immediately. + * + * @return true if the buffer was accepted, false if the queue is full. */ - public void offer(final ByteBuf buf) { + public boolean offer(final ByteBuf buf) { + if (eof) { + if (buf != END_OF_STREAM && buf.refCnt() > 0) { + buf.release(); + } + return true; + } + return queue.offer(buf); + } + + /** + * Blocking put for when the queue is full. The caller should pause reads before calling this + * to avoid blocking the event loop indefinitely. + */ + public void putBlocking(final ByteBuf buf) throws InterruptedException { if (eof) { if (buf != END_OF_STREAM && buf.refCnt() > 0) { buf.release(); } return; } - queue.add(buf); + queue.put(buf); + } + + /** + * Mark that the producer has paused reads due to backpressure. + */ + public void markPaused() { + readsPaused = true; } /** * Signal that no more ByteBufs will be offered. 
*/ public void signalEndOfStream() { - queue.offer(END_OF_STREAM); + try { + queue.put(END_OF_STREAM); + } catch (InterruptedException e) { + eof = true; + Thread.currentThread().interrupt(); + } } @Override @@ -78,12 +115,15 @@ Thread.currentThread().interrupt(); throw new IOException("Interrupted while waiting for data", e); } - if (current == null) throw new IOException("Timed out waiting for streaming response data"); + if (current == null) { + throw new IOException("Timed out waiting for streaming response data"); + } if (current == END_OF_STREAM) { eof = true; current = null; return -1; } + checkBackpressure(); } return current.readByte() & 0xFF; } @@ -102,12 +142,15 @@ Thread.currentThread().interrupt(); throw new IOException("Interrupted while waiting for data", e); } - if (current == null) throw new IOException("Timed out waiting for streaming response data"); + if (current == null) { + throw new IOException("Timed out waiting for streaming response data"); + } if (current == END_OF_STREAM) { eof = true; current = null; return -1; } + checkBackpressure(); } final int readable = Math.min(current.readableBytes(), len); current.readBytes(b, off, readable); @@ -127,6 +170,13 @@ } } + private void checkBackpressure() { + if (readsPaused && queue.size() < Math.max(1, capacity / 2)) { + readsPaused = false; + onSpaceAvailable.run(); + } + } + private void releaseCurrent() { if (current != null && current != END_OF_STREAM && current.refCnt() > 0) { current.release();
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java index 91ba9cf..9c0378b 100644 --- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/ByteBufQueueInputStreamTest.java
@@ -24,6 +24,8 @@ import org.apache.tinkerpop.gremlin.driver.stream.ByteBufQueueInputStream; import org.junit.Test; +import java.util.concurrent.atomic.AtomicBoolean; + import static org.junit.Assert.*; public class ByteBufQueueInputStreamTest { @@ -78,6 +80,34 @@ } @Test + public void shouldReturnFalseWhenQueueFull() throws Exception { + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(2, () -> {}); + assertTrue(stream.offer(Unpooled.wrappedBuffer(new byte[]{1}))); + assertTrue(stream.offer(Unpooled.wrappedBuffer(new byte[]{2}))); + assertFalse(stream.offer(Unpooled.wrappedBuffer(new byte[]{3}))); + stream.close(); + } + + @Test + public void shouldInvokeCallbackWhenDrainedBelowThreshold() throws Exception { + final AtomicBoolean callbackInvoked = new AtomicBoolean(false); + final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(2, () -> callbackInvoked.set(true)); + + stream.offer(Unpooled.wrappedBuffer(new byte[]{1})); + stream.offer(Unpooled.wrappedBuffer(new byte[]{2})); + stream.markPaused(); + + // First read consumes the first buf (queue size goes to 1, not below capacity/2=1 yet) + assertEquals(1, stream.read()); + assertFalse(callbackInvoked.get()); + + // Second read polls the second buf (queue size goes to 0, below threshold) triggering callback + assertEquals(2, stream.read()); + assertTrue(callbackInvoked.get()); + stream.close(); + } + + @Test public void shouldCleanUpOnClose() throws Exception { final ByteBufQueueInputStream stream = new ByteBufQueueInputStream(); final ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(2);
diff --git a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java index fdd3781..a9b61ee 100644 --- a/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java +++ b/gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/handler/HttpStreamingResponseHandlerTest.java
@@ -44,6 +44,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static org.apache.tinkerpop.gremlin.driver.Channelizer.HttpChannelizer.LAST_CONTENT_READ_RESPONSE; @@ -68,8 +69,12 @@ } private EmbeddedChannel createChannel(final AtomicReference<ResultSet> pendingResultSet, final long maxResponseContentLength) { + return createChannel(pendingResultSet, maxResponseContentLength, 256); + } + + private EmbeddedChannel createChannel(final AtomicReference<ResultSet> pendingResultSet, final long maxResponseContentLength, final int streamBufferSize) { final HttpStreamingResponseHandler handler = new HttpStreamingResponseHandler( - reader, pendingResultSet, executor, maxResponseContentLength); + reader, pendingResultSet, executor, maxResponseContentLength, streamBufferSize); return new EmbeddedChannel(handler); } @@ -198,6 +203,7 @@ channel.finishAndReleaseAll(); } + private byte[] toBytes(final io.netty.buffer.ByteBuf buf) { final byte[] bytes = new byte[buf.readableBytes()]; buf.readBytes(bytes);