/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.tinkerpop.gremlin.server;

import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.tinkerpop.gremlin.TestHelper;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
import org.apache.tinkerpop.gremlin.driver.Result;
import org.apache.tinkerpop.gremlin.driver.ResultSet;
import org.apache.tinkerpop.gremlin.driver.Tokens;
import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection;
import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteTraversalSideEffects;
import org.apache.tinkerpop.gremlin.driver.ser.Serializers;
import org.apache.tinkerpop.gremlin.driver.simple.SimpleClient;
import org.apache.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngine;
import org.apache.tinkerpop.gremlin.groovy.jsr223.GroovyCompilerGremlinPlugin;
import org.apache.tinkerpop.gremlin.groovy.jsr223.customizer.SimpleSandboxExtension;
import org.apache.tinkerpop.gremlin.jsr223.ScriptFileGremlinPlugin;
import org.apache.tinkerpop.gremlin.structure.RemoteGraph;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
import org.apache.tinkerpop.gremlin.server.handler.OpSelectorHandler;
import org.apache.tinkerpop.gremlin.server.op.AbstractEvalOpProcessor;
import org.apache.tinkerpop.gremlin.server.op.standard.StandardOpProcessor;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.T;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
import org.apache.tinkerpop.gremlin.util.function.Lambda;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.tinkerpop.gremlin.driver.Tokens.ARGS_SCRIPT_EVAL_TIMEOUT;
import static org.apache.tinkerpop.gremlin.groovy.jsr223.GroovyCompilerGremlinPlugin.Compilation.COMPILE_STATIC;
import static org.apache.tinkerpop.gremlin.process.remote.RemoteConnection.GREMLIN_REMOTE;
import static org.apache.tinkerpop.gremlin.process.remote.RemoteConnection.GREMLIN_REMOTE_CONNECTION_CLASS;
import static org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource.traversal;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.hamcrest.core.IsNot.not;
import static org.hamcrest.core.StringStartsWith.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
 * Integration tests for server-side settings and processing.
 *
 * @author Stephen Mallette (http://stephen.genoprime.com)
 */
public class GremlinServerIntegrateTest extends AbstractGremlinServerIntegrationTest {
    private Level previousLogLevel;

    private Log4jRecordingAppender recordingAppender = null;
    private final Supplier<Graph> graphGetter = () -> server.getServerGremlinExecutor().getGraphManager().getGraph("graph");
    private final Configuration conf = new BaseConfiguration() {{
        setProperty(Graph.GRAPH, RemoteGraph.class.getName());
        setProperty(GREMLIN_REMOTE_CONNECTION_CLASS, DriverRemoteConnection.class.getName());
        setProperty(DriverRemoteConnection.GREMLIN_REMOTE_DRIVER_SOURCENAME, "g");
        setProperty(GREMLIN_REMOTE + "attachment", graphGetter);
        setProperty("clusterConfiguration.port", TestClientFactory.PORT);
        setProperty("clusterConfiguration.hosts", "localhost");
    }};

    @Before
    public void setupForEachTest() {
        recordingAppender = new Log4jRecordingAppender();
        final Logger rootLogger = Logger.getRootLogger();

        if (name.getMethodName().equals("shouldPingChannelIfClientDies") ||
                name.getMethodName().equals("shouldCloseChannelIfClientDoesntRespond")) {
            final org.apache.log4j.Logger webSocketClientHandlerLogger = org.apache.log4j.Logger.getLogger(OpSelectorHandler.class);
            previousLogLevel = webSocketClientHandlerLogger.getLevel();
            webSocketClientHandlerLogger.setLevel(Level.INFO);
        }

        rootLogger.addAppender(recordingAppender);
    }

    @After
    public void teardownForEachTest() {
        final Logger rootLogger = Logger.getRootLogger();

        if (name.getMethodName().equals("shouldPingChannelIfClientDies")||
                name.getMethodName().equals("shouldCloseChannelIfClientDoesntRespond")) {
            final org.apache.log4j.Logger webSocketClientHandlerLogger = org.apache.log4j.Logger.getLogger(OpSelectorHandler.class);
            webSocketClientHandlerLogger.setLevel(previousLogLevel);
        }

        rootLogger.removeAppender(recordingAppender);
    }

    /**
     * Configure specific Gremlin Server settings for specific tests.
     */
    @Override
    public Settings overrideSettings(final Settings settings) {
        final String nameOfTest = name.getMethodName();
        switch (nameOfTest) {
            case "shouldProvideBetterExceptionForMethodCodeTooLarge":
                settings.maxContentLength = 4096000;
                final Settings.ProcessorSettings processorSettingsBig = new Settings.ProcessorSettings();
                processorSettingsBig.className = StandardOpProcessor.class.getName();
                processorSettingsBig.config = new HashMap<String,Object>() {{
                    put(AbstractEvalOpProcessor.CONFIG_MAX_PARAMETERS, Integer.MAX_VALUE);
                }};
                settings.processors.clear();
                settings.processors.add(processorSettingsBig);
                break;
            case "shouldRespectHighWaterMarkSettingAndSucceed":
                settings.writeBufferHighWaterMark = 64;
                settings.writeBufferLowWaterMark = 32;
                break;
            case "shouldReceiveFailureTimeOutOnScriptEval":
                settings.evaluationTimeout = 1000;
                break;
            case "shouldBlockRequestWhenTooBig":
                settings.maxContentLength = 1024;
                break;
            case "shouldBatchResultsByTwos":
                settings.resultIterationBatchSize = 2;
                break;
            case "shouldUseSimpleSandbox":
                settings.scriptEngines.get("gremlin-groovy").plugins.put(GroovyCompilerGremlinPlugin.class.getName(), getScriptEngineConfForSimpleSandbox());
                // remove the script because it isn't used in the test but also because it's not CompileStatic ready
                settings.scriptEngines.get("gremlin-groovy").plugins.remove(ScriptFileGremlinPlugin.class.getName());
                break;
            case "shouldUseInterpreterMode":
                settings.scriptEngines.get("gremlin-groovy").plugins.put(GroovyCompilerGremlinPlugin.class.getName(), getScriptEngineConfForInterpreterMode());
                break;
            case "shouldReceiveFailureTimeOutOnScriptEvalOfOutOfControlLoop":
                settings.scriptEngines.get("gremlin-groovy").plugins.put(GroovyCompilerGremlinPlugin.class.getName(), getScriptEngineConfForTimedInterrupt());
                break;
            case "shouldUseBaseScript":
                settings.scriptEngines.get("gremlin-groovy").plugins.put(GroovyCompilerGremlinPlugin.class.getName(), getScriptEngineConfForBaseScript());
                settings.scriptEngines.get("gremlin-groovy").config = getScriptEngineConfForBaseScript();
                break;
            case "shouldReturnInvalidRequestArgsWhenBindingCountExceedsAllowable":
                final Settings.ProcessorSettings processorSettingsSmall = new Settings.ProcessorSettings();
                processorSettingsSmall.className = StandardOpProcessor.class.getName();
                processorSettingsSmall.config = new HashMap<String,Object>() {{
                    put(AbstractEvalOpProcessor.CONFIG_MAX_PARAMETERS, 1);
                }};
                settings.processors.clear();
                settings.processors.add(processorSettingsSmall);
                break;
            case "shouldTimeOutRemoteTraversal":
                settings.evaluationTimeout = 500;
                break;
            case "shouldTimeOutRemoteTraversalUsingDeprecatedConfiguration":
                settings.scriptEvaluationTimeout = 500;
                break;
            case "shouldPingChannelIfClientDies":
                settings.keepAliveInterval = 1000;
                break;
            case "shouldCloseChannelIfClientDoesntRespond":
                settings.idleConnectionTimeout = 1000;
                break;
            default:
                break;
        }

        return settings;
    }

    private static Map<String, Object> getScriptEngineConfForSimpleSandbox() {
        final Map<String,Object> scriptEngineConf = new HashMap<>();
        scriptEngineConf.put("compilation", COMPILE_STATIC.name());
        scriptEngineConf.put("extensions", SimpleSandboxExtension.class.getName());
        return scriptEngineConf;
    }

    private static Map<String, Object> getScriptEngineConfForTimedInterrupt() {
        final Map<String,Object> scriptEngineConf = new HashMap<>();
        scriptEngineConf.put("timedInterrupt", 1000);
        return scriptEngineConf;
    }

    private static Map<String, Object> getScriptEngineConfForInterpreterMode() {
        final Map<String,Object> scriptEngineConf = new HashMap<>();
        scriptEngineConf.put("enableInterpreterMode", true);
        return scriptEngineConf;
    }

    private static Map<String, Object> getScriptEngineConfForBaseScript() {
        final Map<String,Object> scriptEngineConf = new HashMap<>();
        final Map<String,Object> properties = new HashMap<>();
        properties.put("ScriptBaseClass", BaseScriptForTesting.class.getName());
        scriptEngineConf.put("compilerConfigurationOptions", properties);
        return scriptEngineConf;
    }

    @Test
    public void shouldScriptEvaluationErrorForRemoteTraversal() throws Exception {
        final GraphTraversalSource g = traversal().withRemote(conf);

        try {
            // tests bad lambda
            g.inject(1).sideEffect(Lambda.consumer("(")).iterate();
            fail("This traversal should not have executed since lambda can't be compiled");
        } catch (Exception ex) {
            final Throwable t = ex.getCause();
            assertThat(t, instanceOf(ResponseException.class));
            assertEquals(ResponseStatusCode.SERVER_ERROR_SCRIPT_EVALUATION, ((ResponseException) t).getResponseStatusCode());
        }

        // make a graph with a cycle in it to force a long run traversal
        graphGetter.get().traversal().addV("person").as("p").addE("self").to("p").iterate();

        try {
            // tests an "unending" traversal
            g.V().repeat(__.out()).until(__.outE().count().is(0)).iterate();
            fail("This traversal should have timed out");
        } catch (Exception ex) {
            final Throwable t = ex.getCause();
            assertThat(t, instanceOf(ResponseException.class));
            assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, ((ResponseException) t).getResponseStatusCode());
        }
    }

    @Test
    public void shouldCloseChannelIfClientDoesntRespond() throws Exception {
        final SimpleClient client = TestClientFactory.createWebSocketClient();
        client.submit("1+1");

        // since we do nothing for 2 seconds and the time limit for timeout on the server is 1 second, the server
        // will autoclose the channel
        Thread.sleep(2000);

        assertThat(recordingAppender.logContainsAny(".*Closing channel - client is disconnected after idle period of .*$"), is(true));

        client.close();
    }

    @Test
    public void shouldPingChannelIfClientDies() throws Exception {
        final Client client = TestClientFactory.build().maxConnectionPoolSize(1).minConnectionPoolSize(1).keepAliveInterval(0).create().connect();
        client.submit("1+1").all().get();

        // since we do nothing for 3 seconds and the time limit for ping is 1 second we should get *about* 3 pings -
        // i don't think the assertion needs to be too accurate. just need to make sure there's a ping message out
        // there record
        Thread.sleep(3000);

        client.close();

        // stop the server to be sure that logs flush
        stopServer();

        assertThat(recordingAppender.logContainsAny(".*Checking channel - sending ping to client after idle period of .*$"), is(true));
    }

    @Test
    public void shouldTimeOutRemoteTraversal() throws Exception {
        final GraphTraversalSource g = traversal().withRemote(conf);

        try {
            // tests sleeping thread
            g.inject(1).sideEffect(Lambda.consumer("Thread.sleep(10000)")).iterate();
            fail("This traversal should have timed out");
        } catch (Exception ex) {
            final Throwable t = ex.getCause();
            assertThat(t, instanceOf(ResponseException.class));
            assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, ((ResponseException) t).getResponseStatusCode());
        }

        // make a graph with a cycle in it to force a long run traversal
        graphGetter.get().traversal().addV("person").as("p").addE("self").to("p").iterate();

        try {
            // tests an "unending" traversal
            g.V().repeat(__.out()).until(__.outE().count().is(0)).iterate();
            fail("This traversal should have timed out");
        } catch (Exception ex) {
            final Throwable t = ex.getCause();
            assertThat(t, instanceOf(ResponseException.class));
            assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, ((ResponseException) t).getResponseStatusCode());
        }
    }

    @Test
    public void shouldTimeOutRemoteTraversalUsingDeprecatedConfiguration() throws Exception {
        final GraphTraversalSource g = traversal().withRemote(conf);

        try {
            // tests sleeping thread
            g.inject(1).sideEffect(Lambda.consumer("Thread.sleep(10000)")).iterate();
            fail("This traversal should have timed out");
        } catch (Exception ex) {
            final Throwable t = ex.getCause();
            assertThat(t, instanceOf(ResponseException.class));
            assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, ((ResponseException) t).getResponseStatusCode());
        }

        // make a graph with a cycle in it to force a long run traversal
        graphGetter.get().traversal().addV("person").as("p").addE("self").to("p").iterate();

        try {
            // tests an "unending" traversal
            g.V().repeat(__.out()).until(__.outE().count().is(0)).iterate();
            fail("This traversal should have timed out");
        } catch (Exception ex) {
            final Throwable t = ex.getCause();
            assertThat(t, instanceOf(ResponseException.class));
            assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, ((ResponseException) t).getResponseStatusCode());
        }
    }

    @Test
    public void shouldTimeOutRemoteTraversalWithPerRequestOption() {
        final GraphTraversalSource g = traversal().withRemote(conf);

        try {
            // tests sleeping thread
            g.with(ARGS_SCRIPT_EVAL_TIMEOUT, 500L).inject(1).sideEffect(Lambda.consumer("Thread.sleep(10000)")).iterate();
            fail("This traversal should have timed out");
        } catch (Exception ex) {
            final Throwable t = ex.getCause();
            assertThat(t, instanceOf(ResponseException.class));
            assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, ((ResponseException) t).getResponseStatusCode());
        }

        // make a graph with a cycle in it to force a long run traversal
        graphGetter.get().traversal().addV("person").as("p").addE("self").to("p").iterate();

        try {
            // tests an "unending" traversal
            g.with(ARGS_SCRIPT_EVAL_TIMEOUT, 500L).V().repeat(__.out()).until(__.outE().count().is(0)).iterate();
            fail("This traversal should have timed out");
        } catch (Exception ex) {
            final Throwable t = ex.getCause();
            assertThat(t, instanceOf(ResponseException.class));
            assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, ((ResponseException) t).getResponseStatusCode());
        }
    }

    @Test
    public void shouldProduceProperExceptionOnTimeout() throws Exception {
        final Cluster cluster = TestClientFactory.open();
        final Client client = cluster.connect(name.getMethodName());

        boolean success = false;
        // Run a short test script a few times with progressively longer timeouts.
        // Each submissions should either succeed or fail with a timeout.
        // Note: the range of timeouts is intended to cover the case when the script finishes at about the
        // same time when the timeout occurs. In this situation either a timeout response or a successful
        // response is acceptable, however no other processing errors should occur.
        // Note: the timeout of 30 ms is generally sufficient for running a simple groovy script, so using longer
        // timeouts are not likely to results in a success/timeout response collision, which is the purpose
        // of this test.
        // Note: this test may have a false negative result, but a failure  would indicate a real problem.
        for(int i = 0; i < 30; i++) {
            int timeout = 1 + i;
            overrideEvaluationTimeout(timeout);

            try {
                client.submit("x = 1 + 1").all().get().get(0).getInt();
                success = true;
            } catch (Exception ex) {
                final Throwable t = ex.getCause();
                assertThat("Unexpected exception with script evaluation timeout: " + timeout, t, instanceOf(ResponseException.class));
                assertEquals(ResponseStatusCode.SERVER_ERROR_TIMEOUT, ((ResponseException) t).getResponseStatusCode());
            }
        }

        assertTrue("Some script submissions should succeed", success);

        cluster.close();
    }

    @Test
    public void shouldUseBaseScript() throws Exception {
        final Cluster cluster = TestClientFactory.open();
        final Client client = cluster.connect(name.getMethodName());

        assertEquals("hello, stephen", client.submit("hello('stephen')").all().get().get(0).getString());

        cluster.close();
    }

    @Test
    public void shouldUseInterpreterMode() throws Exception {
        final Cluster cluster = TestClientFactory.open();
        final Client client = cluster.connect(name.getMethodName());

        client.submit("def subtractAway(x,y){x-y};[]").all().get();
        client.submit("multiplyIt = { x,y -> x * y};[]").all().get();

        assertEquals(2, client.submit("x = 1 + 1").all().get().get(0).getInt());
        assertEquals(3, client.submit("int y = x + 1").all().get().get(0).getInt());
        assertEquals(5, client.submit("def z = x + y").all().get().get(0).getInt());

        final Map<String,Object> m = new HashMap<>();
        m.put("x", 10);
        assertEquals(-5, client.submit("z - x", m).all().get().get(0).getInt());
        assertEquals(15, client.submit("addItUp(x,z)", m).all().get().get(0).getInt());
        assertEquals(5, client.submit("subtractAway(x,z)", m).all().get().get(0).getInt());
        assertEquals(50, client.submit("multiplyIt(x,z)", m).all().get().get(0).getInt());

        cluster.close();
    }

    @Test
    public void shouldNotUseInterpreterMode() throws Exception {
        final Cluster cluster = TestClientFactory.open();
        final Client client = cluster.connect(name.getMethodName());

        client.submit("def subtractAway(x,y){x-y};[]").all().get();
        client.submit("multiplyIt = { x,y -> x * y};[]").all().get();

        assertEquals(2, client.submit("x = 1 + 1").all().get().get(0).getInt());
        assertEquals(3, client.submit("y = x + 1").all().get().get(0).getInt());
        assertEquals(5, client.submit("z = x + y").all().get().get(0).getInt());

        final Map<String,Object> m = new HashMap<>();
        m.put("x", 10);
        assertEquals(-5, client.submit("z - x", m).all().get().get(0).getInt());
        assertEquals(15, client.submit("addItUp(x,z)", m).all().get().get(0).getInt());
        assertEquals(5, client.submit("subtractAway(x,z)", m).all().get().get(0).getInt());
        assertEquals(50, client.submit("multiplyIt(x,z)", m).all().get().get(0).getInt());

        cluster.close();
    }

    @Test
    public void shouldUseSimpleSandbox() throws Exception {
        final Cluster cluster = TestClientFactory.open();
        final Client client = cluster.connect();

        assertEquals(2, client.submit("1+1").all().get().get(0).getInt());

        try {
            // this should return "nothing" - there should be no exception
            client.submit("java.lang.System.exit(0)").all().get();
            fail("The above should not have executed in any successful way as sandboxing is enabled");
        } catch (Exception ex) {
            assertThat(ex.getCause().getMessage(), containsString("[Static type checking] - Not authorized to call this method: java.lang.System#exit(int)"));
        } finally {
            cluster.close();
        }
    }

    @Test
    public void shouldRespectHighWaterMarkSettingAndSucceed() throws Exception {
        // the highwatermark should get exceeded on the server and thus pause the writes, but have no problem catching
        // itself up - this is a tricky tests to get passing on all environments so this assumption will deny the
        // test for most cases
        TestHelper.assumeNonDeterministic();

        final Cluster cluster = TestClientFactory.open();
        final Client client = cluster.connect();

        try {
            final int resultCountToGenerate = 1000;
            final int batchSize = 3;
            final String fatty = IntStream.range(0, 175).mapToObj(String::valueOf).collect(Collectors.joining());
            final String fattyX = "['" + fatty + "'] * " + resultCountToGenerate;

            // don't allow the thread to proceed until all results are accounted for
            final CountDownLatch latch = new CountDownLatch(resultCountToGenerate);
            final AtomicBoolean expected = new AtomicBoolean(false);
            final AtomicBoolean faulty = new AtomicBoolean(false);
            final RequestMessage request = RequestMessage.build(Tokens.OPS_EVAL)
                    .addArg(Tokens.ARGS_BATCH_SIZE, batchSize)
                    .addArg(Tokens.ARGS_GREMLIN, fattyX).create();

            client.submitAsync(request).thenAcceptAsync(r -> {
                r.stream().forEach(item -> {
                    try {
                        final String aFattyResult = item.getString();
                        expected.set(aFattyResult.equals(fatty));
                    } catch (Exception ex) {
                        ex.printStackTrace();
                        faulty.set(true);
                    } finally {
                        latch.countDown();
                    }
                });
            });

            assertThat(latch.await(30000, TimeUnit.MILLISECONDS), is(true));
            assertEquals(0, latch.getCount());
            assertThat(faulty.get(), is(false));
            assertThat(expected.get(), is(true));

            assertThat(recordingAppender.getMessages().stream().anyMatch(m -> m.contains("Pausing response writing as writeBufferHighWaterMark exceeded on")), is(true));
        } catch (Exception ex) {
            fail("Shouldn't have tossed an exception");
        } finally {
            cluster.close();
        }
    }

    @Test
    public void shouldReturnInvalidRequestArgsWhenGremlinArgIsNotSupplied() throws Exception {
        try (SimpleClient client = TestClientFactory.createWebSocketClient()) {
            final RequestMessage request = RequestMessage.build(Tokens.OPS_EVAL).create();
            final ResponseMessage result = client.submit(request).get(0);
            assertThat(result.getStatus().getCode(), is(not(ResponseStatusCode.PARTIAL_CONTENT)));
            assertEquals(result.getStatus().getCode(), ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS);
        }
    }

    @Test
    public void shouldReturnInvalidRequestArgsWhenInvalidReservedBindingKeyIsUsed() throws Exception {
        try (SimpleClient client = TestClientFactory.createWebSocketClient()) {
            final Map<String, Object> bindings = new HashMap<>();
            bindings.put(T.id.getAccessor(), "123");
            final RequestMessage request = RequestMessage.build(Tokens.OPS_EVAL)
                    .addArg(Tokens.ARGS_GREMLIN, "[1,2,3,4,5,6,7,8,9,0]")
                    .addArg(Tokens.ARGS_BINDINGS, bindings).create();
            final CountDownLatch latch = new CountDownLatch(1);
            final AtomicBoolean pass = new AtomicBoolean(false);
            client.submit(request, result -> {
                if (result.getStatus().getCode() != ResponseStatusCode.PARTIAL_CONTENT) {
                    pass.set(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS == result.getStatus().getCode());
                    latch.countDown();
                }
            });

            if (!latch.await(3000, TimeUnit.MILLISECONDS))
                fail("Request should have returned error, but instead timed out");
            assertThat(pass.get(), is(true));
        }

        try (SimpleClient client = TestClientFactory.createWebSocketClient()) {
            final Map<String, Object> bindings = new HashMap<>();
            bindings.put("id", "123");
            final RequestMessage request = RequestMessage.build(Tokens.OPS_EVAL)
                    .addArg(Tokens.ARGS_GREMLIN, "[1,2,3,4,5,6,7,8,9,0]")
                    .addArg(Tokens.ARGS_BINDINGS, bindings).create();
            final CountDownLatch latch = new CountDownLatch(1);
            final AtomicBoolean pass = new AtomicBoolean(false);
            client.submit(request, result -> {
                if (result.getStatus().getCode() != ResponseStatusCode.PARTIAL_CONTENT) {
                    pass.set(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS == result.getStatus().getCode());
                    latch.countDown();
                }
            });

            if (!latch.await(3000, TimeUnit.MILLISECONDS))
                fail("Request should have returned error, but instead timed out");
            assertTrue(pass.get());
        }
    }

    @Test
    public void shouldReturnInvalidRequestArgsWhenInvalidTypeBindingKeyIsUsed() throws Exception {
        try (SimpleClient client = TestClientFactory.createWebSocketClient()) {
            final Map<Object, Object> bindings = new HashMap<>();
            bindings.put(1, "123");
            final RequestMessage request = RequestMessage.build(Tokens.OPS_EVAL)
                    .addArg(Tokens.ARGS_GREMLIN, "[1,2,3,4,5,6,7,8,9,0]")
                    .addArg(Tokens.ARGS_BINDINGS, bindings).create();
            final CountDownLatch latch = new CountDownLatch(1);
            final AtomicBoolean pass = new AtomicBoolean(false);
            client.submit(request, result -> {
                if (result.getStatus().getCode() != ResponseStatusCode.PARTIAL_CONTENT) {
                    pass.set(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS == result.getStatus().getCode());
                    latch.countDown();
                }
            });

            if (!latch.await(3000, TimeUnit.MILLISECONDS))
                fail("Request should have returned error, but instead timed out");
            assertThat(pass.get(), is(true));
        }
    }

    @Test
    public void shouldReturnInvalidRequestArgsWhenBindingCountExceedsAllowable() throws Exception {
        try (SimpleClient client = TestClientFactory.createWebSocketClient()) {
            final Map<Object, Object> bindings = new HashMap<>();
            bindings.put("x", 123);
            bindings.put("y", 123);
            final RequestMessage request = RequestMessage.build(Tokens.OPS_EVAL)
                    .addArg(Tokens.ARGS_GREMLIN, "x+y")
                    .addArg(Tokens.ARGS_BINDINGS, bindings).create();
            final CountDownLatch latch = new CountDownLatch(1);
            final AtomicBoolean pass = new AtomicBoolean(false);
            client.submit(request, result -> {
                if (result.getStatus().getCode() != ResponseStatusCode.PARTIAL_CONTENT) {
                    pass.set(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS == result.getStatus().getCode());
                    latch.countDown();
                }
            });

            if (!latch.await(3000, TimeUnit.MILLISECONDS))
                fail("Request should have returned error, but instead timed out");
            assertThat(pass.get(), is(true));
        }

        try (SimpleClient client = TestClientFactory.createWebSocketClient()) {
            final Map<Object, Object> bindings = new HashMap<>();
            bindings.put("x", 123);
            final RequestMessage request = RequestMessage.build(Tokens.OPS_EVAL)
                    .addArg(Tokens.ARGS_GREMLIN, "x+123")
                    .addArg(Tokens.ARGS_BINDINGS, bindings).create();
            final CountDownLatch latch = new CountDownLatch(1);
            final AtomicBoolean pass = new AtomicBoolean(false);
            client.submit(request, result -> {
                if (result.getStatus().getCode() != ResponseStatusCode.PARTIAL_CONTENT) {
                    pass.set(ResponseStatusCode.SUCCESS == result.getStatus().getCode() && (((int) ((List) result.getResult().getData()).get(0) == 246)));
                    latch.countDown();
                }
            });

            if (!latch.await(3000, TimeUnit.MILLISECONDS))
                fail("Request should have returned error, but instead timed out");
            assertThat(pass.get(), is(true));
        }
    }

    @Test
    public void shouldReturnInvalidRequestArgsWhenInvalidNullBindingKeyIsUsed() throws Exception {
        try (SimpleClient client = TestClientFactory.createWebSocketClient()) {
            final Map<String, Object> bindings = new HashMap<>();
            bindings.put(null, "123");
            final RequestMessage request = RequestMessage.build(Tokens.OPS_EVAL)
                    .addArg(Tokens.ARGS_GREMLIN, "[1,2,3,4,5,6,7,8,9,0]")
                    .addArg(Tokens.ARGS_BINDINGS, bindings).create();
            final CountDownLatch latch = new CountDownLatch(1);
            final AtomicBoolean pass = new AtomicBoolean(false);
            client.submit(request, result -> {
                if (result.getStatus().getCode() != ResponseStatusCode.PARTIAL_CONTENT) {
                    pass.set(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS == result.getStatus().getCode());
                    latch.countDown();
                }
            });

            if (!latch.await(3000, TimeUnit.MILLISECONDS))
                fail("Request should have returned error, but instead timed out");
            assertThat(pass.get(), is(true));
        }
    }

    @Test
    @SuppressWarnings("unchecked")
    public void shouldBatchResultsByTwos() throws Exception {
        try (SimpleClient client = TestClientFactory.createWebSocketClient()) {
            final RequestMessage request = RequestMessage.build(Tokens.OPS_EVAL)
                    .addArg(Tokens.ARGS_GREMLIN, "[0,1,2,3,4,5,6,7,8,9]").create();

            final List<ResponseMessage> msgs = client.submit(request);
            assertEquals(5, client.submit(request).size());
            assertEquals(0, ((List<Integer>) msgs.get(0).getResult().getData()).get(0).intValue());
            assertEquals(1, ((List<Integer>) msgs.get(0).getResult().getData()).get(1).intValue());
            assertEquals(2, ((List<Integer>) msgs.get(1).getResult().getData()).get(0).intValue());
            assertEquals(3, ((List<Integer>) msgs.get(1).getResult().getData()).get(1).intValue());
            assertEquals(4, ((List<Integer>) msgs.get(2).getResult().getData()).get(0).intValue());
            assertEquals(5, ((List<Integer>) msgs.get(2).getResult().getData()).get(1).intValue());
            assertEquals(6, ((List<Integer>) msgs.get(3).getResult().getData()).get(0).intValue());
            assertEquals(7, ((List<Integer>) msgs.get(3).getResult().getData()).get(1).intValue());
            assertEquals(8, ((List<Integer>) msgs.get(4).getResult().getData()).get(0).intValue());
            assertEquals(9, ((List<Integer>) msgs.get(4).getResult().getData()).get(1).intValue());
        }
    }

    @Test
    @SuppressWarnings("unchecked")
    public void shouldBatchResultsByOnesByOverridingFromClientSide() throws Exception {
        try (SimpleClient client = TestClientFactory.createWebSocketClient()) {
            final RequestMessage request = RequestMessage.build(Tokens.OPS_EVAL)
                    .addArg(Tokens.ARGS_GREMLIN, "[0,1,2,3,4,5,6,7,8,9]")
                    .addArg(Tokens.ARGS_BATCH_SIZE, 1).create();

            final List<ResponseMessage> msgs = client.submit(request);
            assertEquals(10, msgs.size());
            IntStream.rangeClosed(0, 9).forEach(i -> assertEquals(i, ((List<Integer>) msgs.get(i).getResult().getData()).get(0).intValue()));
        }
    }

    @Test
    public void shouldNotThrowNoSuchElementException() throws Exception {
        try (SimpleClient client = TestClientFactory.createWebSocketClient()){
            // this should return "nothing" - there should be no exception
            final List<ResponseMessage> responses = client.submit("g.V().has('name','kadfjaldjfla')");
            assertNull(responses.get(0).getResult().getData());
        }
    }

    @Test
    @SuppressWarnings("unchecked")
    public void shouldReceiveFailureTimeOutOnScriptEval() throws Exception {
        try (SimpleClient client = TestClientFactory.createWebSocketClient()){
            final List<ResponseMessage> responses = client.submit("Thread.sleep(3000);'some-stuff-that-should not return'");
            assertThat(responses.get(0).getStatus().getMessage(), startsWith("Evaluation exceeded the configured 'evaluationTimeout' threshold of 1000 ms"));

            // validate that we can still send messages to the server
            assertEquals(2, ((List<Integer>) client.submit("1+1").get(0).getResult().getData()).get(0).intValue());
        }
    }

    @Test
    @SuppressWarnings("unchecked")
    public void shouldReceiveFailureTimeOutOnScriptEvalUsingOverride() throws Exception {
        try (SimpleClient client = TestClientFactory.createWebSocketClient()) {
            final RequestMessage msg = RequestMessage.build("eval")
                    .addArg(ARGS_SCRIPT_EVAL_TIMEOUT, 100L)
                    .addArg(Tokens.ARGS_GREMLIN, "Thread.sleep(3000);'some-stuff-that-should not return'")
                    .create();
            final List<ResponseMessage> responses = client.submit(msg);
            assertThat(responses.get(0).getStatus().getMessage(), startsWith("Evaluation exceeded the configured 'evaluationTimeout' threshold of 100 ms"));

            // validate that we can still send messages to the server
            assertEquals(2, ((List<Integer>) client.submit("1+1").get(0).getResult().getData()).get(0).intValue());
        }
    }

    @Test
    @SuppressWarnings("unchecked")
    public void shouldReceiveFailureTimeOutOnEvalUsingOverride() throws Exception {
        try (SimpleClient client = TestClientFactory.createWebSocketClient()) {
            final RequestMessage msg = RequestMessage.build("eval")
                    .addArg(Tokens.ARGS_EVAL_TIMEOUT, 100L)
                    .addArg(Tokens.ARGS_GREMLIN, "Thread.sleep(3000);'some-stuff-that-should not return'")
                    .create();
            final List<ResponseMessage> responses = client.submit(msg);
            assertThat(responses.get(0).getStatus().getMessage(), startsWith("Evaluation exceeded the configured 'evaluationTimeout' threshold of 100 ms"));

            // validate that we can still send messages to the server
            assertEquals(2, ((List<Integer>) client.submit("1+1").get(0).getResult().getData()).get(0).intValue());
        }
    }

    @Test
    public void shouldReceiveFailureTimeOutOnScriptEvalOfOutOfControlLoop() throws Exception {
        try (SimpleClient client = TestClientFactory.createWebSocketClient()){
            // timeout configured for 1 second so the timed interrupt should trigger prior to the
            // evaluationTimeout which is at 30 seconds by default
            final List<ResponseMessage> responses = client.submit("while(true){}");
            assertThat(responses.get(0).getStatus().getMessage(), startsWith("Timeout during script evaluation triggered by TimedInterruptCustomizerProvider"));

            // validate that we can still send messages to the server
            assertEquals(2, ((List<Integer>) client.submit("1+1").get(0).getResult().getData()).get(0).intValue());
        }
    }

    @Test
    @SuppressWarnings("unchecked")
    public void shouldLoadInitScript() throws Exception {
        try (SimpleClient client = TestClientFactory.createWebSocketClient()){
            assertEquals(2, ((List<Integer>) client.submit("addItUp(1,1)").get(0).getResult().getData()).get(0).intValue());
        }
    }

    @Test
    public void shouldGarbageCollectPhantomButNotHard() throws Exception {
        final Cluster cluster = TestClientFactory.open();
        final Client client = cluster.connect();

        assertEquals(2, client.submit("addItUp(1,1)").all().join().get(0).getInt());
        assertEquals(0, client.submit("def subtract(x,y){x-y};subtract(1,1)").all().join().get(0).getInt());
        assertEquals(0, client.submit("subtract(1,1)").all().join().get(0).getInt());

        final Map<String, Object> bindings = new HashMap<>();
        bindings.put(GremlinGroovyScriptEngine.KEY_REFERENCE_TYPE, GremlinGroovyScriptEngine.REFERENCE_TYPE_PHANTOM);
        assertEquals(4, client.submit("def multiply(x,y){x*y};multiply(2,2)", bindings).all().join().get(0).getInt());

        try {
            client.submit("multiply(2,2)").all().join().get(0).getInt();
            fail("Should throw an exception since reference is phantom.");
        } catch (RuntimeException ignored) {

        } finally {
            cluster.close();
        }
    }

    @Test
    public void shouldReceiveFailureOnBadGraphSONSerialization() throws Exception {
        final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRAPHSON_V3D0).create();
        final Client client = cluster.connect();

        try {
            client.submit("def class C { def C getC(){return this}}; new C()").all().join();
            fail("Should throw an exception.");
        } catch (RuntimeException re) {
            final Throwable root = ExceptionUtils.getRootCause(re);
            assertThat(root.getMessage(), CoreMatchers.startsWith("Error during serialization: Direct self-reference leading to cycle (through reference chain:"));

            // validate that we can still send messages to the server
            assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
        } finally {
            cluster.close();
        }
    }

    @Test
    public void shouldReceiveFailureOnBadGryoSerialization() throws Exception {
        final Cluster cluster = TestClientFactory.build().serializer(Serializers.GRYO_V1D0).create();
        final Client client = cluster.connect();

        try {
            client.submit("java.awt.Color.RED").all().join();
            fail("Should throw an exception.");
        } catch (RuntimeException re) {
            final Throwable root = ExceptionUtils.getRootCause(re);
            assertThat(root.getMessage(), CoreMatchers.startsWith("Error during serialization: Class is not registered: java.awt.Color"));

            // validate that we can still send messages to the server
            assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
        } finally {
            cluster.close();
        }
    }

    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
    @Test
    public void shouldBlockRequestWhenTooBig() throws Exception {
        final Cluster cluster = TestClientFactory.open();
        final Client client = cluster.connect();

        try {
            final String fatty = IntStream.range(0, 1024).mapToObj(String::valueOf).collect(Collectors.joining());
            final CompletableFuture<ResultSet> result = client.submitAsync("'" + fatty + "';'test'");
            final ResultSet resultSet = result.get(10000, TimeUnit.MILLISECONDS);
            resultSet.all().get(10000, TimeUnit.MILLISECONDS);
            fail("Should throw an exception.");
        } catch (TimeoutException te) {
            // the request should not have timed-out - the connection should have been reset, but it seems that
            // timeout seems to occur as well on some systems (it's not clear why).  however, the nature of this
            // test is to ensure that the script isn't processed if it exceeds a certain size, so in this sense
            // it seems ok to pass in this case.
        } catch (Exception re) {
            final Throwable root = ExceptionUtils.getRootCause(re);

            // went with two possible error messages here as i think that there is some either non-deterministic
            // behavior around the error message or it's environmentally dependent (e.g. different jdk, versions, etc)
            assertThat(root.getMessage(), Matchers.anyOf(is("Connection to server is no longer active"), is("Connection reset by peer")));

            // validate that we can still send messages to the server
            assertEquals(2, client.submit("1+1").all().join().get(0).getInt());
        } finally {
            cluster.close();
        }
    }

    @Test
    public void shouldFailOnDeadHost() throws Exception {
        final Cluster cluster = TestClientFactory.build().create();
        final Client client = cluster.connect();

        // ensure that connection to server is good
        assertEquals(2, client.submit("1+1").all().join().get(0).getInt());

        // kill the server which will make the client mark the host as unavailable
        this.stopServer();

        try {
            // try to re-issue a request now that the server is down
            client.submit("g").all().join();
            fail("Should throw an exception.");
        } catch (RuntimeException re) {
            // Client would have no active connections to the host, hence it would encounter a timeout
            // trying to find an alive connection to the host.
            assertThat(re.getCause().getCause() instanceof TimeoutException, is(true));

            //
            // should recover when the server comes back
            //

            // restart server
            this.startServer();

            // try a bunch of times to reconnect. on slower systems this may simply take longer...looking at you travis
            for (int ix = 1; ix < 11; ix++) {
                // the retry interval is 1 second, wait a bit longer
                TimeUnit.SECONDS.sleep(5);

                try {
                    final List<Result> results = client.submit("1+1").all().join();
                    assertEquals(1, results.size());
                    assertEquals(2, results.get(0).getInt());
                } catch (Exception ex) {
                    if (ix == 10)
                        fail("Should have eventually succeeded");
                }
            }
        } finally {
            cluster.close();
        }
    }

    @Test
    public void shouldNotHavePartialContentWithOneResult() throws Exception {
        try (SimpleClient client = TestClientFactory.createWebSocketClient()) {
            final RequestMessage request = RequestMessage.build(Tokens.OPS_EVAL)
                    .addArg(Tokens.ARGS_GREMLIN, "10").create();
            final List<ResponseMessage> responses = client.submit(request);
            assertEquals(1, responses.size());
            assertEquals(ResponseStatusCode.SUCCESS, responses.get(0).getStatus().getCode());
        }
    }

    @Test
    public void shouldHavePartialContentWithLongResultsCollection() throws Exception {
        try (SimpleClient client = TestClientFactory.createWebSocketClient()) {
            final RequestMessage request = RequestMessage.build(Tokens.OPS_EVAL)
                    .addArg(Tokens.ARGS_GREMLIN, "new String[100]").create();
            final List<ResponseMessage> responses = client.submit(request);
            assertThat(responses.size(), Matchers.greaterThan(1));
            for (Iterator<ResponseMessage> it = responses.iterator(); it.hasNext(); ) {
                final ResponseMessage msg = it.next();
                final ResponseStatusCode expected = it.hasNext() ? ResponseStatusCode.PARTIAL_CONTENT : ResponseStatusCode.SUCCESS;
                assertEquals(expected, msg.getStatus().getCode());
            }
        }
    }

    @Test
    public void shouldFailWithBadScriptEval() throws Exception {
        try (SimpleClient client = TestClientFactory.createWebSocketClient()) {
            final RequestMessage request = RequestMessage.build(Tokens.OPS_EVAL)
                    .addArg(Tokens.ARGS_GREMLIN, "new String().doNothingAtAllBecauseThis is a syntax error").create();
            final List<ResponseMessage> responses = client.submit(request);
            assertEquals(ResponseStatusCode.SERVER_ERROR_SCRIPT_EVALUATION, responses.get(0).getStatus().getCode());
            assertEquals(1, responses.size());
        }
    }

    @Test
    public void shouldSupportLambdasUsingWithRemote() throws Exception {
        final GraphTraversalSource g = traversal().withRemote(conf);
        g.addV("person").property("age", 20).iterate();
        g.addV("person").property("age", 10).iterate();
        assertEquals(50L, g.V().hasLabel("person").map(Lambda.function("it.get().value('age') + 10")).sum().next());
    }

    @Test
    public void shouldGetSideEffectKeysAndStatusUsingWithRemote() throws Exception {
        final GraphTraversalSource g = traversal().withRemote(conf);
        g.addV("person").property("age", 20).iterate();
        g.addV("person").property("age", 10).iterate();
        final GraphTraversal traversal = g.V().aggregate("a").aggregate("b");
        traversal.iterate();
        final DriverRemoteTraversalSideEffects se = (DriverRemoteTraversalSideEffects) traversal.asAdmin().getSideEffects();
        assertThat(se.statusAttributes().containsKey(Tokens.ARGS_HOST), is(true));

        // Get keys
        final Set<String> sideEffectKeys = se.keys();
        assertEquals(2, sideEffectKeys.size());

        // Get side effects
        final BulkSet aSideEffects = se.get("a");
        assertThat(aSideEffects.isEmpty(), is(false));
        final BulkSet bSideEffects = se.get("b");
        assertThat(bSideEffects.isEmpty(), is(false));

        // Should get local keys/side effects after close
        se.close();

        final Set<String> localSideEffectKeys = se.keys();
        assertEquals(2, localSideEffectKeys.size());

        final BulkSet localASideEffects = se.get("a");
        assertThat(localASideEffects.isEmpty(), is(false));

        final BulkSet localBSideEffects = se.get("b");
        assertThat(localBSideEffects.isEmpty(), is(false));

        final GraphTraversal gdotv = g.V();
        gdotv.toList();
        final DriverRemoteTraversalSideEffects gdotvSe = (DriverRemoteTraversalSideEffects) gdotv.asAdmin().getSideEffects();
        assertThat(gdotvSe.statusAttributes().containsKey(Tokens.ARGS_HOST), is(true));
    }

    @Test
    public void shouldCloseSideEffectsUsingWithRemote() throws Exception {
        final GraphTraversalSource g = traversal().withRemote(conf);
        g.addV("person").property("age", 20).iterate();
        g.addV("person").property("age", 10).iterate();
        final GraphTraversal traversal = g.V().aggregate("a").aggregate("b");
        traversal.iterate();
        final DriverRemoteTraversalSideEffects se = (DriverRemoteTraversalSideEffects) traversal.asAdmin().getSideEffects();
        final BulkSet sideEffects = se.get("a");
        assertThat(sideEffects.isEmpty(), is(false));
        se.close();

        // Can't get new side effects after close
        try {
            se.get("b");
            fail("The traversal is closed");
        } catch (Exception ex) {
            assertThat(ex, instanceOf(IllegalStateException.class));
            assertEquals("Traversal has been closed - no new side-effects can be retrieved", ex.getMessage());
        }

        // Earlier keys should be cached locally
        final Set<String> localSideEffectKeys = se.keys();
        assertEquals(2, localSideEffectKeys.size());
        final BulkSet localSideEffects = se.get("a");
        assertThat(localSideEffects.isEmpty(), is(false));

        // Try to get side effect from server
        final Cluster cluster = TestClientFactory.open();
        final Client client = cluster.connect();
        final Field field = DriverRemoteTraversalSideEffects.class.getDeclaredField("serverSideEffect");
        field.setAccessible(true);
        final UUID serverSideEffectId = (UUID) field.get(se);
        final Map<String, String> aliases = new HashMap<>();
        aliases.put("g", "g");
        final RequestMessage msg = RequestMessage.build(Tokens.OPS_GATHER)
                .addArg(Tokens.ARGS_SIDE_EFFECT, serverSideEffectId)
                .addArg(Tokens.ARGS_SIDE_EFFECT_KEY, "b")
                .addArg(Tokens.ARGS_ALIASES, aliases)
                .processor("traversal").create();
        boolean error;
        try {
            client.submitAsync(msg).get().one();
            error = false;
        } catch (Exception ex) {
            error = true;
        }
        assertThat(error, is(true));
    }

    @Test
    public void shouldBlockWhenGettingSideEffectKeysUsingWithRemote() throws Exception {
        final GraphTraversalSource g = traversal().withRemote(conf);
        g.addV("person").property("age", 20).iterate();
        g.addV("person").property("age", 10).iterate();
        final GraphTraversal traversal = g.V().aggregate("a")
                .sideEffect(Lambda.consumer("{Thread.sleep(3000)}"))
                .aggregate("b");

        // force strategy application - if this doesn't happen then getSideEffects() returns DefaultTraversalSideEffects
        traversal.hasNext();

        // start a separate thread to iterate
        final Thread t = new Thread(traversal::iterate);
        t.start();

        // blocks here until traversal iteration is complete
        final DriverRemoteTraversalSideEffects se = (DriverRemoteTraversalSideEffects) traversal.asAdmin().getSideEffects();

        // Get keys
        final Set<String> sideEffectKeys = se.keys();
        assertEquals(2, sideEffectKeys.size());

        // Get side effects
        final BulkSet aSideEffects = se.get("a");
        assertThat(aSideEffects.isEmpty(), is(false));
        final BulkSet bSideEffects = se.get("b");
        assertThat(bSideEffects.isEmpty(), is(false));

        // Should get local keys/side effects after close
        se.close();

        final Set<String> localSideEffectKeys = se.keys();
        assertEquals(2, localSideEffectKeys.size());

        final BulkSet localASideEffects = se.get("a");
        assertThat(localASideEffects.isEmpty(), is(false));

        final BulkSet localBSideEffects = se.get("b");
        assertThat(localBSideEffects.isEmpty(), is(false));
    }

    @Test
    public void shouldBlockWhenGettingSideEffectValuesUsingWithRemote() throws Exception {
        final GraphTraversalSource g = traversal().withRemote(conf);
        g.addV("person").property("age", 20).iterate();
        g.addV("person").property("age", 10).iterate();
        final GraphTraversal traversal = g.V().aggregate("a")
                .sideEffect(Lambda.consumer("{Thread.sleep(3000)}"))
                .aggregate("b");

        // force strategy application - if this doesn't happen then getSideEffects() returns DefaultTraversalSideEffects
        traversal.hasNext();

        // start a separate thread to iterate
        final Thread t = new Thread(traversal::iterate);
        t.start();

        // blocks here until traversal iteration is complete
        final DriverRemoteTraversalSideEffects se = (DriverRemoteTraversalSideEffects) traversal.asAdmin().getSideEffects();

        // Get side effects
        final BulkSet aSideEffects = se.get("a");
        assertThat(aSideEffects.isEmpty(), is(false));
        final BulkSet bSideEffects = se.get("b");
        assertThat(bSideEffects.isEmpty(), is(false));

        // Get keys
        final Set<String> sideEffectKeys = se.keys();
        assertEquals(2, sideEffectKeys.size());

        // Should get local keys/side effects after close
        se.close();

        final Set<String> localSideEffectKeys = se.keys();
        assertEquals(2, localSideEffectKeys.size());

        final BulkSet localASideEffects = se.get("a");
        assertThat(localASideEffects.isEmpty(), is(false));

        final BulkSet localBSideEffects = se.get("b");
        assertThat(localBSideEffects.isEmpty(), is(false));
    }

    @Test
    public void shouldDoNonBlockingPromiseWithRemote() throws Exception {
        final GraphTraversalSource g = traversal().withRemote(conf);
        g.addV("person").property("age", 20).promise(Traversal::iterate).join();
        g.addV("person").property("age", 10).promise(Traversal::iterate).join();
        assertEquals(50L, g.V().hasLabel("person").map(Lambda.function("it.get().value('age') + 10")).sum().promise(t -> t.next()).join());
        g.addV("person").property("age", 20).promise(Traversal::iterate).join();

        final Traversal<Vertex,Integer> traversal = g.V().hasLabel("person").has("age", 20).values("age");
        int age = traversal.promise(t -> t.next(1).get(0)).join();
        assertEquals(20, age);
        assertEquals(20, (int)traversal.next());
        assertThat(traversal.hasNext(), is(false));

        final Traversal traversalCloned = g.V().hasLabel("person").has("age", 20).values("age");
        assertEquals(20, traversalCloned.next());
        assertEquals(20, traversalCloned.promise(t -> ((Traversal) t).next(1).get(0)).join());
        assertThat(traversalCloned.promise(t -> ((Traversal) t).hasNext()).join(), is(false));

        assertEquals(3, g.V().promise(Traversal::toList).join().size());
    }

    @Test
    public void shouldProvideBetterExceptionForMethodCodeTooLarge() {
        final int numberOfParameters = 4000;
        final Map<String,Object> b = new HashMap<>();

        // generate a script with a ton of bindings usage to generate a "code too large" exception
        String script = "x = 0";
        for (int ix = 0; ix < numberOfParameters; ix++) {
            if (ix > 0 && ix % 100 == 0) {
                script = script + ";" + System.lineSeparator() + "x = x";
            }
            script = script + " + x" + ix;
            b.put("x" + ix, ix);
        }

        final Cluster cluster = TestClientFactory.build().maxContentLength(4096000).create();
        final Client client = cluster.connect();

        try {
            client.submit(script, b).all().get();
            fail("Should have tanked out because of number of parameters used and size of the compile script");
        } catch (Exception ex) {
            assertThat(ex.getMessage(), containsString("The Gremlin statement that was submitted exceeds the maximum compilation size allowed by the JVM"));
        }
    }
}
