[CALCITE-4196] Consume all data from client before replying with HTTP/401
SPNEGO's handshake involves sending an HTTP/401 to "challenge" the
client to reply with authentication data. If the client is sending
a significant amount of data in the original request, the client
will still be writing this data when the server replies. This causes
the client to receive a TCP Reset when it continues to write data, and
ultimately manifests in a "Broken Pipe" runtime exception.
The fix is to simply consume all data the client wrote prior to
responding with the HTTP/401.
Closes #127
Signed-off-by: Kevin Risden <krisden@apache.org>
diff --git a/core/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java b/core/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java
index 45e8236..18b22dd 100644
--- a/core/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java
+++ b/core/src/main/java/org/apache/calcite/avatica/AvaticaUtils.java
@@ -57,6 +57,8 @@
}
};
+ private static final int SKIP_BUFFER_SIZE = 4096;
+
private AvaticaUtils() {}
static {
@@ -286,6 +288,26 @@
return buffer.toArray();
}
+ /**
+ * Reads and discards all data available on the InputStream.
+ */
+ public static void skipFully(InputStream inputStream) throws IOException {
+ byte[] temp = null;
+ while (true) {
+ long bytesSkipped = inputStream.skip(Long.MAX_VALUE);
+ if (bytesSkipped == 0) {
+ if (temp == null) {
+ temp = new byte[SKIP_BUFFER_SIZE];
+ }
+ int bytesRead = inputStream.read(temp, 0, SKIP_BUFFER_SIZE);
+ if (bytesRead < 0) {
+ // EOF
+ return;
+ }
+ }
+ }
+ }
+
/** Invokes {@code Statement#setLargeMaxRows}, falling back on
* {@link Statement#setMaxRows(int)} if the method does not exist (before
* JDK 1.8) or throws {@link UnsupportedOperationException}. */
diff --git a/core/src/test/java/org/apache/calcite/avatica/test/AvaticaUtilsTest.java b/core/src/test/java/org/apache/calcite/avatica/test/AvaticaUtilsTest.java
index a1561aa..75adbab 100644
--- a/core/src/test/java/org/apache/calcite/avatica/test/AvaticaUtilsTest.java
+++ b/core/src/test/java/org/apache/calcite/avatica/test/AvaticaUtilsTest.java
@@ -23,8 +23,12 @@
import org.junit.Test;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
import java.math.BigDecimal;
import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
@@ -32,9 +36,13 @@
import java.util.Properties;
import java.util.Set;
+import static org.apache.calcite.avatica.AvaticaUtils.skipFully;
+
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
@@ -270,6 +278,31 @@
assertThat(s, is(s2));
}
+ @Test public void testSkipFully() throws IOException {
+ InputStream in = of("");
+ assertEquals(0, in.available());
+ skipFully(in);
+ assertEquals(0, in.available());
+
+ in = of("asdf");
+ assertEquals(4, in.available());
+ skipFully(in);
+ assertEquals(0, in.available());
+
+ in = of("asdfasdf");
+ for (int i = 0; i < 4; i++) {
+ assertNotEquals(-1, in.read());
+ }
+ assertEquals(4, in.available());
+ skipFully(in);
+ assertEquals(0, in.available());
+ }
+
+ /** Returns an InputStream of UTF-8 encoded bytes from the provided string */
+ InputStream of(String str) {
+ return new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8));
+ }
+
/** Dummy implementation of {@link ConnectionProperty}. */
private static class ConnectionPropertyImpl implements ConnectionProperty {
private final String name;
diff --git a/server/src/main/java/org/apache/calcite/avatica/server/AbstractAvaticaHandler.java b/server/src/main/java/org/apache/calcite/avatica/server/AbstractAvaticaHandler.java
index 687c847..fa5372c 100644
--- a/server/src/main/java/org/apache/calcite/avatica/server/AbstractAvaticaHandler.java
+++ b/server/src/main/java/org/apache/calcite/avatica/server/AbstractAvaticaHandler.java
@@ -17,6 +17,7 @@
package org.apache.calcite.avatica.server;
import org.apache.calcite.avatica.AvaticaSeverity;
+import org.apache.calcite.avatica.AvaticaUtils;
import org.apache.calcite.avatica.remote.AuthenticationType;
import org.apache.calcite.avatica.remote.Service.ErrorResponse;
@@ -26,6 +27,7 @@
import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.Collections;
+import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -57,8 +59,14 @@
// Make sure that we drop any unauthenticated users out first.
if (null != serverConfig) {
if (AuthenticationType.SPNEGO == serverConfig.getAuthenticationType()) {
+ // This is largely a failsafe. We should never normally get here, but
+ // AvaticaSpnegoAuthenticator should throw the HTTP/401.
String remoteUser = request.getRemoteUser();
if (null == remoteUser) {
+ ServletInputStream input = request.getInputStream();
+ if (request.getContentLengthLong() < 0) {
+ AvaticaUtils.skipFully(input);
+ }
response.setStatus(HttpURLConnection.HTTP_UNAUTHORIZED);
response.getOutputStream().write(UNAUTHORIZED_ERROR.serialize().toByteArray());
baseRequest.setHandled(true);
diff --git a/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java b/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java
index 1bf6af9..16e27ff 100644
--- a/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java
+++ b/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java
@@ -35,6 +35,7 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.Callable;
import javax.servlet.ServletException;
@@ -46,7 +47,7 @@
* Jetty handler that executes Avatica JSON request-responses.
*/
public class AvaticaProtobufHandler extends AbstractAvaticaHandler {
- private static final Logger LOG = LoggerFactory.getLogger(AvaticaJsonHandler.class);
+ private static final Logger LOG = LoggerFactory.getLogger(AvaticaProtobufHandler.class);
private final Service service;
private final ProtobufHandler pbHandler;
@@ -90,6 +91,14 @@
HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException {
try (final Context ctx = this.requestTimer.start()) {
+ if (!request.getMethod().equals("POST")) {
+ response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+ response.getOutputStream().write(
+ "This server expects only POST calls.".getBytes(StandardCharsets.UTF_8));
+ baseRequest.setHandled(true);
+ return;
+ }
+
// Check if the user is OK to proceed.
if (!isUserPermitted(serverConfig, baseRequest, request, response)) {
LOG.debug("HTTP request from {} is unauthenticated and authentication is required",
@@ -97,50 +106,48 @@
return;
}
+ final byte[] requestBytes;
+ // Avoid a new buffer creation for every HTTP request
+ final UnsynchronizedBuffer buffer = threadLocalBuffer.get();
+ try (ServletInputStream inputStream = request.getInputStream()) {
+ requestBytes = AvaticaUtils.readFullyToBytes(inputStream, buffer);
+ } finally {
+ buffer.reset();
+ }
+
response.setContentType("application/octet-stream;charset=utf-8");
response.setStatus(HttpServletResponse.SC_OK);
- if (request.getMethod().equals("POST")) {
- final byte[] requestBytes;
- // Avoid a new buffer creation for every HTTP request
- final UnsynchronizedBuffer buffer = threadLocalBuffer.get();
- try (ServletInputStream inputStream = request.getInputStream()) {
- requestBytes = AvaticaUtils.readFullyToBytes(inputStream, buffer);
- } finally {
- buffer.reset();
+ HandlerResponse<byte[]> handlerResponse;
+ try {
+ if (null != serverConfig && serverConfig.supportsImpersonation()) {
+ // If we can't extract a user, need to throw 401 in that case.
+ String remoteUser = serverConfig.getRemoteUserExtractor().extract(request);
+ // Invoke the ProtobufHandler inside as doAs for the remote user.
+ // The doAsRemoteUser call may disallow a user, need to throw 403 in that case.
+ handlerResponse = serverConfig.doAsRemoteUser(remoteUser,
+ request.getRemoteAddr(), new Callable<HandlerResponse<byte[]>>() {
+ @Override public HandlerResponse<byte[]> call() {
+ return pbHandler.apply(requestBytes);
+ }
+ });
+ } else {
+ handlerResponse = pbHandler.apply(requestBytes);
}
-
- HandlerResponse<byte[]> handlerResponse;
- try {
- if (null != serverConfig && serverConfig.supportsImpersonation()) {
- // If we can't extract a user, need to throw 401 in that case.
- String remoteUser = serverConfig.getRemoteUserExtractor().extract(request);
- // Invoke the ProtobufHandler inside as doAs for the remote user.
- // The doAsRemoteUser call may disallow a user, need to throw 403 in that case.
- handlerResponse = serverConfig.doAsRemoteUser(remoteUser,
- request.getRemoteAddr(), new Callable<HandlerResponse<byte[]>>() {
- @Override public HandlerResponse<byte[]> call() {
- return pbHandler.apply(requestBytes);
- }
- });
- } else {
- handlerResponse = pbHandler.apply(requestBytes);
- }
- } catch (RemoteUserExtractionException e) {
- LOG.debug("Failed to extract remote user from request", e);
- handlerResponse = pbHandler.unauthenticatedErrorResponse(e);
- } catch (RemoteUserDisallowedException e) {
- LOG.debug("Remote user is not authorized", e);
- handlerResponse = pbHandler.unauthorizedErrorResponse(e);
- } catch (Exception e) {
- LOG.debug("Error invoking request from {}", baseRequest.getRemoteAddr(), e);
- // Catch at the highest level of exceptions
- handlerResponse = pbHandler.convertToErrorResponse(e);
- }
-
- baseRequest.setHandled(true);
- response.setStatus(handlerResponse.getStatusCode());
- response.getOutputStream().write(handlerResponse.getResponse());
+ } catch (RemoteUserExtractionException e) {
+ LOG.debug("Failed to extract remote user from request", e);
+ handlerResponse = pbHandler.unauthenticatedErrorResponse(e);
+ } catch (RemoteUserDisallowedException e) {
+ LOG.debug("Remote user is not authorized", e);
+ handlerResponse = pbHandler.unauthorizedErrorResponse(e);
+ } catch (Exception e) {
+ LOG.debug("Error invoking request from {}", baseRequest.getRemoteAddr(), e);
+ // Catch at the highest level of exceptions
+ handlerResponse = pbHandler.convertToErrorResponse(e);
}
+
+ baseRequest.setHandled(true);
+ response.setStatus(handlerResponse.getStatusCode());
+ response.getOutputStream().write(handlerResponse.getResponse());
}
}
diff --git a/server/src/main/java/org/apache/calcite/avatica/server/AvaticaSpnegoAuthenticator.java b/server/src/main/java/org/apache/calcite/avatica/server/AvaticaSpnegoAuthenticator.java
index d801d4f..fcd9206 100644
--- a/server/src/main/java/org/apache/calcite/avatica/server/AvaticaSpnegoAuthenticator.java
+++ b/server/src/main/java/org/apache/calcite/avatica/server/AvaticaSpnegoAuthenticator.java
@@ -16,6 +16,8 @@
*/
package org.apache.calcite.avatica.server;
+import org.apache.calcite.avatica.AvaticaUtils;
+
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.security.ServerAuthException;
import org.eclipse.jetty.security.authentication.DeferredAuthentication;
@@ -73,6 +75,14 @@
res.sendError(HttpServletResponse.SC_UNAUTHORIZED);
return Authentication.SEND_CONTINUE;
}
+ } else if (computedAuth == Authentication.SEND_CONTINUE) {
+ // CALCITE-4196 When we need to reply back to the client with the HTTP/401 challenge
+ // we must make sure that we consume all of the data that the client has written. Otherwise,
+ // the client will continue to write the data on a socket which we've already closed. This
+ // would ultimately result in the client receiving a TCP Reset and seeing a "Broken pipe"
+ // exception in their client application.
+ HttpServletRequest req = (HttpServletRequest) request;
+ AvaticaUtils.skipFully(req.getInputStream());
}
return computedAuth;
}
diff --git a/server/src/test/java/org/apache/calcite/avatica/AvaticaSpnegoTest.java b/server/src/test/java/org/apache/calcite/avatica/AvaticaSpnegoTest.java
index c33c6c2..0d1972f 100644
--- a/server/src/test/java/org/apache/calcite/avatica/AvaticaSpnegoTest.java
+++ b/server/src/test/java/org/apache/calcite/avatica/AvaticaSpnegoTest.java
@@ -251,7 +251,6 @@
assertEquals(3, results.getInt(1));
}
}
-
}
// End AvaticaSpnegoTest.java
diff --git a/server/src/test/java/org/apache/calcite/avatica/server/AbstractAvaticaHandlerTest.java b/server/src/test/java/org/apache/calcite/avatica/server/AbstractAvaticaHandlerTest.java
index 93c634e..f8e79ae 100644
--- a/server/src/test/java/org/apache/calcite/avatica/server/AbstractAvaticaHandlerTest.java
+++ b/server/src/test/java/org/apache/calcite/avatica/server/AbstractAvaticaHandlerTest.java
@@ -26,12 +26,15 @@
import java.net.HttpURLConnection;
import java.nio.charset.StandardCharsets;
+import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -59,9 +62,13 @@
@Test public void disallowUnauthenticatedUsers() throws Exception {
ServletOutputStream os = mock(ServletOutputStream.class);
+ ServletInputStream is = mock(ServletInputStream.class);
+
+ when(is.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
when(config.getAuthenticationType()).thenReturn(AuthenticationType.SPNEGO);
when(request.getRemoteUser()).thenReturn(null);
+ when(request.getInputStream()).thenReturn(is);
when(response.getOutputStream()).thenReturn(os);
assertFalse(handler.isUserPermitted(config, baseRequest, request, response));
diff --git a/server/src/test/java/org/apache/calcite/avatica/server/AvaticaSpnegoAuthenticatorTest.java b/server/src/test/java/org/apache/calcite/avatica/server/AvaticaSpnegoAuthenticatorTest.java
index acf814b..3005c68 100644
--- a/server/src/test/java/org/apache/calcite/avatica/server/AvaticaSpnegoAuthenticatorTest.java
+++ b/server/src/test/java/org/apache/calcite/avatica/server/AvaticaSpnegoAuthenticatorTest.java
@@ -24,10 +24,14 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
@@ -40,17 +44,21 @@
private HttpServletRequest request;
private HttpServletResponse response;
+ private ServletInputStream requestInput;
private AvaticaSpnegoAuthenticator authenticator;
- @Before public void setup() {
+ @Before public void setup() throws IOException {
request = mock(HttpServletRequest.class);
+ requestInput = mock(ServletInputStream.class);
+ when(request.getInputStream()).thenReturn(requestInput);
response = mock(HttpServletResponse.class);
authenticator = new AvaticaSpnegoAuthenticator();
}
@Test public void testAuthenticatedDoesNothingExtra() throws IOException {
+ // SEND_CONTINUE not listed here for explicit testing below.
List<Authentication> authsNotRequiringUpdate = Arrays.asList(Authentication.NOT_CHECKED,
- Authentication.SEND_CONTINUE, Authentication.SEND_FAILURE, Authentication.SEND_SUCCESS);
+ Authentication.SEND_FAILURE, Authentication.SEND_SUCCESS);
for (Authentication auth : authsNotRequiringUpdate) {
assertEquals(auth, authenticator.sendChallengeIfNecessary(auth, request, response));
verifyZeroInteractions(request);
@@ -67,6 +75,15 @@
HttpHeader.NEGOTIATE.asString());
verify(response).sendError(HttpServletResponse.SC_UNAUTHORIZED);
}
+
+ @Test public void testConsumeClientBufferOnChallenge() throws IOException {
+ when(requestInput.read(any(byte[].class), anyInt(), anyInt())).thenReturn(-1);
+ assertEquals(Authentication.SEND_CONTINUE,
+ authenticator.sendChallengeIfNecessary(Authentication.SEND_CONTINUE, request, response));
+ verify(request).getInputStream();
+ verify(requestInput).skip(anyLong());
+ verify(requestInput).read(any(byte[].class), anyInt(), anyInt());
+ }
}
// End AvaticaSpnegoAuthenticatorTest.java