| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.tinkerpop.gremlin.server; |
| |
| import ch.qos.logback.classic.Level; |
| import ch.qos.logback.classic.Logger; |
| import nl.altindag.log.LogCaptor; |
| import org.apache.tinkerpop.gremlin.util.ExceptionHelper; |
| import org.apache.tinkerpop.gremlin.TestHelper; |
| import org.apache.tinkerpop.gremlin.driver.Client; |
| import org.apache.tinkerpop.gremlin.driver.Cluster; |
| import org.apache.tinkerpop.gremlin.driver.RequestOptions; |
| import org.apache.tinkerpop.gremlin.driver.Result; |
| import org.apache.tinkerpop.gremlin.driver.ResultSet; |
| import org.apache.tinkerpop.gremlin.util.Tokens; |
| import org.apache.tinkerpop.gremlin.driver.exception.ConnectionException; |
| import org.apache.tinkerpop.gremlin.driver.exception.NoHostAvailableException; |
| import org.apache.tinkerpop.gremlin.driver.exception.ResponseException; |
| import org.apache.tinkerpop.gremlin.driver.handler.WebSocketClientHandler; |
| import org.apache.tinkerpop.gremlin.util.message.RequestMessage; |
| import org.apache.tinkerpop.gremlin.util.message.ResponseStatusCode; |
| import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection; |
| import org.apache.tinkerpop.gremlin.util.ser.GraphBinaryMessageSerializerV1; |
| import org.apache.tinkerpop.gremlin.util.ser.Serializers; |
| import org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin; |
| import org.apache.tinkerpop.gremlin.process.traversal.Bytecode; |
| import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; |
| import org.apache.tinkerpop.gremlin.server.handler.OpExecutorHandler; |
| import org.apache.tinkerpop.gremlin.structure.Vertex; |
| import org.apache.tinkerpop.gremlin.structure.io.Storage; |
| import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex; |
| import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerFactory; |
| import org.apache.tinkerpop.gremlin.util.TimeUtil; |
| import org.apache.tinkerpop.gremlin.util.function.FunctionUtils; |
| import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; |
| import org.junit.After; |
| import org.junit.AfterClass; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| import org.mockito.ArgumentCaptor; |
| import org.mockito.Mockito; |
| import org.slf4j.LoggerFactory; |
| |
| import java.awt.Color; |
| import java.net.ConnectException; |
| import java.time.Instant; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.stream.Collectors; |
| import java.util.stream.IntStream; |
| |
| import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal; |
| import static org.hamcrest.MatcherAssert.assertThat; |
| import static org.hamcrest.core.AllOf.allOf; |
| import static org.hamcrest.core.IsInstanceOf.instanceOf; |
| import static org.hamcrest.core.StringContains.containsString; |
| import static org.hamcrest.core.StringEndsWith.endsWith; |
| import static org.hamcrest.core.StringStartsWith.startsWith; |
| import static org.hamcrest.number.OrderingComparison.greaterThan; |
| import static org.hamcrest.number.OrderingComparison.lessThanOrEqualTo; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.hamcrest.CoreMatchers.is; |
| import static org.mockito.Mockito.verify; |
| |
| /** |
| * Integration tests for gremlin-driver configurations and settings. |
| * |
| * @author Stephen Mallette (http://stephen.genoprime.com) |
| */ |
| public class GremlinDriverIntegrateTest extends AbstractGremlinServerIntegrationTest { |
| private static final org.slf4j.Logger logger = LoggerFactory.getLogger(GremlinDriverIntegrateTest.class); |
| |
| private static LogCaptor logCaptor; |
| private Level previousLogLevel; |
| |
    @BeforeClass
    public static void setupLogCaptor() {
        // capture all logging output (root logger) for the whole test class so individual tests
        // can assert on driver log messages, e.g. the keep-alive ping frames
        logCaptor = LogCaptor.forRoot();
    }
| |
    @AfterClass
    public static void tearDownAfterClass() {
        // release the log captor's appender so it does not leak into other test classes
        logCaptor.close();
    }
| |
| @Before |
| public void setupForEachTest() { |
| if (name.getMethodName().equals("shouldKeepAliveForWebSockets") || |
| name.getMethodName().equals("shouldKeepAliveForWebSocketsWithNoInFlightRequests")) { |
| final Logger webSocketClientHandlerLogger = (Logger) LoggerFactory.getLogger(WebSocketClientHandler.class); |
| previousLogLevel = webSocketClientHandlerLogger.getLevel(); |
| webSocketClientHandlerLogger.setLevel(Level.DEBUG); |
| } else if (name.getMethodName().equals("shouldEventuallySucceedAfterMuchFailure")) { |
| final Logger opExecutorHandlerLogger = (Logger) LoggerFactory.getLogger(OpExecutorHandler.class); |
| previousLogLevel = opExecutorHandlerLogger.getLevel(); |
| opExecutorHandlerLogger.setLevel(Level.ERROR); |
| } |
| |
| logCaptor.clearLogs(); |
| } |
| |
| @After |
| public void afterEachTest() { |
| if (name.getMethodName().equals("shouldKeepAliveForWebSockets") || |
| name.getMethodName().equals("shouldKeepAliveForWebSocketsWithNoInFlightRequests")) { |
| final Logger webSocketClientHandlerLogger = (Logger) LoggerFactory.getLogger(WebSocketClientHandler.class); |
| webSocketClientHandlerLogger.setLevel(previousLogLevel); |
| } else if (name.getMethodName().equals("shouldEventuallySucceedAfterMuchFailure")) { |
| final Logger opExecutorHandlerLogger = (Logger) LoggerFactory.getLogger(OpExecutorHandler.class); |
| opExecutorHandlerLogger.setLevel(previousLogLevel); |
| } |
| } |
| |
| /** |
| * Configure specific Gremlin Server settings for specific tests. |
| */ |
| @Override |
| public Settings overrideSettings(final Settings settings) { |
| final String nameOfTest = name.getMethodName(); |
| |
| switch (nameOfTest) { |
| case "shouldAliasTraversalSourceVariables": |
| case "shouldAliasTraversalSourceVariablesInSession": |
| try { |
| final String p = Storage.toPath(TestHelper.generateTempFileFromResource( |
| GremlinDriverIntegrateTest.class, |
| "generate-shouldRebindTraversalSourceVariables.groovy", "")); |
| final Map<String,Object> m = new HashMap<>(); |
| m.put("files", Collections.singletonList(p)); |
| settings.scriptEngines.get("gremlin-groovy").plugins.put(ScriptFileGremlinPlugin.class.getName(), m); |
| } catch (Exception ex) { |
| throw new RuntimeException(ex); |
| } |
| break; |
| case "shouldFailWithBadClientSideSerialization": |
| // add custom gryo config for Color |
| final List<String> custom = Collections.singletonList( |
| Color.class.getName()); |
| settings.serializers.stream().filter(s -> s.className.contains("Gryo")) |
| .forEach(s -> s.config.put("custom", custom)); |
| break; |
| case "shouldExecuteScriptInSessionOnTransactionalGraph": |
| case "shouldExecuteSessionlessScriptOnTransactionalGraph": |
| case "shouldExecuteScriptInSessionOnTransactionalWithManualTransactionsGraph": |
| case "shouldExecuteInSessionAndSessionlessWithoutOpeningTransaction": |
| case "shouldManageTransactionsInSession": |
| tryIncludeNeo4jGraph(settings); |
| break; |
| case "shouldRequireAliasedGraphVariablesInStrictTransactionMode": |
| settings.strictTransactionManagement = true; |
| break; |
| case "shouldAliasGraphVariablesInStrictTransactionMode": |
| settings.strictTransactionManagement = true; |
| tryIncludeNeo4jGraph(settings); |
| break; |
| case "shouldProcessSessionRequestsInOrderAfterTimeout": |
| settings.evaluationTimeout = 250; |
| settings.threadPoolWorker = 1; |
| break; |
| case "shouldProcessTraversalInterruption": |
| case "shouldProcessEvalInterruption": |
| settings.evaluationTimeout = 1500; |
| break; |
| } |
| |
| return settings; |
| } |
| |
| @Test |
| public void shouldInterceptRequests() throws Exception { |
| final int requestsToMake = 32; |
| final AtomicInteger websocketHandshakeRequests = new AtomicInteger(0); |
| |
| final Cluster cluster = TestClientFactory.build(). |
| minConnectionPoolSize(1).maxConnectionPoolSize(1).handshakeInterceptor(r -> { |
| websocketHandshakeRequests.incrementAndGet(); |
| return r; |
| }).create(); |
| |
| try { |
| final Client client = cluster.connect(); |
| for (int ix = 0; ix < requestsToMake; ix++) { |
| assertEquals(ix + 1, client.submit(ix + "+1").all().get().get(0).getInt()); |
| } |
| } finally { |
| cluster.close(); |
| } |
| |
| assertEquals(1, websocketHandshakeRequests.get()); |
| } |
| |
| @Test |
| public void shouldReportErrorWhenRequestCantBeSerialized() throws Exception { |
| final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHSON_V3D0).create(); |
| try { |
| final Client client = cluster.connect().alias("g"); |
| |
| try { |
| final Map<String, Object> params = new HashMap<>(); |
| params.put("r", Color.RED); |
| client.submit("r", params).all().get(); |
| fail("Should have thrown exception over bad serialization"); |
| } catch (Exception ex) { |
| final Throwable inner = ExceptionHelper.getRootCause(ex); |
| assertThat(inner, instanceOf(ResponseException.class)); |
| assertEquals(ResponseStatusCode.REQUEST_ERROR_SERIALIZATION, ((ResponseException) inner).getResponseStatusCode()); |
| assertTrue(ex.getMessage().contains("An error occurred during serialization of this request")); |
| } |
| |
| // should not die completely just because we had a bad serialization error. that kind of stuff happens |
| // from time to time, especially in the console if you're just exploring. |
| assertEquals(2, client.submit("1+1").all().get().get(0).getInt()); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldProcessTraversalInterruption() throws Exception { |
| final Cluster cluster = TestClientFactory.open(); |
| final Client client = cluster.connect(); |
| |
| try { |
| client.submit("g.inject(1).sideEffect{Thread.sleep(5000)}").all().get(); |
| fail("Should have timed out"); |
| } catch (Exception ex) { |
| final ResponseException re = (ResponseException) ex.getCause(); |
| assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, re.getResponseStatusCode()); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldProcessEvalInterruption() throws Exception { |
| final Cluster cluster = TestClientFactory.open(); |
| final Client client = cluster.connect(); |
| |
| try { |
| client.submit("Thread.sleep(5000);'done'").all().get(); |
| fail("Should have timed out"); |
| } catch (Exception ex) { |
| final ResponseException re = (ResponseException) ex.getCause(); |
| assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, re.getResponseStatusCode()); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
    /**
     * Asserts that the driver emits keep-alive ping frames while the connection is otherwise idle.
     * NOTE(review): timing-sensitive - relies on keepAliveInterval(1002) firing during the
     * 3000ms idle window below; the count assertion has slack (1..3) to absorb scheduling jitter.
     */
    @Test
    public void shouldKeepAliveForWebSockets() throws Exception {
        // keep the connection pool size at 1 to remove the possibility of lots of connections trying to ping which will
        // complicate the assertion logic
        final Cluster cluster = TestClientFactory.build().
                minConnectionPoolSize(1).
                maxConnectionPoolSize(1).
                keepAliveInterval(1002).create();
        try {
            final Client client = cluster.connect();

            // fire up lots of requests so as to schedule/deschedule lots of ping jobs
            for (int ix = 0; ix < 500; ix++) {
                assertEquals(2, client.submit("1+1").all().get().get(0).getInt());
            }

            // don't send any messages for a bit so that the driver pings in the background
            Thread.sleep(3000);

            // make sure no bonus messages sorta fire off once we get back to sending requests
            for (int ix = 0; ix < 500; ix++) {
                assertEquals(2, client.submit("1+1").all().get().get(0).getInt());
            }

            // there really shouldn't be more than 3 of these sent. should definitely be at least one though
            final long messages = logCaptor.getLogs().stream().filter(m -> m.contains("Sending ping frame to the server")).count();
            assertThat(messages, allOf(greaterThan(0L), lessThanOrEqualTo(3L)));
        } finally {
            cluster.close();
        }
    }
| |
    /**
     * Same keep-alive assertion as {@code shouldKeepAliveForWebSockets} but for a connection that
     * has never carried a request - only {@code client.init()} is called before the idle window.
     * NOTE(review): timing-sensitive for the same reasons as the sibling test.
     */
    @Test
    public void shouldKeepAliveForWebSocketsWithNoInFlightRequests() throws Exception {
        // keep the connection pool size at 1 to remove the possibility of lots of connections trying to ping which will
        // complicate the assertion logic
        final Cluster cluster = TestClientFactory.build().
                minConnectionPoolSize(1).
                maxConnectionPoolSize(1).
                keepAliveInterval(1002).create();
        try {
            final Client client = cluster.connect();

            // forcefully initialize the client to mimic a scenario when client has some active connection with no
            // in flight requests on them.
            client.init();

            // don't send any messages for a bit so that the driver pings in the background
            Thread.sleep(3000);

            // make sure no bonus messages sorta fire off once we get back to sending requests
            for (int ix = 0; ix < 500; ix++) {
                assertEquals(2, client.submit("1+1").all().get().get(0).getInt());
            }

            // there really shouldn't be more than 3 of these sent. should definitely be at least one though
            final long messages = logCaptor.getLogs().stream().filter(m -> m.contains("Sending ping frame to the server")).count();
            assertThat(messages, allOf(greaterThan(0L), lessThanOrEqualTo(3L)));
        } finally {
            cluster.close();
        }
    }
| |
| @Test |
| public void shouldEventuallySucceedAfterChannelLevelError() throws Exception { |
| final Cluster cluster = TestClientFactory.build() |
| .reconnectInterval(500) |
| .maxContentLength(64).create(); |
| final Client client = cluster.connect(); |
| |
| try { |
| try { |
| client.submit("def x = '';(0..<1024).each{x = x + '$it'};x").all().get(); |
| fail("Request should have failed because it exceeded the max content length allowed"); |
| } catch (Exception ex) { |
| final Throwable root = ExceptionHelper.getRootCause(ex); |
| assertThat(root.getMessage(), containsString("Max frame length of 64 has been exceeded.")); |
| } |
| |
| assertEquals(2, client.submit("1+1").all().join().get(0).getInt()); |
| |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldEventuallySucceedAfterMuchFailure() throws Exception { |
| final Cluster cluster = TestClientFactory.open(); |
| final Client client = cluster.connect(); |
| |
| try { |
| // tested independently to 10000 iterations but for speed, bumped back to 1000 |
| IntStream.range(0, 1000).forEach(i -> { |
| try { |
| client.submit("1 + 9 9").all().join().get(0).getInt(); |
| fail("Should not have gone through due to syntax error"); |
| } catch (Exception ex) { |
| final Throwable root = ExceptionHelper.getRootCause(ex); |
| assertThat(root, instanceOf(ResponseException.class)); |
| } |
| }); |
| |
| assertEquals(2, client.submit("1+1").all().join().get(0).getInt()); |
| |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldEventuallySucceedOnSameServerWithScript() throws Exception { |
| stopServer(); |
| |
| final Cluster cluster = TestClientFactory.build().validationRequest("g.inject()").create(); |
| final Client client = cluster.connect(); |
| |
| try { |
| client.submit("1+1").all().join().get(0).getInt(); |
| fail("Should not have gone through because the server is not running"); |
| } catch (Exception i) { |
| assertThat(i, instanceOf(NoHostAvailableException.class)); |
| final Throwable root = ExceptionHelper.getRootCause(i); |
| assertThat(root, instanceOf(ConnectException.class)); |
| } |
| |
| startServer(); |
| |
| // default reconnect time is 1 second so wait some extra time to be sure it has time to try to bring it |
| // back to life. usually this passes on the first attempt, but docker is sometimes slow and we get failures |
| // waiting for Gremlin Server to pop back up |
| for (int ix = 3; ix < 13; ix++) { |
| TimeUnit.SECONDS.sleep(ix); |
| try { |
| final int result = client.submit("1+1").all().join().get(0).getInt(); |
| assertEquals(2, result); |
| break; |
| } catch (Exception ignored) { |
| logger.warn("Attempt {} failed on shouldEventuallySucceedOnSameServerWithScript", ix); |
| } |
| } |
| |
| cluster.close(); |
| } |
| |
| @Test |
| public void shouldEventuallySucceedWithRoundRobin() throws Exception { |
| final String noGremlinServer = "74.125.225.19"; |
| final Cluster cluster = TestClientFactory.build().addContactPoint(noGremlinServer).create(); |
| |
| try { |
| final Client client = cluster.connect(); |
| client.init(); |
| |
| // the first host is dead on init. request should succeed on localhost |
| assertEquals(2, client.submit("1+1").all().join().get(0).getInt()); |
| assertEquals(2, client.submit("1+1").all().join().get(0).getInt()); |
| assertEquals(2, client.submit("1+1").all().join().get(0).getInt()); |
| assertEquals(2, client.submit("1+1").all().join().get(0).getInt()); |
| assertEquals(2, client.submit("1+1").all().join().get(0).getInt()); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
    /**
     * Builds graphs of increasing vertex counts server-side and verifies the driver can stream
     * result sets of each size back fully.
     */
    @Test
    public void shouldHandleResultsOfAllSizes() throws Exception {
        final Cluster cluster = TestClientFactory.open();
        final Client client = cluster.connect();

        try {

            // groovy script: drops the graph, creates `size` vertices chained by random "linked"
            // edges, then returns g.V() so the result count equals the vertex count
            final String script = "g.V().drop().iterate();\n" +
                    "\n" +
                    "List ids = new ArrayList();\n" +
                    "\n" +
                    "int ii = 0;\n" +
                    "Vertex v = graph.addVertex();\n" +
                    "v.property(\"ii\", ii);\n" +
                    "v.property(\"sin\", Math.sin(ii));\n" +
                    "ids.add(v.id());\n" +
                    "\n" +
                    "Random rand = new Random();\n" +
                    "for (; ii < size; ii++) {\n" +
                    "    v = graph.addVertex();\n" +
                    "    v.property(\"ii\", ii);\n" +
                    "    v.property(\"sin\", Math.sin(ii/5.0));\n" +
                    "    Vertex u = graph.vertices(ids.get(rand.nextInt(ids.size()))).next();\n" +
                    "    v.addEdge(\"linked\", u);\n" +
                    "    ids.add(u.id());\n" +
                    "    ids.add(v.id());\n" +
                    "}\n" +
                    "g.V()";

            final List<Integer> sizes = Arrays.asList(1, 10, 20, 50, 75, 100, 250, 500, 750, 1000, 5000, 10000);
            for (Integer size : sizes) {
                final Map<String, Object> params = new HashMap<>();
                // script creates one vertex before the loop, so pass size - 1 for the loop bound
                params.put("size", size - 1);
                final ResultSet results = client.submit(script, params);

                assertEquals(size.intValue(), results.all().get().size());
            }
        } finally {
            cluster.close();
        }
    }
| |
| @Test |
| public void shouldFailWithBadClientSideSerialization() throws Exception { |
| final Cluster cluster = TestClientFactory.open(); |
| final Client client = cluster.connect(); |
| |
| try { |
| |
| final ResultSet results = client.submit("java.awt.Color.RED"); |
| |
| try { |
| results.all().join(); |
| fail("Should have thrown exception over bad serialization"); |
| } catch (Exception ex) { |
| final Throwable inner = ExceptionHelper.getRootCause(ex); |
| assertThat(inner, instanceOf(ResponseException.class)); |
| assertEquals(ResponseStatusCode.SERVER_ERROR_SERIALIZATION, ((ResponseException) inner).getResponseStatusCode()); |
| } |
| |
| // should not die completely just because we had a bad serialization error. that kind of stuff happens |
| // from time to time, especially in the console if you're just exploring. |
| assertEquals(2, client.submit("1+1").all().get().get(0).getInt()); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldFailWithScriptExecutionException() throws Exception { |
| final Cluster cluster = TestClientFactory.open(); |
| final Client client = cluster.connect(); |
| try { |
| try { |
| final ResultSet results = client.submit("1/0"); |
| results.all().join(); |
| fail("Should have thrown exception over division by zero"); |
| } catch (Exception ex) { |
| final Throwable inner = ExceptionHelper.getRootCause(ex); |
| assertTrue(inner instanceof ResponseException); |
| assertThat(inner.getMessage(), endsWith("Division by zero")); |
| |
| final ResponseException rex = (ResponseException) inner; |
| assertEquals("java.lang.ArithmeticException", rex.getRemoteExceptionHierarchy().get().get(0)); |
| assertEquals(1, rex.getRemoteExceptionHierarchy().get().size()); |
| assertThat(rex.getRemoteStackTrace().get(), containsString("Division by zero")); |
| } |
| |
| // should not die completely just because we had a bad serialization error. that kind of stuff happens |
| // from time to time, especially in the console if you're just exploring. |
| assertEquals(2, client.submit("1+1").all().get().get(0).getInt()); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldProcessRequestsOutOfOrder() throws Exception { |
| final Cluster cluster = TestClientFactory.open(); |
| try { |
| final Client client = cluster.connect(); |
| |
| final ResultSet rsFive = client.submit("Thread.sleep(5000);'five'"); |
| final ResultSet rsZero = client.submit("'zero'"); |
| |
| final CompletableFuture<List<Result>> futureFive = rsFive.all(); |
| final CompletableFuture<List<Result>> futureZero = rsZero.all(); |
| |
| final long start = System.nanoTime(); |
| assertFalse(futureFive.isDone()); |
| assertEquals("zero", futureZero.get().get(0).getString()); |
| |
| logger.info("Eval of 'zero' complete: " + TimeUtil.millisSince(start)); |
| |
| assertFalse(futureFive.isDone()); |
| assertEquals("five", futureFive.get(10, TimeUnit.SECONDS).get(0).getString()); |
| |
| logger.info("Eval of 'five' complete: " + TimeUtil.millisSince(start)); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| /** |
| * This test validates that the session requests are processed in-order on the server. The order of results |
| * returned to the client might be different though since each result is handled by a different executor thread. |
| */ |
| @Test |
| public void shouldProcessSessionRequestsInOrder() throws Exception { |
| final Cluster cluster = TestClientFactory.open(); |
| try { |
| final Client client = cluster.connect(name.getMethodName()); |
| |
| final ResultSet first = client.submit("Thread.sleep(5000);g.V().fold().coalesce(unfold(), __.addV('person'))"); |
| final ResultSet second = client.submit("g.V().count()"); |
| |
| final CompletableFuture<List<Result>> futureFirst = first.all(); |
| final CompletableFuture<List<Result>> futureSecond = second.all(); |
| |
| final CountDownLatch latch = new CountDownLatch(2); |
| final List<Object> results = new ArrayList<>(); |
| final ExecutorService executor = Executors.newSingleThreadExecutor(); |
| |
| futureFirst.thenAcceptAsync(r -> { |
| results.add(r.get(0).getVertex().label()); |
| latch.countDown(); |
| }, executor); |
| |
| futureSecond.thenAcceptAsync(r -> { |
| results.add(r.get(0).getLong()); |
| latch.countDown(); |
| }, executor); |
| |
| // wait for both results |
| latch.await(30000, TimeUnit.MILLISECONDS); |
| |
| assertThat("Should contain 2 results", results.size() == 2); |
| assertThat("The numeric result should be 1", results.contains(1L)); |
| assertThat("The string result contain label person", results.contains("person")); |
| |
| executor.shutdown(); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldWaitForAllResultsToArrive() throws Exception { |
| final Cluster cluster = TestClientFactory.open(); |
| try { |
| final Client client = cluster.connect(); |
| |
| final AtomicInteger checked = new AtomicInteger(0); |
| final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]"); |
| while (!results.allItemsAvailable()) { |
| assertTrue(results.getAvailableItemCount() < 10); |
| checked.incrementAndGet(); |
| Thread.sleep(100); |
| } |
| |
| assertTrue(checked.get() > 0); |
| assertEquals(9, results.getAvailableItemCount()); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldStream() throws Exception { |
| final Cluster cluster = TestClientFactory.open(); |
| final Client client = cluster.connect(); |
| |
| try { |
| final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]"); |
| final AtomicInteger counter = new AtomicInteger(0); |
| results.stream().map(i -> i.get(Integer.class) * 2).forEach(i -> assertEquals(counter.incrementAndGet() * 2, Integer.parseInt(i.toString()))); |
| assertEquals(9, counter.get()); |
| assertThat(results.allItemsAvailable(), is(true)); |
| |
| // cant stream it again |
| assertThat(results.stream().iterator().hasNext(), is(false)); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldIterate() throws Exception { |
| final Cluster cluster = TestClientFactory.open(); |
| final Client client = cluster.connect(); |
| |
| try { |
| final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]"); |
| final Iterator<Result> itty = results.iterator(); |
| final AtomicInteger counter = new AtomicInteger(0); |
| while (itty.hasNext()) { |
| counter.incrementAndGet(); |
| assertEquals(counter.get(), itty.next().getInt()); |
| } |
| |
| assertEquals(9, counter.get()); |
| assertThat(results.allItemsAvailable(), is(true)); |
| |
| // can't stream it again |
| assertThat(results.iterator().hasNext(), is(false)); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldGetSomeThenSomeMore() throws Exception { |
| final Cluster cluster = TestClientFactory.open(); |
| final Client client = cluster.connect(); |
| |
| try { |
| final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]"); |
| final CompletableFuture<List<Result>> batch1 = results.some(5); |
| final CompletableFuture<List<Result>> batch2 = results.some(5); |
| final CompletableFuture<List<Result>> batchNothingLeft = results.some(5); |
| |
| assertEquals(5, batch1.get().size()); |
| assertEquals(1, batch1.get().get(0).getInt()); |
| assertEquals(2, batch1.get().get(1).getInt()); |
| assertEquals(3, batch1.get().get(2).getInt()); |
| assertEquals(4, batch1.get().get(3).getInt()); |
| assertEquals(5, batch1.get().get(4).getInt()); |
| |
| assertEquals(4, batch2.get().size()); |
| assertEquals(6, batch2.get().get(0).getInt()); |
| assertEquals(7, batch2.get().get(1).getInt()); |
| assertEquals(8, batch2.get().get(2).getInt()); |
| assertEquals(9, batch2.get().get(3).getInt()); |
| |
| assertEquals(0, batchNothingLeft.get().size()); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldGetOneThenSomeThenSomeMore() throws Exception { |
| final Cluster cluster = TestClientFactory.open(); |
| final Client client = cluster.connect(); |
| |
| try { |
| final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]"); |
| final Result one = results.one(); |
| final CompletableFuture<List<Result>> batch1 = results.some(4); |
| final CompletableFuture<List<Result>> batch2 = results.some(5); |
| final CompletableFuture<List<Result>> batchNothingLeft = results.some(5); |
| |
| assertEquals(1, one.getInt()); |
| |
| assertEquals(4, batch1.get().size()); |
| assertEquals(2, batch1.get().get(0).getInt()); |
| assertEquals(3, batch1.get().get(1).getInt()); |
| assertEquals(4, batch1.get().get(2).getInt()); |
| assertEquals(5, batch1.get().get(3).getInt()); |
| |
| assertEquals(4, batch2.get().size()); |
| assertEquals(6, batch2.get().get(0).getInt()); |
| assertEquals(7, batch2.get().get(1).getInt()); |
| assertEquals(8, batch2.get().get(2).getInt()); |
| assertEquals(9, batch2.get().get(3).getInt()); |
| |
| assertEquals(0, batchNothingLeft.get().size()); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldAvoidDeadlockOnCallToResultSetDotAll() throws Exception { |
| |
| // This test arose from this issue: https://github.org/apache/tinkerpop/tinkerpop3/issues/515 |
| // |
| // ResultSet.all returns a CompletableFuture that blocks on the worker pool until isExhausted returns false. |
| // isExhausted in turn needs a thread on the worker pool to even return. So its totally possible to consume all |
| // threads on the worker pool waiting for .all to finish such that you can't even get one to wait for |
| // isExhausted to run. |
| // |
| // Note that all() doesn't work as described above anymore. It waits for callback on readComplete rather |
| // than blocking on isExhausted. |
| final int workerPoolSizeForDriver = 2; |
| |
| // the number of requests 4 times the size of the worker pool as this originally did produce the problem |
| // described above in the javadoc of the test (though an equivalent number also produced it), but this has |
| // been tested to much higher multiples and passes. note that the maxWaitForConnection setting is high so |
| // that the client doesn't timeout waiting for an available connection. obviously this can also be fixed |
| // by increasing the maxConnectionPoolSize. |
| final int requests = workerPoolSizeForDriver * 4; |
| final Cluster cluster = TestClientFactory.build() |
| .workerPoolSize(workerPoolSizeForDriver) |
| .maxWaitForConnection(300000) |
| .create(); |
| try { |
| final Client client = cluster.connect(); |
| |
| final CountDownLatch latch = new CountDownLatch(requests); |
| final AtomicReference[] refs = new AtomicReference[requests]; |
| IntStream.range(0, requests).forEach(ix -> { |
| refs[ix] = new AtomicReference(); |
| client.submitAsync("Thread.sleep(5000);[1,2,3,4,5,6,7,8,9]").thenAccept(rs -> |
| rs.all().thenAccept(refs[ix]::set).thenRun(latch::countDown)); |
| }); |
| |
| // countdown should have reached zero as results should have eventually been all returned and processed |
| assertTrue(latch.await(30, TimeUnit.SECONDS)); |
| |
| final List<Integer> expected = IntStream.range(1, 10).boxed().collect(Collectors.toList()); |
| IntStream.range(0, requests).forEach(r -> |
| assertTrue(expected.containsAll(((List<Result>) refs[r].get()).stream().map(resultItem -> new Integer(resultItem.getInt())).collect(Collectors.toList())))); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
    @Test
    public void shouldCloseWithServerDown() throws Exception {
        // cluster.close() must complete without hanging or throwing even though the server went
        // away after the connection was established
        final Cluster cluster = TestClientFactory.open();
        try {
            cluster.connect().init();

            stopServer();
        } finally {
            cluster.close();
        }
    }
| |
    /**
     * Verifies that a host is only removed from the available set by a client that actually holds
     * established connections to it - a client that fails during initialization must not evict it.
     */
    @Test
    public void shouldMarkHostDeadSinceServerIsDown() throws Exception {
        final Cluster cluster = TestClientFactory.open();
        try {
            // hosts only become "available" once a client connects
            assertEquals(0, cluster.availableHosts().size());
            final Client client1 = cluster.connect().init();
            assertEquals(1, cluster.availableHosts().size());

            stopServer();

            // We create a new client here which will fail to initialize but the original client still has
            // host marked as connected. Since the second client failed during initialization, it has no way to
            // test if a host is indeed unreachable because it doesn't have any established connections. It will not add
            // the host to load balancer but it will also not remove it if it already exists there. Leave that
            // responsibility to a client that added it. In this case, let the second client perform it's own mechanism
            // to mark host as unavailable. The first client will discover that the host has failed either with next
            // keepAlive message or the next request, whichever is earlier. In this case, we will simulate the second
            // scenario by sending a new request on first client. The request would fail (since server is down) and
            // client should mark the host unavailable.
            cluster.connect().init();

            try {
                client1.submit("1+1").all().join();
                fail("Expecting an exception because the server is shut down.");
            } catch (Exception ex) {
                // ignore the exception
            }

            assertEquals(0, cluster.availableHosts().size());
        } finally {
            cluster.close();
        }
    }
| |
| @Test |
| public void shouldFailWithBadServerSideSerialization() throws Exception { |
| final Cluster cluster = TestClientFactory.open(); |
| final Client client = cluster.connect(); |
| try { |
| |
| final ResultSet results = client.submit("TinkerGraph.open().variables()"); |
| |
| try { |
| results.all().join(); |
| fail(); |
| } catch (Exception ex) { |
| final Throwable inner = ExceptionHelper.getRootCause(ex); |
| assertTrue(inner instanceof ResponseException); |
| assertEquals(ResponseStatusCode.SERVER_ERROR_SERIALIZATION, ((ResponseException) inner).getResponseStatusCode()); |
| } |
| |
| // should not die completely just because we had a bad serialization error. that kind of stuff happens |
| // from time to time, especially in the console if you're just exploring. |
| assertEquals(2, client.submit("1+1").all().get().get(0).getInt()); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldSerializeToStringWhenRequestedGraphBinaryV1() throws Exception { |
| final Map<String, Object> m = new HashMap<>(); |
| m.put("serializeResultToString", true); |
| final GraphBinaryMessageSerializerV1 serializer = new GraphBinaryMessageSerializerV1(); |
| serializer.configure(m, null); |
| |
| final Cluster cluster = TestClientFactory.build().serializer(serializer).create(); |
| final Client client = cluster.connect(); |
| |
| final ResultSet resultSet = client.submit("TinkerFactory.createClassic()"); |
| final List<Result> results = resultSet.all().join(); |
| assertEquals(1, results.size()); |
| assertEquals("tinkergraph[vertices:6 edges:6]", results.get(0).getString()); |
| |
| cluster.close(); |
| } |
| |
| @Test |
| public void shouldWorkWithGraphSONV1Serialization() throws Exception { |
| final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHSON_V1D0).create(); |
| final Client client = cluster.connect(); |
| |
| try { |
| final List<Result> r = client.submit("TinkerFactory.createModern().traversal().V(1)").all().join(); |
| assertEquals(1, r.size()); |
| |
| final Map<String, Object> m = r.get(0).get(Map.class); |
| assertEquals(4, m.size()); |
| assertEquals(1, m.get("id")); |
| assertEquals("person", m.get("label")); |
| assertEquals("vertex", m.get("type")); |
| |
| final Map<String, Object> properties = (Map<String, Object>) m.get("properties"); |
| assertEquals(2, properties.size()); |
| |
| final List<Object> names = (List<Object>) properties.get("name"); |
| assertEquals(1, names.size()); |
| |
| final Map<String, Object> nameProperties = (Map<String, Object>) names.get(0); |
| assertEquals(2, nameProperties.size()); |
| assertEquals(0l, nameProperties.get("id")); |
| assertEquals("marko", nameProperties.get("value")); |
| |
| final List<Object> ages = (List<Object>) properties.get("age"); |
| assertEquals(1, ages.size()); |
| |
| final Map<String, Object> ageProperties = (Map<String, Object>) ages.get(0); |
| assertEquals(2, ageProperties.size()); |
| assertEquals(1l, ageProperties.get("id")); |
| assertEquals(29, ageProperties.get("value")); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldWorkWithGraphSONV2Serialization() throws Exception { |
| final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHSON_V2D0).create(); |
| final Client client = cluster.connect(); |
| |
| try { |
| final List<Result> r = client.submit("TinkerFactory.createModern().traversal().V(1)").all().join(); |
| assertEquals(1, r.size()); |
| |
| final Vertex v = r.get(0).get(DetachedVertex.class); |
| assertEquals(1, v.id()); |
| assertEquals("person", v.label()); |
| |
| assertEquals(2, IteratorUtils.count(v.properties())); |
| assertEquals("marko", v.value("name")); |
| assertEquals(29, Integer.parseInt(v.value("age").toString())); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldWorkWithGraphSONExtendedV2Serialization() throws Exception { |
| final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHSON_V2D0).create(); |
| final Client client = cluster.connect(); |
| |
| try { |
| final List<Result> r = client.submit("java.time.Instant.EPOCH").all().join(); |
| assertEquals(1, r.size()); |
| |
| final Instant then = r.get(0).get(Instant.class); |
| assertEquals(Instant.EPOCH, then); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldWorkWithGraphSONV3Serialization() throws Exception { |
| final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHSON_V3D0).create(); |
| final Client client = cluster.connect(); |
| |
| try { |
| final List<Result> r = client.submit("TinkerFactory.createModern().traversal().V(1)").all().join(); |
| assertEquals(1, r.size()); |
| |
| final Vertex v = r.get(0).get(DetachedVertex.class); |
| assertEquals(1, v.id()); |
| assertEquals("person", v.label()); |
| |
| assertEquals(2, IteratorUtils.count(v.properties())); |
| assertEquals("marko", v.value("name")); |
| assertEquals(29, Integer.parseInt(v.value("age").toString())); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldWorkWithGraphSONExtendedV3Serialization() throws Exception { |
| final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHSON_V3D0).create(); |
| final Client client = cluster.connect(); |
| |
| try { |
| final List<Result> r = client.submit("java.time.Instant.EPOCH").all().join(); |
| assertEquals(1, r.size()); |
| |
| final Instant then = r.get(0).get(Instant.class); |
| assertEquals(Instant.EPOCH, then); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldWorkWithGraphBinaryV1Serialization() throws Exception { |
| final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHBINARY_V1D0).create(); |
| final Client client = cluster.connect(); |
| |
| try { |
| final List<Result> r = client.submit("TinkerFactory.createModern().traversal().V(1)").all().join(); |
| assertEquals(1, r.size()); |
| |
| final Vertex v = r.get(0).get(DetachedVertex.class); |
| assertEquals(1, v.id()); |
| assertEquals("person", v.label()); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldFailClientSideWithTooLargeAResponse() { |
| final Cluster cluster = TestClientFactory.build().maxContentLength(1).create(); |
| final Client client = cluster.connect(); |
| |
| try { |
| final String fatty = IntStream.range(0, 100).mapToObj(String::valueOf).collect(Collectors.joining()); |
| client.submit("'" + fatty + "'").all().get(); |
| fail("Should throw an exception."); |
| } catch (Exception re) { |
| final Throwable root = ExceptionHelper.getRootCause(re); |
| assertTrue(root.getMessage().equals("Max frame length of 1 has been exceeded.")); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldReturnNiceMessageFromOpSelector() { |
| final Cluster cluster = TestClientFactory.build().create(); |
| final Client client = cluster.connect(); |
| |
| try { |
| final Map m = new HashMap<>(); |
| m.put(null, "a null key will force a throw of OpProcessorException in message validation"); |
| client.submit("1+1", m).all().get(); |
| fail("Should throw an exception."); |
| } catch (Exception re) { |
| final Throwable root = ExceptionHelper.getRootCause(re); |
| assertEquals("The [eval] message is using one or more invalid binding keys - they must be of type String and cannot be null", root.getMessage()); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldExecuteScriptInSession() throws Exception { |
| final Cluster cluster = TestClientFactory.build().create(); |
| final Client client = cluster.connect(name.getMethodName()); |
| |
| final ResultSet results1 = client.submit("x = [1,2,3,4,5,6,7,8,9]"); |
| assertEquals(9, results1.all().get().size()); |
| |
| final ResultSet results2 = client.submit("x[0]+1"); |
| assertEquals(2, results2.all().get().get(0).getInt()); |
| |
| final ResultSet results3 = client.submit("x[1]+2"); |
| assertEquals(4, results3.all().get().get(0).getInt()); |
| |
| cluster.close(); |
| } |
| |
| @Test |
| public void shouldNotThrowNoSuchElementException() throws Exception { |
| final Cluster cluster = TestClientFactory.open(); |
| final Client client = cluster.connect(); |
| |
| try { |
| // this should return "nothing" - there should be no exception |
| assertNull(client.submit("g.V().has('name','kadfjaldjfla')").one()); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldEvalInGremlinLang() { |
| final Cluster cluster = TestClientFactory.open(); |
| final Client client = cluster.connect(); |
| |
| try { |
| final RequestOptions ro = RequestOptions.build().language("gremlin-lang").create(); |
| assertEquals(111, client.submit("g.inject(111)", ro).one().getInt()); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldCloseSession() throws Exception { |
| final Cluster cluster = TestClientFactory.build().create(); |
| final Client client = cluster.connect(name.getMethodName()); |
| |
| final ResultSet results1 = client.submit("x = [1,2,3,4,5,6,7,8,9]"); |
| assertEquals(9, results1.all().get().size()); |
| final ResultSet results2 = client.submit("x[0]+1"); |
| assertEquals(2, results2.all().get().get(0).getInt()); |
| |
| client.close(); |
| |
| try { |
| client.submit("x[0]+1").all().get(); |
| fail("Should have thrown an exception because the connection is closed"); |
| } catch (Exception ex) { |
| final Throwable root = ExceptionHelper.getRootCause(ex); |
| assertThat(root, instanceOf(IllegalStateException.class)); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldExecuteScriptInSessionAssumingDefaultedImports() throws Exception { |
| final Cluster cluster = TestClientFactory.open(); |
| final Client client = cluster.connect(name.getMethodName()); |
| |
| final ResultSet results1 = client.submit("TinkerFactory.class.name"); |
| assertEquals(TinkerFactory.class.getName(), results1.all().get().get(0).getString()); |
| |
| cluster.close(); |
| } |
| |
| @Test |
| public void shouldExecuteScriptInSessionOnTransactionalGraph() throws Exception { |
| assumeNeo4jIsPresent(); |
| |
| final Cluster cluster = TestClientFactory.open(); |
| final Client client = cluster.connect(name.getMethodName()); |
| |
| final Vertex vertexBeforeTx = client.submit("v=g.addV(\"person\").property(\"name\",\"stephen\").next()").all().get().get(0).getVertex(); |
| assertEquals("person", vertexBeforeTx.label()); |
| |
| final String nameValueFromV = client.submit("g.V().values('name').next()").all().get().get(0).getString(); |
| assertEquals("stephen", nameValueFromV); |
| |
| final Vertex vertexFromBinding = client.submit("v").all().get().get(0).getVertex(); |
| assertEquals("person", vertexFromBinding.label()); |
| |
| final Map<String,Object> vertexAfterTx = client.submit("g.V(v).property(\"color\",\"blue\").iterate(); g.tx().commit(); g.V(v).valueMap().by(unfold())").all().get().get(0).get(Map.class); |
| assertEquals("stephen", vertexAfterTx.get("name")); |
| assertEquals("blue", vertexAfterTx.get("color")); |
| |
| cluster.close(); |
| } |
| |
| @Test |
| public void shouldExecuteScriptInSessionOnTransactionalWithManualTransactionsGraph() throws Exception { |
| assumeNeo4jIsPresent(); |
| |
| final Cluster cluster = TestClientFactory.open(); |
| final Client client = cluster.connect(name.getMethodName()); |
| final Client sessionlessClient = cluster.connect(); |
| client.submit("graph.tx().onReadWrite(Transaction.READ_WRITE_BEHAVIOR.MANUAL);null").all().get(); |
| client.submit("graph.tx().open()").all().get(); |
| |
| final Vertex vertexBeforeTx = client.submit("v=g.addV(\"person\").property(\"name\", \"stephen\").next()").all().get().get(0).getVertex(); |
| assertEquals("person", vertexBeforeTx.label()); |
| |
| final String nameValueFromV = client.submit("g.V().values(\"name\").next()").all().get().get(0).getString(); |
| assertEquals("stephen", nameValueFromV); |
| |
| final Vertex vertexFromBinding = client.submit("v").all().get().get(0).getVertex(); |
| assertEquals("person", vertexFromBinding.label()); |
| |
| client.submit("g.V(v).property(\"color\",\"blue\")").all().get(); |
| client.submit("g.tx().commit()").all().get(); |
| |
| // Run a sessionless request to change transaction.readWriteConsumer back to AUTO |
| // The will make the next in session request fail if consumers aren't ThreadLocal |
| sessionlessClient.submit("g.V().next()").all().get(); |
| |
| client.submit("g.tx().open()").all().get(); |
| |
| final Map<String,Object> vertexAfterTx = client.submit("g.V().valueMap().by(unfold())").all().get().get(0).get(Map.class); |
| assertEquals("stephen", vertexAfterTx.get("name")); |
| assertEquals("blue", vertexAfterTx.get("color")); |
| |
| client.submit("g.tx().rollback()").all().get(); |
| |
| cluster.close(); |
| } |
| |
| @Test |
| public void shouldExecuteInSessionAndSessionlessWithoutOpeningTransaction() throws Exception { |
| assumeNeo4jIsPresent(); |
| |
| final Cluster cluster = TestClientFactory.open(); |
| try { |
| final Client sessionClient = cluster.connect(name.getMethodName()); |
| final Client sessionlessClient = cluster.connect(); |
| |
| //open transaction in session, then add vertex and commit |
| sessionClient.submit("g.tx().open()").all().get(); |
| final Vertex vertexBeforeTx = sessionClient.submit("v=g.addV(\"person\").property(\"name\",\"stephen\").next()").all().get().get(0).getVertex(); |
| assertEquals("person", vertexBeforeTx.label()); |
| sessionClient.submit("g.tx().commit()").all().get(); |
| |
| // check that session transaction is closed |
| final boolean isOpen = sessionClient.submit("g.tx().isOpen()").all().get().get(0).getBoolean(); |
| assertTrue("Transaction should be closed", !isOpen); |
| |
| //run a sessionless read |
| sessionlessClient.submit("g.V()").all().get(); |
| |
| // check that session transaction is still closed |
| final boolean isOpenAfterSessionless = sessionClient.submit("g.tx().isOpen()").all().get().get(0).getBoolean(); |
| assertTrue("Transaction should stil be closed", !isOpenAfterSessionless); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldExecuteSessionlessScriptOnTransactionalGraph() throws Exception { |
| assumeNeo4jIsPresent(); |
| |
| final Cluster cluster = TestClientFactory.open(); |
| final Client client = cluster.connect(); |
| |
| // this line is important because it tests GraphTraversal which has a certain transactional path |
| final Vertex vertexRequest1 = client.submit("g.addV().property(\"name\",\"stephen\")").all().get().get(0).getVertex(); |
| |
| final Vertex vertexRequest2 = client.submit("graph.vertices().next()").all().get().get(0).getVertex(); |
| assertEquals(vertexRequest1.id(), vertexRequest2.id()); |
| |
| // this line is important because it tests the other transactional path |
| client.submit("graph.addVertex(\"name\",\"marko\")").all().get().get(0).getVertex(); |
| |
| assertEquals(2, client.submit("g.V().count()").all().get().get(0).getLong()); |
| |
| cluster.close(); |
| } |
| |
| @Test |
| public void shouldExecuteScriptInSessionWithBindingsSavedOnServerBetweenRequests() throws Exception { |
| final Cluster cluster = TestClientFactory.open(); |
| final Client client = cluster.connect(name.getMethodName()); |
| |
| final Map<String, Object> bindings1 = new HashMap<>(); |
| bindings1.put("a", 100); |
| bindings1.put("b", 200); |
| final ResultSet results1 = client.submit("x = a + b", bindings1); |
| assertEquals(300, results1.one().getInt()); |
| |
| final Map<String, Object> bindings2 = new HashMap<>(); |
| bindings2.put("b", 100); |
| final ResultSet results2 = client.submit("x + b + a", bindings2); |
| assertEquals(500, results2.one().getInt()); |
| |
| final Map<String, Object> bindings3 = new HashMap<>(); |
| bindings3.put("x", 100); |
| final ResultSet results3 = client.submit("x + b + a + 1", bindings3); |
| assertEquals(301, results3.one().getInt()); |
| |
| final Map<String, Object> bindings4 = new HashMap<>(); |
| final ResultSet results4 = client.submit("x + b + a + 1", bindings4); |
| assertEquals(301, results4.one().getInt()); |
| |
| cluster.close(); |
| } |
| |
| @Test |
| public void shouldExecuteScriptsInMultipleSession() throws Exception { |
| final Cluster cluster = TestClientFactory.open(); |
| try { |
| final Client client1 = cluster.connect(name.getMethodName() + "1"); |
| final Client client2 = cluster.connect(name.getMethodName() + "2"); |
| final Client client3 = cluster.connect(name.getMethodName() + "3"); |
| |
| final ResultSet results11 = client1.submit("x = 1"); |
| final ResultSet results21 = client2.submit("x = 2"); |
| final ResultSet results31 = client3.submit("x = 3"); |
| assertEquals(1, results11.all().get().get(0).getInt()); |
| assertEquals(2, results21.all().get().get(0).getInt()); |
| assertEquals(3, results31.all().get().get(0).getInt()); |
| |
| final ResultSet results12 = client1.submit("x + 100"); |
| final ResultSet results22 = client2.submit("x * 2"); |
| final ResultSet results32 = client3.submit("x * 10"); |
| assertEquals(101, results12.all().get().get(0).getInt()); |
| assertEquals(4, results22.all().get().get(0).getInt()); |
| assertEquals(30, results32.all().get().get(0).getInt()); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldNotHaveKnowledgeOfBindingsBetweenRequestsWhenSessionless() throws Exception { |
| final Cluster cluster = TestClientFactory.open(); |
| final Client client1 = cluster.connect(); |
| final Client client2 = cluster.connect(); |
| final Client client3 = cluster.connect(); |
| |
| final ResultSet results11 = client1.submit("x = 1"); |
| final ResultSet results21 = client2.submit("x = 2"); |
| final ResultSet results31 = client3.submit("x = 3"); |
| assertEquals(1, results11.all().get().get(0).getInt()); |
| assertEquals(2, results21.all().get().get(0).getInt()); |
| assertEquals(3, results31.all().get().get(0).getInt()); |
| |
| try { |
| client1.submit("x").all().get(); |
| fail("The variable 'x' should not be present on the new request."); |
| } catch (Exception ex) { |
| final Throwable root = ExceptionHelper.getRootCause(ex); |
| assertThat(root, instanceOf(ResponseException.class)); |
| assertThat(root.getMessage(), containsString("No such property: x for class")); |
| } |
| |
| try { |
| client2.submit("x").all().get(); |
| fail("The variable 'x' should not be present on the new request."); |
| } catch (Exception ex) { |
| final Throwable root = ExceptionHelper.getRootCause(ex); |
| assertThat(root, instanceOf(ResponseException.class)); |
| assertThat(root.getMessage(), containsString("No such property: x for class")); |
| } |
| |
| try { |
| client3.submit("x").all().get(); |
| fail("The variable 'x' should not be present on the new request."); |
| } catch (Exception ex) { |
| final Throwable root = ExceptionHelper.getRootCause(ex); |
| assertThat(root, instanceOf(ResponseException.class)); |
| assertThat(root.getMessage(), containsString("No such property: x for class")); |
| } |
| |
| cluster.close(); |
| } |
| |
| @Test |
| public void shouldBeThreadSafeToUseOneClient() throws Exception { |
| final Cluster cluster = TestClientFactory.build().workerPoolSize(2) |
| .maxInProcessPerConnection(64) |
| .minInProcessPerConnection(32) |
| .maxConnectionPoolSize(16) |
| .minConnectionPoolSize(8).create(); |
| final Client client = cluster.connect(); |
| |
| final Map<Integer, Integer> results = new ConcurrentHashMap<>(); |
| final List<Thread> threads = new ArrayList<>(); |
| for (int ix = 0; ix < 100; ix++) { |
| final int otherNum = ix; |
| final Thread t = new Thread(()->{ |
| try { |
| results.put(otherNum, client.submit("1000+" + otherNum).all().get().get(0).getInt()); |
| } catch (Exception ex) { |
| ex.printStackTrace(); |
| } |
| }, name.getMethodName() + "-" + ix); |
| |
| t.start(); |
| threads.add(t); |
| } |
| |
| threads.forEach(FunctionUtils.wrapConsumer(Thread::join)); |
| |
| for (int ix = 0; ix < results.size(); ix++) { |
| assertThat(results.containsKey(ix), is(true)); |
| assertEquals(1000 + ix, results.get(ix).intValue()); |
| } |
| |
| cluster.close(); |
| } |
| |
| @Test |
| public void shouldRequireAliasedGraphVariablesInStrictTransactionMode() throws Exception { |
| final Cluster cluster = TestClientFactory.open(); |
| final Client client = cluster.connect(); |
| |
| try { |
| client.submit("1+1").all().get(); |
| fail("Should have tossed an exception because strict mode is on and no aliasing was performed"); |
| } catch (Exception ex) { |
| final Throwable root = ExceptionHelper.getRootCause(ex); |
| assertThat(root, instanceOf(ResponseException.class)); |
| final ResponseException re = (ResponseException) root; |
| assertEquals(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS, re.getResponseStatusCode()); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldAliasGraphVariablesInStrictTransactionMode() throws Exception { |
| assumeNeo4jIsPresent(); |
| |
| final Cluster cluster = TestClientFactory.open(); |
| final Client client = cluster.connect(); |
| |
| try { |
| client.submit("g.addVertex('name','stephen');").all().get().get(0).getVertex(); |
| fail("Should have tossed an exception because \"g\" does not have the addVertex method under default config"); |
| } catch (Exception ex) { |
| final Throwable root = ExceptionHelper.getRootCause(ex); |
| assertThat(root, instanceOf(ResponseException.class)); |
| final ResponseException re = (ResponseException) root; |
| assertEquals(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS, re.getResponseStatusCode()); |
| |
| final Client rebound = cluster.connect().alias("graph"); |
| final Vertex v = rebound.submit("g.addVertex(T.label,'person')").all().get().get(0).getVertex(); |
| assertEquals("person", v.label()); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldAliasGraphVariables() throws Exception { |
| final Cluster cluster = TestClientFactory.open(); |
| final Client client = cluster.connect(); |
| |
| try { |
| client.submit("g.addVertex(label,'person','name','stephen');").all().get().get(0).getVertex(); |
| fail("Should have tossed an exception because \"g\" does not have the addVertex method under default config"); |
| } catch (Exception ex) { |
| final Throwable root = ExceptionHelper.getRootCause(ex); |
| assertThat(root, instanceOf(ResponseException.class)); |
| final ResponseException re = (ResponseException) root; |
| assertEquals(ResponseStatusCode.SERVER_ERROR_EVALUATION, re.getResponseStatusCode()); |
| |
| final Client rebound = cluster.connect().alias("graph"); |
| final Vertex v = rebound.submit("g.addVertex(label,'person','name','jason')").all().get().get(0).getVertex(); |
| assertEquals("person", v.label()); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldAliasTraversalSourceVariables() throws Exception { |
| final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHBINARY_V1D0).create(); |
| final Client client = cluster.connect(); |
| try { |
| try { |
| client.submit("g.addV().property('name','stephen')").all().get().get(0).getVertex(); |
| fail("Should have tossed an exception because \"g\" is readonly in this context"); |
| } catch (Exception ex) { |
| final Throwable root = ExceptionHelper.getRootCause(ex); |
| assertThat(root, instanceOf(ResponseException.class)); |
| final ResponseException re = (ResponseException) root; |
| assertEquals(ResponseStatusCode.SERVER_ERROR_EVALUATION, re.getResponseStatusCode()); |
| } |
| |
| final Client clientAliased = client.alias("g1"); |
| final String name = clientAliased.submit("g.addV().property('name','jason').values('name')").all().get().get(0).getString(); |
| assertEquals("jason", name); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldAliasGraphVariablesInSession() throws Exception { |
| final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHBINARY_V1D0).create(); |
| final Client client = cluster.connect(name.getMethodName()); |
| |
| try { |
| client.submit("g.addVertex('name','stephen');").all().get().get(0).getVertex(); |
| fail("Should have tossed an exception because \"g\" does not have the addVertex method under default config"); |
| } catch (Exception ex) { |
| final Throwable root = ExceptionHelper.getRootCause(ex); |
| assertThat(root, instanceOf(ResponseException.class)); |
| final ResponseException re = (ResponseException) root; |
| assertEquals(ResponseStatusCode.SERVER_ERROR_EVALUATION, re.getResponseStatusCode()); |
| client.close(); |
| } |
| |
| try { |
| final Client aliased = cluster.connect(name.getMethodName()).alias("graph"); |
| assertEquals("jason", aliased.submit("n='jason'").all().get().get(0).getString()); |
| final String name = aliased.submit("g.addVertex('name',n).values('name')").all().get().get(0).getString(); |
| assertEquals("jason", name); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldAliasTraversalSourceVariablesInSession() throws Exception { |
| final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHBINARY_V1D0).create(); |
| final Client client = cluster.connect(name.getMethodName()); |
| |
| try { |
| client.submit("g.addV().property('name','stephen')").all().get().get(0).getVertex(); |
| fail("Should have tossed an exception because \"g\" is readonly in this context"); |
| } catch (Exception ex) { |
| final Throwable root = ExceptionHelper.getRootCause(ex); |
| assertThat(root, instanceOf(ResponseException.class)); |
| final ResponseException re = (ResponseException) root; |
| assertEquals(ResponseStatusCode.SERVER_ERROR_EVALUATION, re.getResponseStatusCode()); |
| } |
| |
| final Client clientAliased = client.alias("g1"); |
| assertEquals("jason", clientAliased.submit("n='jason'").all().get().get(0).getString()); |
| final String name = clientAliased.submit("g.addV().property('name',n).values('name')").all().get().get(0).getString(); |
| assertEquals("jason", name); |
| |
| cluster.close(); |
| } |
| |
| @Test |
| public void shouldManageTransactionsInSession() throws Exception { |
| assumeNeo4jIsPresent(); |
| |
| final Cluster cluster = TestClientFactory.open(); |
| final Client client = cluster.connect(); |
| final Client sessionWithManagedTx = cluster.connect(name.getMethodName() + "-managed", true); |
| final Client sessionWithoutManagedTx = cluster.connect(name.getMethodName() + "-not-managed"); |
| |
| // this should auto-commit |
| sessionWithManagedTx.submit("v = g.addV().property('name','stephen').next()").all().get().get(0).getVertex(); |
| |
| // the other clients should see that change because of auto-commit |
| assertThat(client.submit("g.V().has('name','stephen').hasNext()").all().get().get(0).getBoolean(), is(true)); |
| assertThat(sessionWithoutManagedTx.submit("g.V().has('name','stephen').hasNext()").all().get().get(0).getBoolean(), is(true)); |
| |
| // this should NOT auto-commit |
| final Vertex vDaniel = sessionWithoutManagedTx.submit("v = g.addV().property('name','daniel').next()").all().get().get(0).getVertex(); |
| |
| // the other clients should NOT see that change because of auto-commit |
| assertThat(client.submit("g.V().has('name','daniel').hasNext()").all().get().get(0).getBoolean(), is(false)); |
| assertThat(sessionWithManagedTx.submit("g.V().has('name','daniel').hasNext()").all().get().get(0).getBoolean(), is(false)); |
| |
| // but "v" should still be there |
| final Vertex vDanielAgain = sessionWithoutManagedTx.submit("v").all().get().get(0).getVertex(); |
| assertEquals(vDaniel.id(), vDanielAgain.id()); |
| |
| // now commit manually |
| sessionWithoutManagedTx.submit("g.tx().commit()").all().get(); |
| |
| // should be there for all now |
| assertThat(client.submit("g.V().has('name','daniel').hasNext()").all().get().get(0).getBoolean(), is(true)); |
| assertThat(sessionWithManagedTx.submit("g.V().has('name','daniel').hasNext()").all().get().get(0).getBoolean(), is(true)); |
| assertThat(sessionWithoutManagedTx.submit("g.V().has('name','daniel').hasNext()").all().get().get(0).getBoolean(), is(true)); |
| |
| cluster.close(); |
| } |
| |
    @Test
    public void shouldProcessSessionRequestsInOrderAfterTimeout() throws Exception {
        final Cluster cluster = TestClientFactory.open();

        try {
            // this configures the client to behave like OpProcessor for UnifiedChannelizer
            final Client.SessionSettings settings = Client.SessionSettings.build().
                    sessionId(name.getMethodName()).maintainStateAfterException(true).create();
            final Client client = cluster.connect(Client.Settings.build().useSession(settings).create());

            // each script blocks forever on an un-notified monitor, forcing the server's
            // evaluation timeout; repeat to verify ordering remains stable after timeouts
            for (int index = 0; index < 50; index++) {
                final CompletableFuture<ResultSet> first = client.submitAsync(
                        "Object mon1 = 'mon1';\n" +
                                "synchronized (mon1) {\n" +
                                "    mon1.wait();\n" +
                                "} ");

                final CompletableFuture<ResultSet> second = client.submitAsync(
                        "Object mon2 = 'mon2';\n" +
                                "synchronized (mon2) {\n" +
                                "    mon2.wait();\n" +
                                "}");

                final CompletableFuture<ResultSet> third = client.submitAsync(
                        "Object mon3 = 'mon3';\n" +
                                "synchronized (mon3) {\n" +
                                "    mon3.wait();\n" +
                                "}");

                final CompletableFuture<ResultSet> fourth = client.submitAsync(
                        "Object mon4 = 'mon4';\n" +
                                "synchronized (mon4) {\n" +
                                "    mon4.wait();\n" +
                                "}");

                final CompletableFuture<List<Result>> futureFirst = first.get().all();
                final CompletableFuture<List<Result>> futureSecond = second.get().all();
                final CompletableFuture<List<Result>> futureThird = third.get().all();
                final CompletableFuture<List<Result>> futureFourth = fourth.get().all();

                // all four requests must fail with the server-side timeout, in order
                assertFutureTimeout(futureFirst);
                assertFutureTimeout(futureSecond);
                assertFutureTimeout(futureThird);
                assertFutureTimeout(futureFourth);
            }
        } finally {
            cluster.close();
        }
    }
| |
| private void assertFutureTimeout(final CompletableFuture<List<Result>> f) { |
| try { |
| f.get(); |
| fail("Should have timed out"); |
| } catch (Exception ex) { |
| final Throwable root = ExceptionHelper.getRootCause(ex); |
| assertThat(root, instanceOf(ResponseException.class)); |
| assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, ((ResponseException) root).getResponseStatusCode()); |
| assertThat(root.getMessage(), allOf(startsWith("Evaluation exceeded"), containsString("250 ms"))); |
| } |
| } |
| |
| @Test |
| public void shouldCloseAllClientsOnCloseOfCluster() throws Exception { |
| final Cluster cluster = TestClientFactory.open(); |
| final Client sessionlessOne = cluster.connect(); |
| final Client session = cluster.connect("session"); |
| final Client sessionlessTwo = cluster.connect(); |
| final Client sessionlessThree = cluster.connect(); |
| final Client sessionlessFour = cluster.connect(); |
| |
| assertEquals(2, sessionlessOne.submit("1+1").all().get().get(0).getInt()); |
| assertEquals(2, session.submit("1+1").all().get().get(0).getInt()); |
| assertEquals(2, sessionlessTwo.submit("1+1").all().get().get(0).getInt()); |
| assertEquals(2, sessionlessThree.submit("1+1").all().get().get(0).getInt()); |
| // dont' send anything on the 4th client |
| |
| // close one of these Clients before the Cluster |
| sessionlessThree.close(); |
| cluster.close(); |
| |
| try { |
| sessionlessOne.submit("1+1").all().get(); |
| fail("Should have tossed an exception because cluster was closed"); |
| } catch (Exception ex) { |
| final Throwable root = ExceptionHelper.getRootCause(ex); |
| assertThat(root, instanceOf(IllegalStateException.class)); |
| assertEquals("Client is closed", root.getMessage()); |
| } |
| |
| try { |
| session.submit("1+1").all().get(); |
| fail("Should have tossed an exception because cluster was closed"); |
| } catch (Exception ex) { |
| final Throwable root = ExceptionHelper.getRootCause(ex); |
| assertThat(root, instanceOf(IllegalStateException.class)); |
| assertEquals("Client is closed", root.getMessage()); |
| } |
| |
| try { |
| sessionlessTwo.submit("1+1").all().get(); |
| fail("Should have tossed an exception because cluster was closed"); |
| } catch (Exception ex) { |
| final Throwable root = ExceptionHelper.getRootCause(ex); |
| assertThat(root, instanceOf(IllegalStateException.class)); |
| assertEquals("Client is closed", root.getMessage()); |
| } |
| |
| try { |
| sessionlessThree.submit("1+1").all().get(); |
| fail("Should have tossed an exception because cluster was closed"); |
| } catch (Exception ex) { |
| final Throwable root = ExceptionHelper.getRootCause(ex); |
| assertThat(root, instanceOf(IllegalStateException.class)); |
| assertEquals("Client is closed", root.getMessage()); |
| } |
| |
| try { |
| sessionlessFour.submit("1+1").all().get(); |
| fail("Should have tossed an exception because cluster was closed"); |
| } catch (Exception ex) { |
| final Throwable root = ExceptionHelper.getRootCause(ex); |
| assertThat(root, instanceOf(IllegalStateException.class)); |
| assertEquals("Client is closed", root.getMessage()); |
| } |
| |
| // allow call to close() even though closed through cluster |
| sessionlessOne.close(); |
| session.close(); |
| sessionlessTwo.close(); |
| |
| cluster.close(); |
| } |
| |
| @Test |
| public void shouldSendUserAgent() throws Exception { |
| final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHSON_V3D0).create(); |
| final Client client = Mockito.spy(cluster.connect().alias("g")); |
| client.submit("", RequestOptions.build().userAgent("test").create()).all().get(); |
| cluster.close(); |
| |
| final ArgumentCaptor<RequestMessage> requestMessageCaptor = ArgumentCaptor.forClass(RequestMessage.class); |
| verify(client).submitAsync(requestMessageCaptor.capture()); |
| final RequestMessage requestMessage = requestMessageCaptor.getValue(); |
| assertEquals("test", requestMessage.getArgs().get(Tokens.ARGS_USER_AGENT)); |
| } |
| |
| @Test |
| public void shouldSendUserAgentBytecode() { |
| final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHSON_V3D0).create(); |
| final Client client = Mockito.spy(cluster.connect().alias("g")); |
| Mockito.when(client.alias("g")).thenReturn(client); |
| final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(client)); |
| g.with(Tokens.ARGS_USER_AGENT, "test").V().iterate(); |
| cluster.close(); |
| |
| final ArgumentCaptor<RequestOptions> requestOptionsCaptor = ArgumentCaptor.forClass(RequestOptions.class); |
| verify(client).submitAsync(Mockito.any(Bytecode.class), requestOptionsCaptor.capture()); |
| final RequestOptions requestOptions = requestOptionsCaptor.getValue(); |
| assertEquals("test", requestOptions.getUserAgent().get()); |
| |
| final ArgumentCaptor<RequestMessage> requestMessageCaptor = ArgumentCaptor.forClass(RequestMessage.class); |
| verify(client).submitAsync(requestMessageCaptor.capture()); |
| final RequestMessage requestMessage = requestMessageCaptor.getValue(); |
| assertEquals("test", requestMessage.getArgs().getOrDefault(Tokens.ARGS_USER_AGENT, null)); |
| } |
| |
| @Test |
| public void shouldSendRequestIdBytecode() { |
| final UUID overrideRequestId = UUID.randomUUID(); |
| final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHSON_V3D0).create(); |
| final Client client = Mockito.spy(cluster.connect().alias("g")); |
| Mockito.when(client.alias("g")).thenReturn(client); |
| final GraphTraversalSource g = traversal().withRemote(DriverRemoteConnection.using(client)); |
| g.with(Tokens.REQUEST_ID, overrideRequestId).V().iterate(); |
| cluster.close(); |
| |
| final ArgumentCaptor<RequestOptions> requestOptionsCaptor = ArgumentCaptor.forClass(RequestOptions.class); |
| verify(client).submitAsync(Mockito.any(Bytecode.class), requestOptionsCaptor.capture()); |
| final RequestOptions requestOptions = requestOptionsCaptor.getValue(); |
| assertTrue(requestOptions.getOverrideRequestId().isPresent()); |
| assertEquals(overrideRequestId, requestOptions.getOverrideRequestId().get()); |
| |
| final ArgumentCaptor<RequestMessage> requestMessageCaptor = ArgumentCaptor.forClass(RequestMessage.class); |
| verify(client).submitAsync(requestMessageCaptor.capture()); |
| final RequestMessage requestMessage = requestMessageCaptor.getValue(); |
| assertEquals(overrideRequestId, requestMessage.getRequestId()); |
| } |
| |
| @Test |
| public void shouldClusterReadFileFromResources() throws Exception { |
| final Cluster cluster = Cluster.open(TestClientFactory.RESOURCE_PATH); |
| assertNotNull(cluster); |
| cluster.close(); |
| } |
| |
| @Test |
| public void shouldNotHangWhenSameRequestIdIsUsed() throws Exception { |
| final Cluster cluster = TestClientFactory.build().maxConnectionPoolSize(1).minConnectionPoolSize(1).create(); |
| final Client client = cluster.connect(); |
| final UUID requestId = UUID.randomUUID(); |
| |
| final Future<ResultSet> result1 = client.submitAsync("Thread.sleep(2000);100", |
| RequestOptions.build().overrideRequestId(requestId).create()); |
| |
| // wait for some business to happen on the server |
| Thread.sleep(100); |
| try { |
| // re-use the id and fail |
| client.submit("1+1+97", RequestOptions.build().overrideRequestId(requestId).create()); |
| fail("Request should not have been sent due to duplicate id"); |
| } catch(Exception ex) { |
| // should get a rejection here |
| final Throwable root = ExceptionHelper.getRootCause(ex); |
| assertThat(root.getMessage(), startsWith("There is already a request pending with an id of:")); |
| assertEquals(100, result1.get().one().getInt()); |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| /** |
| * Tests to make sure that the stack trace contains an informative cause when the request times out because the |
| * client was unable to get a connection and the pool is already maxed out. |
| */ |
| @Test |
| public void shouldReturnClearExceptionCauseWhenClientIsTooBusyAndConnectionPoolIsFull() throws InterruptedException { |
| final Cluster cluster = TestClientFactory.build() |
| .minConnectionPoolSize(1) |
| .maxConnectionPoolSize(1) |
| .connectionSetupTimeoutMillis(100) |
| .maxWaitForConnection(150) |
| .minInProcessPerConnection(0) |
| .maxInProcessPerConnection(1) |
| .minSimultaneousUsagePerConnection(0) |
| .maxSimultaneousUsagePerConnection(1) |
| .create(); |
| |
| final Client.ClusteredClient client = cluster.connect(); |
| |
| for (int i = 0; i < 3; i++) { |
| try { |
| client.submitAsync("Thread.sleep(5000);"); |
| } catch (Exception e) { |
| final Throwable root = ExceptionHelper.getRootCause(e); |
| assertTrue(root instanceof TimeoutException); |
| assertTrue(root.getMessage().contains(Client.TOO_MANY_IN_FLIGHT_REQUESTS)); |
| } |
| } |
| |
| cluster.close(); |
| } |
| |
| /** |
| * Client created on an initially dead host should fail initially, and recover after the dead host has restarted |
| * @param testClusterClient - boolean flag set to test clustered client if true and sessioned client if false. |
| */ |
| private void testShouldFailOnInitiallyDeadHost(final boolean testClusterClient) throws Exception { |
| logger.info("Stopping server."); |
| this.stopServer(); |
| |
| final Cluster cluster = TestClientFactory.build().create(); |
| final Client client = testClusterClient? cluster.connect() : cluster.connect("sessionClient"); |
| |
| try { |
| // try to re-issue a request now that the server is down |
| logger.info("Verifying driver cannot connect to server."); |
| client.submit("g").all().get(500, TimeUnit.MILLISECONDS); |
| fail("Should throw an exception."); |
| } catch (Exception re) { |
| // Client would have no active connections to the host, hence it would encounter a timeout |
| // trying to find an alive connection to the host. |
| assertThat(re, instanceOf(NoHostAvailableException.class)); |
| |
| try { |
| client.submit("1+1").all().get(3000, TimeUnit.MILLISECONDS); |
| fail("Should throw exception on the retry"); |
| } catch (RuntimeException re2) { |
| if (client instanceof Client.SessionedClient) { |
| assertThat(re2.getCause().getCause(), instanceOf(ConnectionException.class)); |
| } else { |
| assertThat(re2.getCause().getCause().getCause(), instanceOf(ConnectException.class)); |
| } |
| } |
| |
| // |
| // should recover when the server comes back |
| // |
| |
| // restart server |
| logger.info("Restarting server."); |
| this.startServer(); |
| |
| // try a bunch of times to reconnect. on slower systems this may simply take longer...looking at you travis |
| for (int ix = 1; ix < 11; ix++) { |
| // the retry interval is 1 second, wait a bit longer |
| TimeUnit.MILLISECONDS.sleep(1250); |
| |
| try { |
| logger.info(String.format("Connecting driver to server - attempt # %s. ", 1 + ix)); |
| final List<Result> results = client.submit("1+1").all().get(3000, TimeUnit.MILLISECONDS); |
| assertEquals(1, results.size()); |
| assertEquals(2, results.get(0).getInt()); |
| logger.info("Connection successful."); |
| break; |
| } catch (Exception ex) { |
| if (ix == 10) |
| fail("Should have eventually succeeded"); |
| } |
| } |
| } finally { |
| cluster.close(); |
| } |
| } |
| |
| @Test |
| public void shouldFailOnInitiallyDeadHostForClusterClient() throws Exception { |
| testShouldFailOnInitiallyDeadHost(true); |
| } |
| |
| @Test |
| public void shouldFailOnInitiallyDeadHostForSessionClient() throws Exception { |
| testShouldFailOnInitiallyDeadHost(false); |
| } |
| } |