blob: 3c4c77726c455b3ffd58e64a76a19a276d2ab1c3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.driver;
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import io.netty.handler.codec.CorruptedFrameException;
import nl.altindag.log.LogCaptor;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException;
import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException;
import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
import org.apache.tinkerpop.gremlin.server.AbstractGremlinServerIntegrationTest;
import org.apache.tinkerpop.gremlin.server.TestClientFactory;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import javax.net.ssl.SSLHandshakeException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.number.OrderingComparison.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.*;
/**
 * Integration tests for driver {@link Connection} and {@link ConnectionPool} lifecycle behavior
 * against a running Gremlin Server: dead-connection cleanup, request balancing across the pool,
 * pool growth under per-connection usage limits, and resilience to flaky connection creation.
 */
public class ClientConnectionIntegrateTest extends AbstractGremlinServerIntegrationTest {
    // captures Connection log output so tests can assert on connection lifecycle messages
    private static LogCaptor logCaptor;

    // level of the Connection logger before the test forced DEBUG - restored in afterEachTest()
    private Level previousLevel;

    @BeforeClass
    public static void setupLogCaptor() {
        logCaptor = LogCaptor.forClass(Connection.class);
    }

    @AfterClass
    public static void tearDownAfterClass() {
        logCaptor.close();
    }

    @Before
    public void setupForEachTest() {
        // force DEBUG on the Connection logger so that lifecycle messages (e.g. connection
        // destruction) are emitted and captured, remembering the prior level for restoration
        final Logger lc = (Logger) LoggerFactory.getLogger(Connection.class);
        previousLevel = lc.getLevel();
        lc.setLevel(Level.DEBUG);

        logCaptor.clearLogs();
    }

    @After
    public void afterEachTest() {
        final Logger lc = (Logger) LoggerFactory.getLogger(Connection.class);
        lc.setLevel(previousLevel);
    }

    /**
     * Reproducer for TINKERPOP-2169
     */
    @Test
    public void shouldCloseConnectionDeadDueToUnRecoverableError() throws Exception {
        // Set a low value of maxContentLength to intentionally trigger CorruptedFrameException
        final Cluster cluster = TestClientFactory.build()
                .serializer(Serializers.GRAPHBINARY_V1D0)
                .maxContentLength(64)
                .minConnectionPoolSize(1)
                .maxConnectionPoolSize(2)
                .create();
        final Client.ClusteredClient client = cluster.connect();

        try {
            // Add the test data so that the g.V() response could exceed maxContentLength
            client.submit("g.inject(1).repeat(__.addV()).times(20).count()").all().get();
            try {
                client.submit("g.V().fold()").all().get();
                fail("Should throw an exception.");
            } catch (Exception re) {
                // the oversized frame must surface as a CorruptedFrameException to the caller
                assertThat(re.getCause() instanceof CorruptedFrameException, is(true));
            }

            // without this wait this test is failing randomly on docker/travis with ConcurrentModificationException
            // see TINKERPOP-2504 - adjust the sleep to account for the max time to wait for sessions to close in
            // an orderly fashion
            Thread.sleep(Connection.MAX_WAIT_FOR_CLOSE + 1000);

            // Assert that the host has not been marked unavailable
            assertEquals(1, cluster.availableHosts().size());

            // Assert that there is no connection leak and all connections have been closed
            assertEquals(0, client.hostConnectionPools.values().stream()
                    .findFirst().get()
                    .numConnectionsWaitingToCleanup());
        } finally {
            cluster.close();
        }

        // Assert that the connection has been destroyed. Specifically check for the string with
        // isDead=true indicating the connection that was closed due to CorruptedFrameException.
        assertThat(logCaptor.getLogs().stream().anyMatch(m -> m.matches(
                "^(?!.*(isDead=false)).*isDead=true.*destroyed successfully.$")), Is.is(true));
    }

