/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.server;
import java.io.File;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.log4j.Logger;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
import org.apache.tinkerpop.gremlin.driver.ResultSet;
import org.apache.tinkerpop.gremlin.driver.Tokens;
import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
import org.apache.tinkerpop.gremlin.driver.simple.NioClient;
import org.apache.tinkerpop.gremlin.driver.simple.SimpleClient;
import org.apache.tinkerpop.gremlin.driver.simple.WebSocketClient;
import org.apache.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngine;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.server.channel.NioChannelizer;
import org.apache.tinkerpop.gremlin.server.op.session.SessionOpProcessor;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;
import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.*;
import static org.junit.Assume.assumeThat;
/**
* Integration tests for server-side settings and processing.
*
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegrationTest {
private Log4jRecordingAppender recordingAppender = null;
@Before
public void setupForEachTest() {
recordingAppender = new Log4jRecordingAppender();
final Logger rootLogger = Logger.getRootLogger();
rootLogger.addAppender(recordingAppender);
}
@After
public void teardownForEachTest() {
final Logger rootLogger = Logger.getRootLogger();
rootLogger.removeAppender(recordingAppender);
}
/**
* Configure specific Gremlin Server settings for specific tests.
*/
@Override
public Settings overrideSettings(final Settings settings) {
final String nameOfTest = name.getMethodName();
switch (nameOfTest) {
case "shouldRespectHighWaterMarkSettingAndSucceed":
settings.writeBufferHighWaterMark = 64;
settings.writeBufferLowWaterMark = 32;
break;
case "shouldReceiveFailureTimeOutOnScriptEval":
settings.scriptEvaluationTimeout = 200;
break;
case "shouldReceiveFailureTimeOutOnTotalSerialization":
settings.serializedResponseTimeout = 1;
break;
case "shouldBlockRequestWhenTooBig":
settings.maxContentLength = 1024;
break;
case "shouldBatchResultsByTwos":
settings.resultIterationBatchSize = 2;
break;
case "shouldWorkOverNioTransport":
settings.channelizer = NioChannelizer.class.getName();
break;
case "shouldEnableSsl":
case "shouldEnableSslButFailIfClientConnectsWithoutIt":
settings.ssl = new Settings.SslSettings();
settings.ssl.enabled = true;
break;
case "shouldStartWithDefaultSettings":
return new Settings();
case "shouldHaveTheSessionTimeout":
settings.processors.clear();
final Settings.ProcessorSettings processorSettings = new Settings.ProcessorSettings();
processorSettings.className = SessionOpProcessor.class.getCanonicalName();
processorSettings.config = new HashMap<>();
processorSettings.config.put(SessionOpProcessor.CONFIG_SESSION_TIMEOUT, 3000L);
settings.processors.add(processorSettings);
break;
case "shouldExecuteInSessionAndSessionlessWithoutOpeningTransactionWithSingleClient":
deleteDirectory(new File("/tmp/neo4j"));
settings.graphs.put("graph", "conf/neo4j-empty.properties");
break;
}
return settings;
}
@Test
public void shouldStartWithDefaultSettings() {
// just quickly validate that results are returned given the default settings. no graphs are configured by
// default, so just evaluate a groovy script.
final Cluster cluster = Cluster.open();
final Client client = cluster.connect();
final ResultSet results = client.submit("[1,2,3,4,5,6,7,8,9]");
final AtomicInteger counter = new AtomicInteger(0);
results.stream().map(i -> i.get(Integer.class) * 2).forEach(i -> assertEquals(counter.incrementAndGet() * 2, Integer.parseInt(i.toString())));
cluster.close();
}
@Test
public void shouldEnableSsl() {
final Cluster cluster = Cluster.build().enableSsl(true).create();
final Client client = cluster.connect();
try {
// this should return "nothing" - there should be no exception
assertEquals("test", client.submit("'test'").one().getString());
} finally {
cluster.close();
}
}
@org.junit.Ignore("This test hangs - not sure why")
@Test
public void shouldEnableSslButFailIfClientConnectsWithoutIt() {
// todo: need to get this to pass somehow - should just error out.
final Cluster cluster = Cluster.build().enableSsl(false).create();
final Client client = cluster.connect();
try {
// this should return "nothing" - there should be no exception
assertEquals("test", client.submit("'test'").one().getString());
} catch(Exception x) {
x.printStackTrace();
} finally {
cluster.close();
}
}
@Test
public void shouldRespectHighWaterMarkSettingAndSucceed() throws Exception {
// the highwatermark should get exceeded on the server and thus pause the writes, but it should have no problem
// catching itself up - this is a tricky test to get passing on all environments so this assumption will skip
// the test in most cases
assumeThat("Set the 'assertNonDeterministic' property to true to execute this test",
System.getProperty("assertNonDeterministic"), is("true"));
final Cluster cluster = Cluster.open();
final Client client = cluster.connect();
try {
final int resultCountToGenerate = 1000;
final int batchSize = 3;
final String fatty = IntStream.range(0, 175).mapToObj(String::valueOf).collect(Collectors.joining());
final String fattyX = "['" + fatty + "'] * " + resultCountToGenerate;
// don't allow the thread to proceed until all results are accounted for
final CountDownLatch latch = new CountDownLatch(resultCountToGenerate);
final AtomicBoolean expected = new AtomicBoolean(false);
final AtomicBoolean faulty = new AtomicBoolean(false);
final RequestMessage request = RequestMessage.build(Tokens.OPS_EVAL)
.addArg(Tokens.ARGS_BATCH_SIZE, batchSize)
.addArg(Tokens.ARGS_GREMLIN, fattyX).create();
client.submitAsync(request).thenAcceptAsync(r -> {
r.stream().forEach(item -> {
try {
final String aFattyResult = item.getString();
expected.set(aFattyResult.equals(fatty));
} catch (Exception ex) {
ex.printStackTrace();
faulty.set(true);
} finally {
latch.countDown();
}
});
});
assertTrue(latch.await(30000, TimeUnit.MILLISECONDS));
assertEquals(0, latch.getCount());
assertFalse(faulty.get());
assertTrue(expected.get());
assertTrue(recordingAppender.getMessages().stream().anyMatch(m -> m.contains("Pausing response writing as writeBufferHighWaterMark exceeded on")));
} catch (Exception ex) {
fail("Shouldn't have tossed an exception");
} finally {
cluster.close();
}
}
@Test
public void shouldReturnInvalidRequestArgsWhenGremlinArgIsNotSupplied() throws Exception {
try (SimpleClient client = new WebSocketClient()) {
final RequestMessage request = RequestMessage.build(Tokens.OPS_EVAL).create();
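// an eval op without the gremlin argument should be rejected with an invalid request arguments error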
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean pass = new AtomicBoolean(false);
client.submit(request, result -> {
if (result.getStatus().getCode() != ResponseStatusCode.PARTIAL_CONTENT) {
pass.set(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS == result.getStatus().getCode());
latch.countDown();
}
});
if (!latch.await(300, TimeUnit.MILLISECONDS))
fail("Request should have returned error, but instead timed out");
assertTrue(pass.get());
}
}
@Test
public void shouldReturnInvalidRequestArgsWhenInvalidBindingKeyIsUsed() throws Exception {
try (SimpleClient client = new WebSocketClient()) {
final Map<String, Object> bindings = new HashMap<>();
bindings.put(T.id.getAccessor(), "123");
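// the T.id accessor is presumably a reserved token and therefore an invalid binding key, which the server should reject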
final RequestMessage request = RequestMessage.build(Tokens.OPS_EVAL)
.addArg(Tokens.ARGS_GREMLIN, "[1,2,3,4,5,6,7,8,9,0]")
.addArg(Tokens.ARGS_BINDINGS, bindings).create();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean pass = new AtomicBoolean(false);
client.submit(request, result -> {
if (result.getStatus().getCode() != ResponseStatusCode.PARTIAL_CONTENT) {
pass.set(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS == result.getStatus().getCode());
latch.countDown();
}
});
if (!latch.await(3000, TimeUnit.MILLISECONDS))
fail("Request should have returned error, but instead timed out");
assertTrue(pass.get());
}
}
@Test
public void shouldBatchResultsByTwos() throws Exception {
try (SimpleClient client = new WebSocketClient()) {
final RequestMessage request = RequestMessage.build(Tokens.OPS_EVAL)
.addArg(Tokens.ARGS_GREMLIN, "[1,2,3,4,5,6,7,8,9,0]").create();
// resultIterationBatchSize is overridden to 2 for this test, so ten results yield five response messages
final CountDownLatch latch = new CountDownLatch(5);
client.submit(request, r -> latch.countDown());
assertTrue(latch.await(1500, TimeUnit.MILLISECONDS));
}
}
@Test
public void shouldBatchResultsByOnesByOverridingFromClientSide() throws Exception {
try (SimpleClient client = new WebSocketClient()) {
final RequestMessage request = RequestMessage.build(Tokens.OPS_EVAL)
.addArg(Tokens.ARGS_GREMLIN, "[1,2,3,4,5,6,7,8,9,0]")
.addArg(Tokens.ARGS_BATCH_SIZE, 1).create();
// a batch size of 1 is forced from the client side, so ten results yield ten response messages
final CountDownLatch latch = new CountDownLatch(10);
client.submit(request, r -> latch.countDown());
assertTrue(latch.await(1500, TimeUnit.MILLISECONDS));
}
}
@Test
public void shouldWorkOverNioTransport() throws Exception {
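// overrideSettings switches the channelizer to NioChannelizer for this test, hence the NioClient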
try (SimpleClient client = new NioClient()) {
final RequestMessage request = RequestMessage.build(Tokens.OPS_EVAL)
.addArg(Tokens.ARGS_GREMLIN, "[1,2,3,4,5,6,7,8,9,0]").create();
// the latch only needs a single response to confirm the round trip over the nio transport
final CountDownLatch latch = new CountDownLatch(1);
client.submit(request, r -> latch.countDown());
assertTrue(latch.await(30000, TimeUnit.MILLISECONDS));
}
}
@Test
public void shouldNotThrowNoSuchElementException() throws Exception {
final Cluster cluster = Cluster.open();
final Client client = cluster.connect();
try {
// this should return "nothing" - there should be no exception
assertNull(client.submit("g.V().has('name','kadfjaldjfla')").one());
} catch (Exception ex) {
ex.printStackTrace();
} finally {
cluster.close();
}
}
@Test
public void shouldReceiveFailureTimeOutOnScriptEval() throws Exception {
final Cluster cluster = Cluster.open();
final Client client = cluster.connect();
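// scriptEvaluationTimeout is overridden to 200ms for this test, so the 3000ms sleep should exceed it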
try {
client.submit("Thread.sleep(3000);'some-stuff-that-should not return'").all().join();
fail("Should throw an exception.");
} catch (RuntimeException re) {
assertTrue(ExceptionUtils.getRootCause(re).getMessage().startsWith("Script evaluation exceeded the configured threshold of 200 ms for request"));
} finally {
cluster.close();
}
}
@Test
public void shouldReceiveFailureTimeOutOnTotalSerialization() throws Exception {
final Cluster cluster = Cluster.open();
final Client client = cluster.connect();
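// serializedResponseTimeout is overridden to 1ms for this test, so serializing 100000 results should exceed it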
try {
client.submit("(0..<100000)").all().join();
fail("Should throw an exception.");
} catch (RuntimeException re) {
assertTrue(re.getCause().getMessage().endsWith("Serialization of the entire response exceeded the serializeResponseTimeout setting"));
} finally {
cluster.close();
}
}
@Test
public void shouldLoadInitScript() throws Exception {
final Cluster cluster = Cluster.open();
final Client client = cluster.connect();
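// addItUp is presumably defined by the init script configured for the test server, so a successful call shows the script loaded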
try {
assertEquals(2, client.submit("addItUp(1,1)").all().join().get(0).getInt());
} finally {
cluster.close();
}
}
@Test
public void shouldGarbageCollectPhantomButNotHard() throws Exception {
final Cluster cluster = Cluster.open();
final Client client = cluster.connect();
assertEquals(2, client.submit("addItUp(1,1)").all().join().get(0).getInt());
assertEquals(0, client.submit("def subtract(x,y){x-y};subtract(1,1)").all().join().get(0).getInt());
assertEquals(0, client.submit("subtract(1,1)").all().join().get(0).getInt());
final Map<String, Object> bindings = new HashMap<>();
bindings.put(GremlinGroovyScriptEngine.KEY_REFERENCE_TYPE, GremlinGroovyScriptEngine.REFERENCE_TYPE_PHANTOM);
assertEquals(4, client.submit("def multiply(x,y){x*y};multiply(2,2)", bindings).all().join().get(0).getInt());
try {
client.submit("multiply(2,2)").all().join().get(0).getInt();
fail("Should throw an exception since reference is phantom.");
} catch (RuntimeException ignored) {
} finally {
cluster.close();
}
}
@Test
public void shouldReceiveFailureOnBadGraphSONSerialization() throws Exception {
final Cluster cluster = Cluster.build("localhost").serializer(Serializers.GRAPHSON_V1D0).create();
final Client client = cluster.connect();
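// a self-referencing object cannot be serialized to GraphSON, so the server should report a serialization error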
try {
client.submit("def class C { def C getC(){return this}}; new C()").all().join();
fail("Should throw an exception.");
} catch (RuntimeException re) {
assertTrue(re.getCause().getCause().getMessage().startsWith("Error during serialization: Direct self-reference leading to cycle (through reference chain:"));
} finally {
cluster.close();
}
}
@Test
public void shouldReceiveFailureOnBadGryoSerialization() throws Exception {
final Cluster cluster = Cluster.build("localhost").serializer(Serializers.GRYO_V1D0).create();
final Client client = cluster.connect();
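// java.awt.Color is not registered with the Gryo serializer, so serializing the result should fail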
try {
client.submit("java.awt.Color.RED").all().join();
fail("Should throw an exception.");
} catch (RuntimeException re) {
assertTrue(re.getCause().getCause().getMessage().startsWith("Error during serialization: Class is not registered: java.awt.Color"));
} finally {
cluster.close();
}
}
@Test
public void shouldBlockRequestWhenTooBig() throws Exception {
final Cluster cluster = Cluster.open();
final Client client = cluster.connect();
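// maxContentLength is overridden to 1024 for this test, so this oversized request should be rejected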
try {
final String fatty = IntStream.range(0, 1024).mapToObj(String::valueOf).collect(Collectors.joining());
final CompletableFuture<ResultSet> result = client.submitAsync("'" + fatty + "';'test'");
final ResultSet resultSet = result.get(10000, TimeUnit.MILLISECONDS);
resultSet.all().get(10000, TimeUnit.MILLISECONDS);
fail("Should throw an exception.");
} catch (TimeoutException te) {
// the request should not have timed out - the connection should have been reset, but a timeout also
// occurs on some systems (it's not clear why). however, the nature of this test is to ensure that the
// script isn't processed if it exceeds a certain size, so in this sense it seems ok to pass in this case.
} catch (Exception re) {
final Throwable root = ExceptionUtils.getRootCause(re);
assertEquals("Connection reset by peer", root.getMessage());
} finally {
cluster.close();
}
}
@Test
public void shouldFailOnDeadHost() throws Exception {
final Cluster cluster = Cluster.build("localhost").create();
final Client client = cluster.connect();
// ensure that connection to server is good
assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
// kill the server which will make the client mark the host as unavailable
this.stopServer();
try {
// try to re-issue a request now that the server is down
client.submit("1+1").all().join();
fail();
} catch (RuntimeException re) {
assertTrue(re.getCause().getCause() instanceof ClosedChannelException);
} finally {
cluster.close();
}
}
@Test
public void shouldHaveTheSessionTimeout() throws Exception {
final Cluster cluster = Cluster.build().create();
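// connecting with the test name creates a named session whose timeout is overridden to 3000ms via overrideSettings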
final Client client = cluster.connect(name.getMethodName());
final ResultSet results1 = client.submit("x = [1,2,3,4,5,6,7,8,9]");
final AtomicInteger counter = new AtomicInteger(0);
results1.stream().map(i -> i.get(Integer.class) * 2).forEach(i -> assertEquals(counter.incrementAndGet() * 2, Integer.parseInt(i.toString())));
final ResultSet results2 = client.submit("x[0]+1");
assertEquals(2, results2.all().get().get(0).getInt());
// session times out in 3 seconds
Thread.sleep(3500);
try {
client.submit("x[1]+2").all().get();
fail("Session should be dead");
} catch (Exception ex) {
final Exception cause = (Exception) ex.getCause().getCause();
assertTrue(cause instanceof ResponseException);
assertEquals(ResponseStatusCode.SERVER_ERROR_SCRIPT_EVALUATION, ((ResponseException) cause).getResponseStatusCode());
} finally {
cluster.close();
}
}
@Test
public void shouldEvalAndReturnSuccessOnlyNoPartialContent() throws Exception {
try (SimpleClient client = new WebSocketClient()) {
final RequestMessage request = RequestMessage.build(Tokens.OPS_EVAL)
.addArg(Tokens.ARGS_GREMLIN, "10").create();
// the latch is set to one as a single success response carrying the full result is expected -
// no partial content messages
final AtomicInteger messages = new AtomicInteger(0);
final AtomicBoolean successReceived = new AtomicBoolean(false);
client.submit(request, r -> {
successReceived.set(r.getStatus().getCode() == ResponseStatusCode.SUCCESS);
latch.countDown();
messages.incrementAndGet();
});
assertTrue(latch.await(1500, TimeUnit.MILLISECONDS));
// make sure no extra messages sneak in
Thread.sleep(1000);
assertEquals(1, messages.get());
assertTrue(successReceived.get());
}
}
@Test
public void shouldFailWithBadScriptEval() throws Exception {
try (SimpleClient client = new WebSocketClient()) {
final RequestMessage request = RequestMessage.build(Tokens.OPS_EVAL)
.addArg(Tokens.ARGS_GREMLIN, "new String().doNothingAtAllBecauseThis is a syntax error").create();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger messages = new AtomicInteger(0);
final AtomicBoolean errorReceived = new AtomicBoolean(false);
client.submit(request, r -> {
errorReceived.set(r.getStatus().getCode() == ResponseStatusCode.SERVER_ERROR_SCRIPT_EVALUATION);
latch.countDown();
messages.incrementAndGet();
});
assertTrue(latch.await(1500, TimeUnit.MILLISECONDS));
// make sure no extra messages sneak in
Thread.sleep(1000);
assertEquals(1, messages.get());
assertTrue(errorReceived.get());
}
}
// todo: get this test to pass - count connections and block incoming requests.
@Test
@org.junit.Ignore
public void shouldBlockWhenMaxConnectionsExceeded() throws Exception {
final Cluster cluster = Cluster.open();
final Client client = cluster.connect();
try {
final CompletableFuture<ResultSet> result = client.submitAsync("Thread.sleep(500);'test'");
try {
// this request should get blocked by the server
client.submitAsync("'test-blocked'").join().one();
fail("Request should fail because max connections are exceeded");
} catch (Exception ex) {
// expected - the request should be blocked once max connections are exceeded
ex.printStackTrace();
}
assertEquals("test", result.get().one().getString());
} catch (Exception re) {
fail("Should not have an exception here");
} finally {
cluster.close();
}
}
@Test
public void shouldExecuteInSessionAndSessionlessWithoutOpeningTransactionWithSingleClient() throws Exception {
assumeNeo4jIsPresent();
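// run a session-bound transaction (open, add a vertex, commit), verify the transaction closed, then issue a
// sessionless read and verify it leaves the transaction closed as well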
final SimpleClient client = new WebSocketClient();
// open a transaction, create a vertex, then commit
final CountDownLatch latch = new CountDownLatch(1);
final RequestMessage openRequest = RequestMessage.build(Tokens.OPS_EVAL)
.processor("session")
.addArg(Tokens.ARGS_SESSION, name.getMethodName())
.addArg(Tokens.ARGS_GREMLIN, "graph.tx().open()")
.create();
client.submit(openRequest, (r) -> {
latch.countDown();
});
assertTrue(latch.await(1500, TimeUnit.MILLISECONDS));
final CountDownLatch latch2 = new CountDownLatch(1);
final RequestMessage addRequest = RequestMessage.build(Tokens.OPS_EVAL)
.processor("session")
.addArg(Tokens.ARGS_SESSION, name.getMethodName())
.addArg(Tokens.ARGS_GREMLIN, "v=graph.addVertex(\"name\",\"stephen\")")
.create();
client.submit(addRequest, (r) -> {
latch2.countDown();
});
assertTrue(latch2.await(1500, TimeUnit.MILLISECONDS));
final CountDownLatch latch3 = new CountDownLatch(1);
final RequestMessage commitRequest = RequestMessage.build(Tokens.OPS_EVAL)
.processor("session")
.addArg(Tokens.ARGS_SESSION, name.getMethodName())
.addArg(Tokens.ARGS_GREMLIN, "graph.tx().commit()")
.create();
client.submit(commitRequest, (r) -> {
latch3.countDown();
});
assertTrue(latch3.await(1500, TimeUnit.MILLISECONDS));
// Check to see if the transaction is closed.
final CountDownLatch latch4 = new CountDownLatch(1);
final AtomicBoolean isOpen = new AtomicBoolean(false);
final RequestMessage checkRequest = RequestMessage.build(Tokens.OPS_EVAL)
.processor("session")
.addArg(Tokens.ARGS_SESSION, name.getMethodName())
.addArg(Tokens.ARGS_GREMLIN, "graph.tx().isOpen()")
.create();
client.submit(checkRequest, (r) -> {
final ArrayList<Boolean> response = (ArrayList<Boolean>) r.getResult().getData();
isOpen.set(response.get(0));
latch4.countDown();
});
assertTrue(latch4.await(1500, TimeUnit.MILLISECONDS));
// make sure no extra messages sneak in
Thread.sleep(1000);
assertFalse("Transaction should be closed", isOpen.get());
// let's run a sessionless read
final CountDownLatch latch5 = new CountDownLatch(1);
final RequestMessage sessionlessRequest = RequestMessage.build(Tokens.OPS_EVAL)
.addArg(Tokens.ARGS_GREMLIN, "graph.traversal().V()")
.create();
client.submit(sessionlessRequest, (r) -> {
latch5.countDown();
});
assertTrue(latch5.await(1500, TimeUnit.MILLISECONDS));
// Check to see if the transaction is still closed.
final CountDownLatch latch6 = new CountDownLatch(1);
final AtomicBoolean isStillOpen = new AtomicBoolean(false);
final RequestMessage checkAgainRequest = RequestMessage.build(Tokens.OPS_EVAL)
.processor("session")
.addArg(Tokens.ARGS_SESSION, name.getMethodName())
.addArg(Tokens.ARGS_GREMLIN, "graph.tx().isOpen()")
.create();
client.submit(checkAgainRequest, (r) -> {
final ArrayList<Boolean> response = (ArrayList<Boolean>) r.getResult().getData();
isStillOpen.set(response.get(0));
latch6.countDown();
});
assertTrue(latch6.await(1500, TimeUnit.MILLISECONDS));
// make sure no extra messages sneak in
Thread.sleep(1000);
assertFalse("Transaction should still be closed", isStillOpen.get());
client.close();
}
@Test
public void shouldStillSupportDeprecatedRebindingsParameterOnServer() throws Exception {
// this test can be removed when the rebindings arg is removed
try (SimpleClient client = new WebSocketClient()) {
final Map<String,String> rebindings = new HashMap<>();
rebindings.put("xyz", "graph");
final RequestMessage request = RequestMessage.build(Tokens.OPS_EVAL)
.addArg(Tokens.ARGS_GREMLIN, "xyz.addVertex('name','jason')")
.addArg(Tokens.ARGS_REBINDINGS, rebindings).create();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean pass = new AtomicBoolean(false);
client.submit(request, result -> {
final List<Object> results = (List<Object>) result.getResult().getData();
final DetachedVertex v = (DetachedVertex) results.get(0);
pass.set(ResponseStatusCode.SUCCESS == result.getStatus().getCode() && v.value("name").equals("jason"));
latch.countDown();
});
if (!latch.await(300, TimeUnit.MILLISECONDS)) fail("Request should have returned a response");
assertTrue(pass.get());
}
}
}