Merge branch '3.5-dev'
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
index 0cdb104..8c2ee6a 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/Context.java
@@ -174,19 +174,38 @@
* @see #writeAndFlush(ResponseMessage)
*/
public void writeAndFlush(final ResponseStatusCode code, final Object responseMessage) {
+ writeAndMaybeFlush(code, responseMessage, true);
+ }
+
+ public void write(final ResponseMessage message) {
+ this.write(message.getStatus().getCode(), message);
+ }
+
+ public void write(final ResponseStatusCode code, final Object responseMessage) {
+ writeAndMaybeFlush(code, responseMessage, false);
+ }
+
+ /**
+ * Flushes messages to the underlying transport.
+ */
+ public void flush() {
+ this.getChannelHandlerContext().flush();
+ }
+
+ private void writeAndMaybeFlush(final ResponseStatusCode code, final Object responseMessage, final boolean flush) {
final boolean messageIsFinal = code.isFinalResponse();
- if(finalResponseWritten.compareAndSet(false, messageIsFinal)) {
- this.getChannelHandlerContext().writeAndFlush(responseMessage);
+ if (finalResponseWritten.compareAndSet(false, messageIsFinal)) {
+ this.getChannelHandlerContext().write(responseMessage);
+ if (flush) this.getChannelHandlerContext().flush();
} else {
if (responseMessage instanceof Frame) {
((Frame) responseMessage).tryRelease();
}
final String logMessage = String.format("Another final response message was already written for request %s, ignoring response code: %s",
- this.getRequestMessage().getRequestId(), code);
+ this.getRequestMessage().getRequestId(), code);
logger.warn(logMessage);
}
-
}
private RequestContentType determineRequestContents() {
diff --git a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiTaskSession.java b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiTaskSession.java
index 8765d98..afdae15 100644
--- a/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiTaskSession.java
+++ b/gremlin-server/src/main/java/org/apache/tinkerpop/gremlin/server/handler/MultiTaskSession.java
@@ -187,7 +187,7 @@
// the ResponseMessage as we don't want the message to be "final" for the Context. that
// status must be reserved for the message that caused the error
for (SessionTask st : queue) {
- st.writeAndFlush(ResponseStatusCode.PARTIAL_CONTENT, ResponseMessage.build(st.getRequestMessage())
+ st.write(ResponseStatusCode.PARTIAL_CONTENT, ResponseMessage.build(st.getRequestMessage())
.code(ResponseStatusCode.SERVER_ERROR)
.statusMessage(String.format(
"An earlier request [%s] failed prior to this one having a chance to execute",
@@ -205,7 +205,7 @@
// the best answer
if (!sessionTask.isFinalResponseWritten()) {
logger.warn(rexex.getMessage(), rexex);
- sessionTask.writeAndFlush(rexex.getResponseMessage());
+ sessionTask.write(rexex.getResponseMessage());
}
} finally {
// if this is a normal end to the session or if there is some general processing exception which tanks
@@ -215,6 +215,15 @@
closeReason.get() == CloseReason.PROCESSING_EXCEPTION ||
closeReason.get() == CloseReason.SESSION_TIMEOUT) {
close();
+
+ // the session is now in a state where it is no longer in the set of current sessions so flush
+ // remaining messages to the transport, if any. the remaining message should be failures from
+ // the SessionException in the catch. this prevents an unlikely case where a fast client can
+ // get ahead of the server and start to send messages to a technically errored out session. that
+ // in itself is not necessarily bad but it makes tests fail sometimes because the tests are
+ // designed to assert in the same fashion for OpProcessor and UnifiedChannelizer infrastructure
+ // and they are slightly at odds with each other in certain situations.
+ sessionTask.flush();
}
}
}
diff --git a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ContextTest.java b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ContextTest.java
index e685932..c7ff19f 100644
--- a/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ContextTest.java
+++ b/gremlin-server/src/test/java/org/apache/tinkerpop/gremlin/server/ContextTest.java
@@ -104,48 +104,57 @@
@Test
public void shouldAllowMultipleNonFinalResponses() {
writeInvoker.apply(context, ResponseStatusCode.AUTHENTICATE);
- Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(1)).write(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(1)).flush();
writeInvoker.apply(context, ResponseStatusCode.PARTIAL_CONTENT);
- Mockito.verify(ctx, Mockito.times(2)).writeAndFlush(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(2)).write(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(2)).flush();
writeInvoker.apply(context, ResponseStatusCode.PARTIAL_CONTENT);
- Mockito.verify(ctx, Mockito.times(3)).writeAndFlush(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(3)).write(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(3)).flush();
}
@Test
public void shouldAllowAtMostOneFinalResponse() {
writeInvoker.apply(context, ResponseStatusCode.AUTHENTICATE);
- Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(1)).write(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(1)).flush();
writeInvoker.apply(context, ResponseStatusCode.SUCCESS);
- Mockito.verify(ctx, Mockito.times(2)).writeAndFlush(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(2)).write(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(2)).flush();
writeInvoker.apply(context, ResponseStatusCode.SERVER_ERROR_TIMEOUT);
assertTrue(recordingAppender.logContainsAny(".*" + request.getRequestId() + ".*"));
assertTrue(recordingAppender.logContainsAny(".*" + ResponseStatusCode.SERVER_ERROR_TIMEOUT + "$"));
// ensure there were no other writes to the channel
- Mockito.verify(ctx, Mockito.times(2)).writeAndFlush(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(2)).write(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(2)).flush();
}
@Test
public void shouldNotAllowNonFinalMessagesAfterFinalResponse() {
writeInvoker.apply(context, ResponseStatusCode.SERVER_ERROR_TIMEOUT);
- Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(1)).write(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(1)).flush();
writeInvoker.apply(context, ResponseStatusCode.PARTIAL_CONTENT);
assertTrue(recordingAppender.logContainsAny(".*" + request.getRequestId() + ".*"));
assertTrue(recordingAppender.logContainsAny(".*" + ResponseStatusCode.PARTIAL_CONTENT + "$"));
// ensure there were no other writes to the channel
- Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(1)).write(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(1)).flush();
}
@Test
public void shouldReleaseIgnoredFrames() {
writeInvoker.apply(context, ResponseStatusCode.SERVER_ERROR_TIMEOUT);
- Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(1)).write(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(1)).flush();
Frame frame = Mockito.mock(Frame.class);
context.writeAndFlush(ResponseStatusCode.SUCCESS, frame);
@@ -154,9 +163,11 @@
assertTrue(recordingAppender.logContainsAny(".*" + ResponseStatusCode.SUCCESS + "$"));
// ensure there were no other writes to the channel
- Mockito.verify(ctx, Mockito.times(1)).writeAndFlush(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(1)).write(Mockito.any());
+ Mockito.verify(ctx, Mockito.times(1)).flush();
// ensure the frame was released
Mockito.verify(frame, Mockito.times(1)).tryRelease();
+ Mockito.verify(ctx, Mockito.times(1)).flush();
}
}
\ No newline at end of file