    /**
     * Verifies that concurrent requests are spread evenly across the pool: with as many
     * in-flight requests as connections, each connection should be borrowed exactly once.
     */
    @Test
    public void shouldBalanceConcurrentRequestsAcrossConnections() throws InterruptedException {
        final int connPoolSize = 16;
        final Cluster cluster = TestClientFactory.build()
                .minConnectionPoolSize(connPoolSize)
                .maxConnectionPoolSize(connPoolSize)
                .create();
        final Client.ClusteredClient client = cluster.connect();
        client.init();
        final ExecutorService executorServiceForTesting = cluster.executor();

        try {
            final RequestMessage.Builder request = client.buildMessage(RequestMessage.build(Tokens.OPS_EVAL))
                    .add(Tokens.ARGS_GREMLIN, "Thread.sleep(5000)");
            final Callable<Connection> sendQueryCallable = () -> client.chooseConnection(request.create());
            final List<Callable<Connection>> listOfTasks = new ArrayList<>();

            for (int i = 0; i < connPoolSize; i++) {
                listOfTasks.add(sendQueryCallable);
            }

            // per-channel borrow counts keyed by channel id
            final HashMap<String, Integer> channelsSize = new HashMap<>();

            // invokeAll() blocks until every task has completed, so the futures below resolve
            // immediately - iterate sequentially rather than with parallelStream() which raced
            // on unsynchronized mutation of the HashMap and could silently drop counts
            final List<Future<Connection>> executorSubmitFutures = executorServiceForTesting.invokeAll(listOfTasks);
            executorSubmitFutures.stream().map(fut -> {
                try {
                    return fut.get();
                } catch (InterruptedException | ExecutionException e) {
                    fail(e.getMessage());
                    return null;
                }
            }).forEach(conn -> {
                final String id = conn.getChannelId();
                channelsSize.put(id, channelsSize.getOrDefault(id, 0) + 1);
            });

            assertNotEquals(0, channelsSize.entrySet().size());
            // every connection in the pool should have been chosen exactly once
            channelsSize.entrySet().forEach(entry -> {
                assertEquals(1, (entry.getValue()).intValue());
            });
        } finally {
            executorServiceForTesting.shutdown();
            client.close();
            cluster.close();
        }
    }

    /**
     * Verifies pool growth behavior under maxSimultaneousUsagePerConnection: once the first
     * connection saturates, overflow requests should land on a single newly-created connection
     * rather than fanning out to many.
     */
    @Test
    public void overLimitOperationsShouldDelegateToSingleNewConnection() throws InterruptedException {
        final int operations = 6;
        final int usagePerConnection = 3;
        final Cluster cluster = TestClientFactory.build()
                .minConnectionPoolSize(1)
                .maxConnectionPoolSize(operations)
                .minSimultaneousUsagePerConnection(1)
                .maxSimultaneousUsagePerConnection(usagePerConnection)
                .create();
        final Client.ClusteredClient client = cluster.connect();
        client.init();
        final ExecutorService executorServiceForTesting = cluster.executor();

        try {
            final RequestMessage.Builder request = client.buildMessage(RequestMessage.build(Tokens.OPS_EVAL))
                    .add(Tokens.ARGS_GREMLIN, "Thread.sleep(5000)");
            final Callable<Connection> sendQueryCallable = () -> client.chooseConnection(request.create());
            final List<Callable<Connection>> listOfTasks = new ArrayList<>();

            for (int i = 0; i < operations; i++) {
                listOfTasks.add(sendQueryCallable);
            }

            // per-channel borrow counts keyed by channel id
            final HashMap<String, Integer> connectionBorrowCount = new HashMap<>();

            // invokeAll() blocks until every task has completed, so the futures below resolve
            // immediately - a sequential stream makes the HashMap mutation safe without the
            // explicit synchronization the parallel version required
            final List<Future<Connection>> executorSubmitFutures = executorServiceForTesting.invokeAll(listOfTasks);
            executorSubmitFutures.stream().map(fut -> {
                try {
                    return fut.get();
                } catch (InterruptedException | ExecutionException e) {
                    fail(e.getMessage());
                    return null;
                }
            }).forEach(conn -> {
                final String id = conn.getChannelId();
                connectionBorrowCount.put(id, connectionBorrowCount.getOrDefault(id, 0) + 1);
            });

            // exactly two connections should have served the load: the original plus one overflow
            assertEquals(2, connectionBorrowCount.size());
            for (final int finalBorrowCount : connectionBorrowCount.values()) {
                assertEquals(usagePerConnection, finalBorrowCount);
            }
        } finally {
            executorServiceForTesting.shutdown();
            client.close();
            cluster.close();
        }
    }

