blob: e9e781d1737f87ded8b769edef1183716dba555c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.driver;
import nl.altindag.log.LogCaptor;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.AbstractWarningVerificationStrategy;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.util.ExceptionHelper;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class WebSocketClientBehaviorIntegrateTest {
private static final Logger logger = LoggerFactory.getLogger(WebSocketClientBehaviorIntegrateTest.class);
@Rule
public TestName name = new TestName();
private static LogCaptor logCaptor;
private SimpleSocketServer server;
@BeforeClass
public static void setupLogCaptor() {
logCaptor = LogCaptor.forRoot();
}
@AfterClass
public static void tearDown() {
logCaptor.close();
}
@Before
public void setUp() throws InterruptedException {
logCaptor.clearLogs();
server = new SimpleSocketServer();
if (name.getMethodName().equals("shouldAttemptHandshakeForLongerThanDefaultNettySslHandshakeTimeout") ||
name.getMethodName().equals("shouldPrintCorrectErrorForRegularWebSocketHandshakeTimeout")) {
server.start(new TestChannelizers.TestWSNoOpInitializer());
} else if (name.getMethodName().equals("shouldContinueRunningRemainingConnectionsIfServerThrottlesNewConnections") ||
name.getMethodName().equals("shouldReturnCorrectExceptionIfServerThrottlesNewConnectionsAndMaxWaitExceeded")) {
server.start(new TestChannelizers.TestConnectionThrottlingInitializer());
} else {
server.start(new TestWSGremlinInitializer());
}
}
@After
public void shutdown() {
server.stop();
}
/**
* Tests that client is correctly sending user agent during web socket handshake by having the server return
* the captured user agent.
*/
@Test
public void shouldIncludeUserAgentInHandshakeRequest() {
final Cluster cluster = Cluster.build("localhost").port(SimpleSocketServer.PORT)
.minConnectionPoolSize(1)
.maxConnectionPoolSize(1)
.serializer(Serializers.GRAPHSON_V2D0)
.create();
final Client.ClusteredClient client = cluster.connect();
// trigger the testing server to return captured user agent
String returnedUserAgent = client.submit("1", RequestOptions.build()
.overrideRequestId(TestWSGremlinInitializer.USER_AGENT_REQUEST_ID).create()).one().getString();
assertEquals(UserAgent.USER_AGENT, returnedUserAgent);
}
/**
* Tests that no user agent is sent to server when that behaviour is disabled.
*/
@Test
public void shouldNotIncludeUserAgentInHandshakeRequestIfDisabled() {
final Cluster cluster = Cluster.build("localhost").port(SimpleSocketServer.PORT)
.minConnectionPoolSize(1)
.maxConnectionPoolSize(1)
.serializer(Serializers.GRAPHSON_V2D0)
.enableUserAgentOnConnect(false)
.create();
final Client.ClusteredClient client = cluster.connect();
// trigger the testing server to return captured user agent
String returnedUserAgent = client.submit("1", RequestOptions.build()
.overrideRequestId(TestWSGremlinInitializer.USER_AGENT_REQUEST_ID).create()).one().getString();
assertEquals("", returnedUserAgent);
}
/**
* Constructs a deadlock situation when initializing a {@link Client} object in sessionless form that leads to
* hanging behavior in low resource environments (TINKERPOP-2504) and for certain configurations of the
* {@link Cluster} object where there are simply not enough threads to properly allow the {@link Host} and its
* related {@link ConnectionPool} objects to spin up properly - see TINKERPOP-2550.
*/
@Test
public void shouldNotDeadlockOnInitialization() throws Exception {
// it seems you can add the same host more than once so while kinda weird it is helpful in faithfully
// recreating the deadlock situation, though it can/will happen with just one host. workerPoolSize at
// "1" also helps faithfully reproduce the problem though it can happen at larger pool sizes depending
// on the timing/interleaving of tasks. the larger connection pool sizes may not be required given the
// other settings at play but again, just trying to make sure the deadlock state is consistently produced
// and a larger pool size will mean more time to elapse scheduling connection creation tasks which may
// further improve chances of scheduling conflicts that produce the deadlock.
//
// to force this test to a fail state, change ClusteredClient.initializeImplementation() to use the
// standard Cluster.executor rather than the hostExecutor (which is a single threaded independent thread
// pool used just for the purpose of initializing the hosts).
final Cluster cluster = Cluster.build("localhost").
addContactPoint("localhost").
addContactPoint("localhost").port(SimpleSocketServer.PORT).
workerPoolSize(1).
minConnectionPoolSize(32).maxConnectionPoolSize(32).create();
final AtomicBoolean failed = new AtomicBoolean(false);
final ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
try {
final Client client = cluster.connect();
// test will hang in init() where the Host and ConnectionPool are started up
client.init();
} catch (Exception ex) {
// should not "fail" - just hang and then timeout during the executor shutdown as there is
// a deadlock state, but we have this here just in case. a failed assertion of this value
// below could be interesting
logger.error("Client initialization failed with exception which was unexpected", ex);
failed.set(true);
} finally {
cluster.close();
}
});
executor.shutdown();
// 30 seconds should be ample time, even for travis. the deadlock state happens quite immediately in
// testing and in most situations this test should zip by in subsecond pace
assertThat(executor.awaitTermination(30, TimeUnit.SECONDS), is(true));
assertThat(failed.get(), is(false));
}
/**
* Test a scenario when server closes a connection which does not have any active requests. Such connection
* should be destroyed and replaced by another connection on next request.
*/
@Test
public void shouldRemoveConnectionFromPoolWhenServerClose_WithNoPendingRequests() throws InterruptedException {
final Cluster cluster = Cluster.build("localhost").port(SimpleSocketServer.PORT)
.minConnectionPoolSize(1)
.maxConnectionPoolSize(1)
.serializer(Serializers.GRAPHSON_V2D0)
.create();
final Client.ClusteredClient client = cluster.connect();
// Initialize the client preemptively
client.init();
// assert number of connections opened
final ConnectionPool channelPool = client.hostConnectionPools.values().stream().findFirst().get();
assertEquals(1, channelPool.getConnectionIDs().size());
final String originalConnectionID = channelPool.getConnectionIDs().iterator().next();
logger.info("On client init ConnectionIDs: " + channelPool.getConnectionIDs());
// trigger the testing server to send a WS close frame
Vertex v = client.submit("1", RequestOptions.build()
.overrideRequestId(TestWSGremlinInitializer.SINGLE_VERTEX_DELAYED_CLOSE_CONNECTION_REQUEST_ID).create())
.one().getVertex();
assertNotNull(v);
// assert connection is not closed yet
assertEquals(1, channelPool.getConnectionIDs().size());
// wait for server to send the close WS frame
Thread.sleep(6000);
// assert that original connection is not part of the connection pool any more
assertThat("The original connection should have been closed by the server.",
channelPool.getConnectionIDs().contains(originalConnectionID), is(false));
// assert sanity after connection replacement
v = client.submit("1",
RequestOptions.build().overrideRequestId(TestWSGremlinInitializer.SINGLE_VERTEX_REQUEST_ID).create())
.one().getVertex();
assertNotNull(v);
}
/**
* Tests a scenario when the connection a faulty connection replaced by a new connection.
* Ensures that the creation of a new replacement channel only happens once.
*/
@Test
public void shouldRemoveConnectionFromPoolWhenServerClose_WithPendingRequests() throws InterruptedException, ExecutionException {
final Cluster cluster = Cluster.build("localhost").port(SimpleSocketServer.PORT)
.minConnectionPoolSize(1)
.maxConnectionPoolSize(1)
.serializer(Serializers.GRAPHSON_V2D0)
.create();
final Client.ClusteredClient client = cluster.connect();
// Initialize the client preemptively
client.init();
// assert number of connections opened
final ConnectionPool channelPool = client.hostConnectionPools.values().stream().findFirst().get();
assertEquals(1, channelPool.getConnectionIDs().size());
// Send two requests in flight. Both should error out.
final CompletableFuture<ResultSet> req1 = client.submitAsync("1", RequestOptions.build()
.overrideRequestId(TestWSGremlinInitializer.CLOSE_CONNECTION_REQUEST_ID).create());
final CompletableFuture<ResultSet> req2 = client.submitAsync("1", RequestOptions.build()
.overrideRequestId(TestWSGremlinInitializer.CLOSE_CONNECTION_REQUEST_ID_2).create());
// assert both are sent on same connection
assertEquals(1, channelPool.getConnectionIDs().size());
// trigger write for both requests
req1.get();
req2.get();
// wait for close message to arrive from server
Thread.sleep(2000);
// Assert that we should consider creating a connection only once, since only one connection is being closed.
assertEquals(1, logCaptor.getLogs().stream().filter(str -> str.contains("Considering new connection on")).count());
// assert sanity after connection replacement
final Vertex v = client.submit("1",
RequestOptions.build().overrideRequestId(TestWSGremlinInitializer.SINGLE_VERTEX_REQUEST_ID).create())
.one().getVertex();
assertNotNull(v);
}
/**
* Tests the scenario when client intentionally closes the connection. In this case, the
* connection should not be recycled.
*/
@Test
public void shouldNotCreateReplacementConnectionWhenClientClosesConnection() throws ExecutionException, InterruptedException {
final Cluster cluster = Cluster.build("localhost").port(SimpleSocketServer.PORT)
.minConnectionPoolSize(1)
.maxConnectionPoolSize(1)
.serializer(Serializers.GRAPHSON_V2D0)
.create();
final Client.ClusteredClient client = cluster.connect();
// Initialize the client preemptively
client.init();
// Clearing logCaptor before attempting to close the connection is in response to an issue where this test can
// be polluted by logs from a previous test when running on slow hardware.
logCaptor.clearLogs();
// assert number of connections opened
final ConnectionPool channelPool = client.hostConnectionPools.values().stream().findFirst().get();
assertEquals(1, channelPool.getConnectionIDs().size());
// close the connection pool in an authentic manner
channelPool.closeAsync().get();
// wait for channel closure callback to trigger
Thread.sleep(2000);
assertEquals("OnClose callback should be called but only once", 1,
logCaptor.getLogs().stream().filter(str -> str.contains("OnChannelClose callback called for channel")).count());
assertEquals("No new connection creation should be started", 0,
logCaptor.getLogs().stream().filter(str -> str.contains("Considering new connection on")).count());
}
/**
* (TINKERPOP-2814) Tests to make sure that the SSL handshake is now capped by connectionSetupTimeoutMillis and not
* the default Netty SSL handshake timeout of 10,000ms.
*/
@Test
public void shouldAttemptHandshakeForLongerThanDefaultNettySslHandshakeTimeout() {
final Cluster cluster = Cluster.build("localhost").port(SimpleSocketServer.PORT)
.minConnectionPoolSize(1)
.maxConnectionPoolSize(1)
.connectionSetupTimeoutMillis(20000) // needs to be larger than 10000ms.
.enableSsl(true)
.create();
final Client.ClusteredClient client = cluster.connect();
final long start = System.currentTimeMillis();
Exception caught = null;
try {
client.submit("1");
} catch (Exception e) {
caught = e;
} finally {
// Test against 15000ms which should give a big enough buffer to avoid timing issues.
assertTrue(System.currentTimeMillis() - start > 15000);
assertTrue(caught != null);
assertTrue(caught instanceof NoHostAvailableException);
assertTrue(logCaptor.getLogs().stream().anyMatch(str -> str.contains("SSL handshake not completed")));
}
cluster.close();
}
/**
* Tests to make sure that the correct error message is logged when a non-SSL connection attempt times out.
*/
@Test
public void shouldPrintCorrectErrorForRegularWebSocketHandshakeTimeout() throws InterruptedException {
final Cluster cluster = Cluster.build("localhost").port(SimpleSocketServer.PORT)
.minConnectionPoolSize(1)
.maxConnectionPoolSize(1)
.connectionSetupTimeoutMillis(120)
.create();
final Client.ClusteredClient client = cluster.connect();
Exception caught = null;
try {
client.submit("1");
} catch (Exception e) {
caught = e;
} finally {
assertTrue(caught != null);
assertTrue(caught instanceof NoHostAvailableException);
Thread.sleep(150);
assertTrue(logCaptor.getLogs().stream().anyMatch(str -> str.contains("WebSocket handshake not completed")));
}
cluster.close();
}
/**
* Tests that if a server throttles new connections (doesn't allow new connections to be made) then all requests
* will run and complete on the connections that are already open.
*/
@Test
public void shouldContinueRunningRemainingConnectionsIfServerThrottlesNewConnections() throws ExecutionException, InterruptedException, TimeoutException {
final Cluster cluster = Cluster.build("localhost").port(SimpleSocketServer.PORT)
.minConnectionPoolSize(1)
.maxConnectionPoolSize(5)
.maxWaitForConnection(15000) // large value ensures that request will eventually find a connection.
.connectionSetupTimeoutMillis(1000)
.minInProcessPerConnection(0)
.maxInProcessPerConnection(1)
.minSimultaneousUsagePerConnection(0)
.maxSimultaneousUsagePerConnection(1)
.serializer(Serializers.GRAPHSON_V2D0)
.create();
final Client.ClusteredClient client = cluster.connect();
final List<CompletableFuture<ResultSet>> results = new ArrayList<CompletableFuture<ResultSet>>();
for (int i = 0; i < 5; i++) {
results.add(client.submitAsync("500"));
}
for (CompletableFuture<ResultSet> result : results) {
assertNotNull(result.get(60000, TimeUnit.MILLISECONDS).one().getVertex());
}
cluster.close();
}
/**
* Tests that if a server throttles new connections (doesn't allow new connections to be made) then any request
* that can't find a connection within its maxWaitForConnection will return an informative exception regarding
* the inability to open new connections.
*/
@Test
public void shouldReturnCorrectExceptionIfServerThrottlesNewConnectionsAndMaxWaitExceeded() {
final Cluster cluster = Cluster.build("localhost").port(SimpleSocketServer.PORT)
.minConnectionPoolSize(1)
.maxConnectionPoolSize(5)
.maxWaitForConnection(250) // small value ensures that requests will return TimeoutException.
.connectionSetupTimeoutMillis(100)
.minInProcessPerConnection(0)
.maxInProcessPerConnection(1)
.minSimultaneousUsagePerConnection(0)
.maxSimultaneousUsagePerConnection(1)
.serializer(Serializers.GRAPHSON_V2D0)
.create();
final Client.ClusteredClient client = cluster.connect();
for (int i = 0; i < 5; i++) {
try {
client.submitAsync("3000");
} catch (Exception e) {
final Throwable rootCause = ExceptionHelper.getRootCause(e);
assertTrue(rootCause instanceof TimeoutException);
assertTrue(rootCause.getMessage().contains("WebSocket handshake not completed"));
}
}
cluster.close();
}
/**
* Tests that the client continues to work if the server temporarily goes down between two requests.
*/
@Test
public void shouldContinueRunningIfServerGoesDownTemporarily() throws InterruptedException {
final Cluster cluster = Cluster.build("localhost").port(SimpleSocketServer.PORT)
.minConnectionPoolSize(1)
.serializer(Serializers.GRAPHSON_V2D0)
.create();
final Client.ClusteredClient client = cluster.connect();
final Object lock = new Object();
final ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(1);
scheduledPool.schedule(() -> {
try {
server.stopSync();
server = new SimpleSocketServer();
server.start(new TestWSGremlinInitializer());
synchronized (lock) {
lock.notify();
}
} catch (InterruptedException ignored) {
// Ignored.
}
}, 1000, TimeUnit.MILLISECONDS);
synchronized (lock) {
assertNotNull(client.submit("1").one().getVertex());
lock.wait(30000);
}
assertNotNull(client.submit("1").one().getVertex());
cluster.close();
}
/**
* Tests that if the host is unavailable then the client will return an exception that contains information about
* the status of the host.
*/
@Test
public void shouldReturnCorrectExceptionIfServerGoesDown() throws InterruptedException {
final Cluster cluster = Cluster.build("localhost").port(SimpleSocketServer.PORT)
.minConnectionPoolSize(1)
.maxWaitForConnection(500)
.connectionSetupTimeoutMillis(100)
.serializer(Serializers.GRAPHSON_V2D0)
.create();
final Client.ClusteredClient client = cluster.connect();
client.submit("1");
server.stopSync();
try {
client.submit("1");
} catch (Exception e) {
final Throwable rootCause = ExceptionHelper.getRootCause(e);
assertTrue(rootCause instanceof TimeoutException);
assertTrue(rootCause.getMessage().contains("Connection refused"));
}
cluster.close();
}
}