    /**
     * Added for TINKERPOP-2813 - this scenario would have previously thrown tons of
     * {@link NoHostAvailableException}.
     */
    @Test
    public void shouldSucceedWithJitteryConnection() throws Exception {
        final Cluster cluster = TestClientFactory.build().minConnectionPoolSize(1).maxConnectionPoolSize(4).
                reconnectInterval(1000).
                maxWaitForConnection(4000).validationRequest("g.inject()").create();
        final Client.ClusteredClient client = cluster.connect();
        client.init();

        // every 10 connections let's have some problems
        final JitteryConnectionFactory connectionFactory = new JitteryConnectionFactory(3);
        client.hostConnectionPools.forEach((h, pool) -> pool.connectionFactory = connectionFactory);

        // get an initial connection which marks the host as available
        assertEquals(2, client.submit("1+1").all().join().get(0).getInt());

        // network is gonna get fishy - ConnectionPool should try to grow during the workload below and when it
        // does some connections will fail to create in the background which should log some errors but not tank
        // the submit() as connections that are currently still working and active should be able to handle the load.
        connectionFactory.jittery = true;

        // load up a hella ton of requests
        final int requests = 1000;
        final CountDownLatch latch = new CountDownLatch(requests);
        final AtomicBoolean hadFailOtherThanTimeout = new AtomicBoolean(false);

        new Thread(() -> {
            IntStream.range(0, requests).forEach(i -> {
                try {
                    client.submitAsync("1 + " + i);
                } catch (Exception ex) {
                    // we could catch a TimeoutException here in some cases if the jitters cause a borrow of a
                    // connection to take too long. submitAsync() will wrap in a RuntimeException. can't assert
                    // this condition inside this thread or the test locks up
                    hadFailOtherThanTimeout.compareAndSet(false, !(ex.getCause() instanceof TimeoutException));
                } finally {
                    latch.countDown();
                }
            });
        }, "worker-shouldSucceedWithJitteryConnection").start();

        // wait long enough for the jitters to kick in at least a little
        while (latch.getCount() > (requests / 2)) {
            TimeUnit.MILLISECONDS.sleep(50);
        }

        // wait for requests to complete
        assertTrue(latch.await(30000, TimeUnit.MILLISECONDS));

        // make sure we had some failures for sure coming out the factory
        assertThat(connectionFactory.getNumberOfFailures(), is(greaterThan(0L)));

        // if there was a exception in the worker thread, then it had better be a TimeoutException
        assertThat(hadFailOtherThanTimeout.get(), is(false));

        cluster.close();
    }

    /**
     * Introduces random failures when creating a {@link Connection} for the {@link ConnectionPool}.
     */
    public static class JitteryConnectionFactory implements ConnectionFactory {

        // when true, creation failures are injected; volatile since the test toggles it from
        // the main thread while pool threads read it
        private volatile boolean jittery = false;
        private final AtomicLong connectionsCreated = new AtomicLong(0);
        private final AtomicLong connectionFailures = new AtomicLong(0);
        private final int numberOfConnectionsBetweenErrors;

        public JitteryConnectionFactory(final int numberOfConnectionsBetweenErrors) {
            this.numberOfConnectionsBetweenErrors = numberOfConnectionsBetweenErrors;
        }

        public long getNumberOfFailures() {
            return connectionFailures.get();
        }

        @Override
        public Connection create(final ConnectionPool pool) {
            // fail creating a connection every 10 attempts or so when jittery
            if (jittery && connectionsCreated.incrementAndGet() % numberOfConnectionsBetweenErrors == 0) {
                connectionFailures.incrementAndGet();
                throw new ConnectionException(pool.host.getHostUri(),
                        new SSLHandshakeException("SSL on the funk - server is big mad with the jitters"));
            }

            return ConnectionFactory.super.create(pool);
        }
    }
}