// blob: 588d36b1a5755fd360df163a5de693c7a9210a96 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.server;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
import org.apache.tinkerpop.gremlin.driver.RequestOptions;
import org.apache.tinkerpop.gremlin.driver.Result;
import org.apache.tinkerpop.gremlin.driver.ResultSet;
import org.apache.tinkerpop.gremlin.driver.Tokens;
import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
import org.apache.tinkerpop.gremlin.driver.simple.SimpleClient;
import org.apache.tinkerpop.gremlin.server.channel.UnifiedChannelizer;
import org.apache.tinkerpop.gremlin.server.op.session.Session;
import org.apache.tinkerpop.gremlin.server.op.session.SessionOpProcessor;
import org.apache.tinkerpop.gremlin.util.Log4jRecordingAppender;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsIterableContaining.hasItem;
import static org.hamcrest.core.StringStartsWith.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeThat;
/**
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
public class GremlinServerSessionIntegrateTest extends AbstractGremlinServerIntegrationTest {
private Log4jRecordingAppender recordingAppender = null;
private Level originalLevel;
/**
 * Runs before every test: attaches a fresh recording appender to the root logger and, for the tests that assert
 * on session log output, raises the {@link Session} logger to DEBUG.
 */
@Before
public void setupForEachTest() {
    recordingAppender = new Log4jRecordingAppender();

    // remember the level of the logger that is actually modified below (not the root logger) so that the
    // teardown puts back the right value - this may be null, meaning the level is inherited
    final Logger sessionLogger = Logger.getLogger(Session.class);
    originalLevel = sessionLogger.getLevel();

    switch (name.getMethodName()) {
        case "shouldCloseSessionOnceOnRequest":
        case "shouldHaveTheSessionTimeout":
        case "shouldCloseSessionOnClientClose":
            sessionLogger.setLevel(Level.DEBUG);
            break;
    }

    Logger.getRootLogger().addAppender(recordingAppender);
}

/**
 * Runs after every test: restores the {@link Session} logger level captured in {@link #setupForEachTest()} and
 * detaches the recording appender.
 */
@After
public void teardownForEachTest() {
    Logger.getLogger(Session.class).setLevel(originalLevel);

    // the appender was registered on the root logger, so that is where it has to be removed - otherwise one
    // more appender stays attached after every test
    Logger.getRootLogger().removeAppender(recordingAppender);
}
/**
 * Tailors the Gremlin Server {@link Settings} to the test that is about to run, keyed on the test method name.
 * Tests that are not listed get the settings unchanged.
 */
@Override
public Settings overrideSettings(final Settings settings) {
    switch (name.getMethodName()) {
        case "shouldCloseSessionOnceOnRequest":
        case "shouldHaveTheSessionTimeout": {
            // OpProcessor setting
            final Settings.ProcessorSettings timeoutProcessor = new Settings.ProcessorSettings();
            timeoutProcessor.className = SessionOpProcessor.class.getCanonicalName();
            timeoutProcessor.config = new HashMap<>();
            timeoutProcessor.config.put(SessionOpProcessor.CONFIG_SESSION_TIMEOUT, 3000L);
            settings.processors.clear();
            settings.processors.add(timeoutProcessor);

            // Unified setting
            settings.sessionLifetimeTimeout = 3000L;
            break;
        }
        case "shouldNotUseGlobalFunctionCache": {
            // OpProcessor settings
            final Settings.ProcessorSettings noCacheProcessor = new Settings.ProcessorSettings();
            noCacheProcessor.className = SessionOpProcessor.class.getCanonicalName();
            noCacheProcessor.config = new HashMap<>();
            noCacheProcessor.config.put(SessionOpProcessor.CONFIG_GLOBAL_FUNCTION_CACHE_ENABLED, false);
            settings.processors.clear();
            settings.processors.add(noCacheProcessor);

            // UnifiedHandler settings
            settings.useCommonEngineForSessions = false;
            settings.useGlobalFunctionCacheForSessions = false;
            break;
        }
        case "shouldUseGlobalFunctionCache": {
            // OpProcessor settings are good by default
            // UnifiedHandler settings
            settings.useCommonEngineForSessions = false;
            settings.useGlobalFunctionCacheForSessions = true;
            break;
        }
        case "shouldEnsureSessionBindingsAreThreadSafe": {
            settings.threadPoolWorker = 2;
            break;
        }
        case "shouldBlowTheSessionQueueSize": {
            clearNeo4j(settings);
            settings.maxSessionTaskQueueSize = 1;
            break;
        }
        // tests that only need an empty neo4j graph
        case "shouldCloseSessionOnClientClose":
        case "shouldCloseSessionOnClientCloseWithStateMaintainedBetweenExceptions":
        case "shouldExecuteInSessionAndSessionlessWithoutOpeningTransactionWithSingleClient":
        case "shouldExecuteInSessionWithTransactionManagement":
        case "shouldRollbackOnEvalExceptionForManagedTransaction":
        case "shouldNotExecuteQueuedRequestsIfOneInFrontOfItFails": {
            clearNeo4j(settings);
            break;
        }
    }

    return settings;
}
/**
 * Passes {@code /tmp/neo4j} to {@code deleteDirectory} and registers the empty neo4j configuration under the
 * graph name "graph".
 */
private static void clearNeo4j(Settings settings) {
    final File neo4jDirectory = new File("/tmp/neo4j");
    deleteDirectory(neo4jDirectory);

    final String emptyNeo4jConfiguration = "conf/neo4j-empty.properties";
    settings.graphs.put("graph", emptyNeo4jConfiguration);
}
/**
 * Verifies, for the UnifiedChannelizer, that a session configured with {@code maxSessionTaskQueueSize=1}
 * answers at least one request with {@link ResponseStatusCode#TOO_MANY_REQUESTS} while a slow request occupies
 * it, and accepts requests again once that slow request has completed.
 */
@Test
public void shouldBlowTheSessionQueueSize() throws Exception {
    assumeNeo4jIsPresent();
    assumeThat(isUsingUnifiedChannelizer(), is(true));

    final Cluster cluster = TestClientFactory.open();
    try {
        final Client session = cluster.connect(name.getMethodName());

        // maxSessionTaskQueueSize=1
        // we should be able to do one request at a time serially
        assertEquals("test1", session.submit("'test1'").all().get().get(0).getString());
        assertEquals("test2", session.submit("'test2'").all().get().get(0).getString());
        assertEquals("test3", session.submit("'test3'").all().get().get(0).getString());

        final AtomicBoolean errorTriggered = new AtomicBoolean();
        final ResultSet r1 = session.submitAsync("Thread.sleep(1000);'test4'").get();

        // keep sending requests behind the sleeping one until the server rejects one of them
        final List<CompletableFuture<List<Result>>> blockers = new ArrayList<>();
        for (int ix = 0; ix < 512 && !errorTriggered.get(); ix++) {
            blockers.add(session.submit("'test'").all().exceptionally(t -> {
                final ResponseException re = (ResponseException) t.getCause();
                errorTriggered.compareAndSet(false, ResponseStatusCode.TOO_MANY_REQUESTS == re.getResponseStatusCode());
                return null;
            }));

            // low resource environments like travis might need a break
            if (ix % 32 == 0) Thread.sleep(500);
        }

        // wait for the blockage to clear for sure
        assertEquals("test4", r1.all().get().get(0).getString());
        blockers.forEach(CompletableFuture::join);

        assertThat(errorTriggered.get(), is(true));

        // should be accepting test6 now
        assertEquals("test6", session.submit("'test6'").all().get().get(0).getString());

        session.close();
    } finally {
        // release the driver resources even when an assertion above fails
        cluster.close();
    }
}
/**
 * Verifies, for the UnifiedChannelizer, that closing a session while a long-running request is executing fails
 * that request as well as every request queued behind it, and that no vertex written through the session is
 * visible afterwards.
 */
@Test
public void shouldNotExecuteQueuedRequestsIfOneInFrontOfItFails() throws Exception {
    assumeNeo4jIsPresent();
    assumeThat(isUsingUnifiedChannelizer(), is(true));

    final Cluster cluster = TestClientFactory.open();
    try {
        final Client session = cluster.connect(name.getMethodName());

        final List<CompletableFuture<ResultSet>> futuresAfterFailure = new ArrayList<>();

        // short, sweet and successful - but will rollback after close
        final CompletableFuture<List<Result>> first = session.submit("g.addV('person').iterate();1").all();
        assertEquals(1, first.get().get(0).getInt());

        // the long sleep leaves enough time for the follow-on requests to queue behind this one
        final CompletableFuture<ResultSet> second = session.submitAsync("g.addV('person').sideEffect{Thread.sleep(60000)}");

        // these requests will queue
        final int followOnRequestCount = 8;
        for (int ix = 0; ix < followOnRequestCount; ix++) {
            futuresAfterFailure.add(session.submitAsync("g.addV('software')"));
        }

        // close the session which will interrupt "second" request and flush the queue of requests behind it
        session.close();

        try {
            second.join().all().join();
            fail("should have been interrupted by channel close");
        } catch (Exception ex) {
            assertEquals("Connection to server is no longer active", ex.getCause().getMessage());
        }

        final AtomicInteger fails = new AtomicInteger(0);
        futuresAfterFailure.forEach(f -> {
            try {
                f.join().all().join();
                fail("should have been interrupted by channel close");
            } catch (Exception ex) {
                assertEquals("Connection to server is no longer active", ex.getCause().getMessage());
                fails.incrementAndGet();
            }
        });

        // all are failures
        assertEquals(followOnRequestCount, fails.get());

        // validate the rollback
        final Client client = cluster.connect();
        assertEquals(0, client.submit("g.V().count()").all().get().get(0).getInt());
    } finally {
        // release the driver resources even when an assertion above fails
        cluster.close();
    }
}
/**
 * Verifies that closing the client ends its session: a second client connecting with the same session id sees
 * neither the variable bound by the first client nor the vertex it added without committing.
 */
@Test
public void shouldCloseSessionOnClientClose() throws Exception {
    assumeNeo4jIsPresent();

    final Cluster cluster1 = TestClientFactory.open();
    try {
        final Client client1 = cluster1.connect(name.getMethodName());
        client1.submit("x = 1").all().join();
        client1.submit("graph.addVertex()").all().join();
        client1.close();
    } finally {
        cluster1.close();
    }

    // the session close log messages are only asserted for the non-unified path; with the UnifiedChannelizer
    // the handler is asked directly whether the session is still active
    if (isUsingUnifiedChannelizer()) {
        assertThat(((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().isActiveSession(name.getMethodName()), is(false));
    } else {
        assertThat(recordingAppender.getMessages(), hasItem("DEBUG - Skipped attempt to close open graph transactions on shouldCloseSessionOnClientClose - close was forced\n"));
        assertThat(recordingAppender.getMessages(), hasItem("DEBUG - Session shouldCloseSessionOnClientClose closed\n"));
    }

    // try to reconnect to that session and make sure no state is there
    final Cluster clusterReconnect = TestClientFactory.open();
    try {
        final Client clientReconnect = clusterReconnect.connect(name.getMethodName());

        // should get an error because "x" is not defined as this is a new session
        try {
            clientReconnect.submit("y=100").all().join();
            clientReconnect.submit("x").all().join();
            fail("Should not have been successful as 'x' was only defined in the old session");
        } catch (Exception ex) {
            final Throwable root = ExceptionUtils.getRootCause(ex);
            assertThat(root.getMessage(), startsWith("No such property"));
        }

        // the commit from client1 should not have gone through so there should be no data present.
        assertEquals(0, clientReconnect.submit("graph.traversal().V().count()").all().join().get(0).getInt());

        // must turn on maintainStateAfterException for unified channelizer
        if (!isUsingUnifiedChannelizer()) {
            assertEquals(100, clientReconnect.submit("y").all().join().get(0).getInt());
        }
    } finally {
        // release the driver resources even when an assertion above fails
        clusterReconnect.close();
    }

    if (isUsingUnifiedChannelizer()) {
        assertEquals(0, ((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().getActiveSessionCount());
    }
}
/**
 * UnifiedChannelizer-only variant of the session close test: the reconnecting client enables
 * {@code maintainStateAfterException}, so the binding made before the failing request ("y") must still be
 * readable afterwards, while nothing from the first, closed session may be visible.
 */
@Test
public void shouldCloseSessionOnClientCloseWithStateMaintainedBetweenExceptions() throws Exception {
    assumeNeo4jIsPresent();
    assumeThat("Must use UnifiedChannelizer", isUsingUnifiedChannelizer(), is(true));

    final Cluster cluster1 = TestClientFactory.open();
    try {
        final Client client1 = cluster1.connect(name.getMethodName());
        client1.submit("x = 1").all().join();
        client1.submit("graph.addVertex()").all().join();
        client1.close();
    } finally {
        cluster1.close();
    }

    assertThat(((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().isActiveSession(name.getMethodName()), is(false));

    // try to reconnect to that session and make sure no state is there
    final Cluster clusterReconnect = TestClientFactory.open();
    try {
        // this configures the client to behave like OpProcessor for UnifiedChannelizer
        final Client.SessionSettings settings = Client.SessionSettings.build().
                sessionId(name.getMethodName()).maintainStateAfterException(true).create();
        final Client clientReconnect = clusterReconnect.connect(Client.Settings.build().useSession(settings).create());

        // should get an error because "x" is not defined as this is a new session
        try {
            clientReconnect.submit("y=100").all().join();
            clientReconnect.submit("x").all().join();
            fail("Should not have been successful as 'x' was only defined in the old session");
        } catch (Exception ex) {
            final Throwable root = ExceptionUtils.getRootCause(ex);
            assertThat(root.getMessage(), startsWith("No such property"));
        }

        // the commit from client1 should not have gone through so there should be no data present.
        assertEquals(0, clientReconnect.submit("graph.traversal().V().count()").all().join().get(0).getInt());

        // since maintainStateAfterException is enabled the UnifiedChannelizer works like OpProcessor
        assertEquals(100, clientReconnect.submit("y").all().join().get(0).getInt());
    } finally {
        // release the driver resources even when an assertion above fails
        clusterReconnect.close();
    }

    assertEquals(0, ((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().getActiveSessionCount());
}
/**
 * Verifies that a function defined in a session can be called again by a later request in the same session,
 * while a sessionless client on the same cluster cannot call it.
 */
@Test
public void shouldUseGlobalFunctionCache() throws Exception {
    final Cluster cluster = TestClientFactory.open();
    try {
        final Client session = cluster.connect(name.getMethodName());
        final Client client = cluster.connect();

        // define the function in the session and call it again from a follow-up request in that session
        assertEquals(3, session.submit("def sumItUp(x,y){x+y};sumItUp(1,2)").all().get().get(0).getInt());
        assertEquals(3, session.submit("sumItUp(1,2)").all().get().get(0).getInt());

        try {
            client.submit("sumItUp(1,2)").all().get().get(0).getInt();
            fail("Functions defined in a session should not be visible to sessionless requests so the call to sumItUp() should fail");
        } catch (Exception ex) {
            final Throwable root = ExceptionUtils.getRootCause(ex);
            assertThat(root.getMessage(), startsWith("No signature of method"));
        }
    } finally {
        // release the driver resources even when one of the assertions above fails
        cluster.close();
    }
}
/**
 * Verifies that, with the global function cache disabled for sessions, a function defined by one request is not
 * callable from a later request in the same session.
 */
@Test
public void shouldNotUseGlobalFunctionCache() throws Exception {
    final Cluster cluster = TestClientFactory.open();
    try {
        final Client client = cluster.connect(name.getMethodName());

        // the function works within the request that defines it
        assertEquals(3, client.submit("def sumItUp(x,y){x+y};sumItUp(1,2)").all().get().get(0).getInt());

        try {
            client.submit("sumItUp(1,2)").all().get().get(0).getInt();
            fail("Global functions should not be cached so the call to sumItUp() should fail");
        } catch (Exception ex) {
            final Throwable root = ExceptionUtils.getRootCause(ex);
            assertThat(root.getMessage(), startsWith("No signature of method"));
        }
    } finally {
        // a single finally replaces the earlier catch/close/rethrow and also covers connect() failures
        cluster.close();
    }
}
/**
 * Verifies that a second client from a different cluster cannot submit to a session id that is already in use
 * by another client.
 */
@Test
public void shouldNotAllowMoreThanOneClientPerSession() throws Exception {
    final Cluster cluster1 = TestClientFactory.open();
    Cluster cluster2 = null;
    try {
        final Client client1 = cluster1.connect(name.getMethodName());
        client1.submit("1+1").all().join();

        cluster2 = TestClientFactory.open();
        final Client.SessionSettings sessionSettings = Client.SessionSettings.build()
                .sessionId(name.getMethodName())
                .forceClosed(true).create();
        final Client client2 = cluster2.connect(Client.Settings.build().useSession(sessionSettings).create());

        try {
            client2.submit("2+2").all().join();
            fail("Can't have more than one client connecting to the same session");
        } catch (Exception ex) {
            final Throwable root = ExceptionUtils.getRootCause(ex);
            assertEquals("Session shouldNotAllowMoreThanOneClientPerSession is not bound to the connecting client", root.getMessage());
        }
    } finally {
        // close in the same order as before, but also when an assertion above fails
        cluster1.close();
        if (cluster2 != null) cluster2.close();
    }
}
/**
 * Verifies that, with managed transactions, a vertex added by a request that ends in an exception is not
 * present afterwards: only the vertex committed by the follow-up request is counted.
 */
@Test
public void shouldRollbackOnEvalExceptionForManagedTransaction() throws Exception {
    assumeNeo4jIsPresent();

    final Cluster cluster = TestClientFactory.open();
    try {
        final Client.SessionSettings sessionSettings = Client.SessionSettings.build().
                sessionId(name.getMethodName()).
                manageTransactions(true).
                maintainStateAfterException(false).
                create();
        final Client.Settings clientSettings = Client.Settings.build().useSession(sessionSettings).create();
        final Client client = cluster.connect(clientSettings);

        try {
            client.submit("graph.addVertex(); throw new Exception('no worky')").all().get();
            fail("Should have tossed the manually generated exception");
        } catch (Exception ex) {
            final Throwable root = ExceptionUtils.getRootCause(ex);
            assertEquals("no worky", root.getMessage());
        }

        // just force a commit here of "something" to ensure the rollback of the previous request
        client.submit("graph.addVertex(); graph.tx().commit()").all().get(10, TimeUnit.SECONDS);

        // the transaction is managed so a rollback should have executed
        assertEquals(1, client.submit("g.V().count()").all().get(10, TimeUnit.SECONDS).get(0).getInt());
    } finally {
        // release the driver resources even when the first assertion block fails
        cluster.close();
    }
}
/**
 * Verifies that an explicitly closed session is closed exactly once, even after waiting past the configured
 * session timeout, and that the closed client cannot be used for further requests.
 */
@Test
public void shouldCloseSessionOnceOnRequest() throws Exception {
    final Cluster cluster = TestClientFactory.open();
    try {
        final Client client = cluster.connect(name.getMethodName());

        final ResultSet results1 = client.submit("x = [1,2,3,4,5,6,7,8,9]");
        final AtomicInteger counter = new AtomicInteger(0);
        results1.stream().map(i -> i.get(Integer.class) * 2).forEach(i -> assertEquals(counter.incrementAndGet() * 2, Integer.parseInt(i.toString())));

        final ResultSet results2 = client.submit("x[0]+1");
        assertEquals(2, results2.all().get().get(0).getInt());

        // explicitly close the session
        client.close();

        // wait past automatic session expiration
        Thread.sleep(3500);

        try {
            // the original session should be dead so this request must not succeed
            // NOTE(review): the assertion expects an IllegalStateException at the root rather than a
            // "No such property" evaluation error, which suggests the already-closed client rejects the
            // request before a new session is opened - confirm
            client.submit("x[1]+2").all().get();
            fail("Session should be dead");
        } catch (Exception ex) {
            final Throwable root = ExceptionUtils.getRootCause(ex);
            assertThat(root, instanceOf(IllegalStateException.class));
        }
    } finally {
        // release the driver resources even when an assertion above fails
        cluster.close();
    }

    if (isUsingUnifiedChannelizer()) {
        assertThat(((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().isActiveSession(name.getMethodName()), is(false));
    } else {
        assertEquals(1, recordingAppender.getMessages().stream()
                .filter(msg -> msg.equals("DEBUG - Session shouldCloseSessionOnceOnRequest closed\n")).count());
    }
}
/**
 * Verifies that an idle session expires after the configured timeout: state bound before the timeout is gone
 * afterwards, yet the client can still send requests to the server.
 */
@Test
public void shouldHaveTheSessionTimeout() throws Exception {
    final Cluster cluster = TestClientFactory.open();
    try {
        final Client.SessionSettings settings = Client.SessionSettings.build().
                sessionId(name.getMethodName()).maintainStateAfterException(true).create();
        final Client session = cluster.connect(Client.Settings.build().useSession(settings).create());

        final ResultSet results1 = session.submit("x = [1,2,3,4,5,6,7,8,9]");
        final AtomicInteger counter = new AtomicInteger(0);
        results1.stream().map(i -> i.get(Integer.class) * 2).forEach(i -> assertEquals(counter.incrementAndGet() * 2, Integer.parseInt(i.toString())));

        final ResultSet results2 = session.submit("x[0]+1");
        assertEquals(2, results2.all().get().get(0).getInt());

        // session times out in 3 seconds but slow environments like travis might not be so exacting in the
        // shutdown process so be patient
        if (isUsingUnifiedChannelizer()) {
            // re-check after every sleep (including the last one) so the longest wait is never wasted and
            // the value asserted below is never stale
            boolean sessionAlive = ((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().isActiveSession(name.getMethodName());
            for (int ix = 1; sessionAlive && ix < 11; ix++) {
                Thread.sleep(ix * 500);
                sessionAlive = ((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().isActiveSession(name.getMethodName());
            }

            assertThat(sessionAlive, is(false));
        } else {
            Thread.sleep(5000);
        }

        try {
            // the original session should be dead so this call will open a new session with the same name but fail
            // because the state is now gone - x is an invalid property.
            session.submit("x[1]+2").all().get();
            fail("Session should be dead");
        } catch (Exception ex) {
            final Throwable cause = ExceptionUtils.getCause(ex);
            assertThat(cause, instanceOf(ResponseException.class));
            assertEquals(ResponseStatusCode.SERVER_ERROR_EVALUATION, ((ResponseException) cause).getResponseStatusCode());

            // validate that we can still send messages to the server
            assertEquals(2, session.submit("1+1").all().join().get(0).getInt());
        }
    } finally {
        // release the driver resources even when an assertion above fails
        cluster.close();
    }

    if (isUsingUnifiedChannelizer()) {
        assertThat(((UnifiedChannelizer) server.getChannelizer()).getUnifiedHandler().isActiveSession(name.getMethodName()), is(false));
    } else {
        // there will be one for the timeout and a second for closing the cluster
        assertEquals(2, recordingAppender.getMessages().stream()
                .filter(msg -> msg.equals("DEBUG - Session shouldHaveTheSessionTimeout closed\n")).count());
    }
}
/**
 * Submits 10000 asynchronous requests that read the same three session bindings and verifies that every one of
 * them returns the expected sum.
 */
@Test
public void shouldEnsureSessionBindingsAreThreadSafe() throws Exception {
    final Cluster cluster = TestClientFactory.build().
            minInProcessPerConnection(16).maxInProcessPerConnection(64).create();
    try {
        final Client client = cluster.connect(name.getMethodName());

        client.submit("a=100;b=1000;c=10000;null").all().get();
        final int requests = 10000;
        final List<CompletableFuture<ResultSet>> futures = new ArrayList<>(requests);
        for (int i = 0; i < requests; i++) {
            futures.add(client.submitAsync("a+b+c"));
        }

        assertEquals(requests, futures.size());

        int counter = 0;
        for (CompletableFuture<ResultSet> f : futures) {
            final Result r = f.get().all().get(30000, TimeUnit.MILLISECONDS).get(0);
            assertEquals(11100, r.getInt());
            counter++;
        }

        assertEquals(requests, counter);
    } finally {
        // unexpected exceptions are left to propagate so the test reports their type and stack trace instead
        // of only a message
        cluster.close();
    }
}
/**
 * Uses one {@link SimpleClient} to open a transaction, add a vertex and commit inside a session, then checks
 * that the session's transaction is reported as closed both before and after a sessionless read is sent over
 * the same client.
 */
@Test
@SuppressWarnings("unchecked")
public void shouldExecuteInSessionAndSessionlessWithoutOpeningTransactionWithSingleClient() throws Exception {
    assumeNeo4jIsPresent();

    final String sessionId = name.getMethodName();

    try (final SimpleClient client = TestClientFactory.createWebSocketClient()) {

        // open a transaction, create a vertex, commit
        final RequestMessage txOpen = RequestMessage.build(Tokens.OPS_EVAL)
                .processor("session")
                .addArg(Tokens.ARGS_SESSION, sessionId)
                .addArg(Tokens.ARGS_GREMLIN, "graph.tx().open()")
                .create();
        final List<ResponseMessage> txOpenResults = client.submit(txOpen);
        assertEquals(1, txOpenResults.size());
        assertEquals(ResponseStatusCode.SUCCESS, txOpenResults.get(0).getStatus().getCode());

        final RequestMessage addVertex = RequestMessage.build(Tokens.OPS_EVAL)
                .processor("session")
                .addArg(Tokens.ARGS_SESSION, sessionId)
                .addArg(Tokens.ARGS_GREMLIN, "v=graph.addVertex(\"name\",\"stephen\")")
                .create();
        final List<ResponseMessage> addVertexResults = client.submit(addVertex);
        assertEquals(1, addVertexResults.size());
        assertEquals(ResponseStatusCode.SUCCESS, addVertexResults.get(0).getStatus().getCode());

        final RequestMessage txCommit = RequestMessage.build(Tokens.OPS_EVAL)
                .processor("session")
                .addArg(Tokens.ARGS_SESSION, sessionId)
                .addArg(Tokens.ARGS_GREMLIN, "graph.tx().commit()")
                .create();
        final List<ResponseMessage> txCommitResults = client.submit(txCommit);
        assertEquals(1, txCommitResults.size());
        assertEquals(ResponseStatusCode.SUCCESS, txCommitResults.get(0).getStatus().getCode());

        // the transaction must be reported as closed after the commit
        final RequestMessage txCheck = RequestMessage.build(Tokens.OPS_EVAL)
                .processor("session")
                .addArg(Tokens.ARGS_SESSION, sessionId)
                .addArg(Tokens.ARGS_GREMLIN, "graph.tx().isOpen()")
                .create();
        final List<ResponseMessage> txCheckResults = client.submit(txCheck);
        assertEquals(1, txCheckResults.size());
        assertEquals(ResponseStatusCode.SUCCESS, txCheckResults.get(0).getStatus().getCode());
        final List<Boolean> txCheckData = (List<Boolean>) txCheckResults.get(0).getResult().getData();
        assertThat(txCheckData.get(0), is(false));

        // a sessionless read over the same client
        final RequestMessage sessionlessRead = RequestMessage.build(Tokens.OPS_EVAL)
                .addArg(Tokens.ARGS_GREMLIN, "graph.traversal().V()")
                .create();
        final List<ResponseMessage> sessionlessReadResults = client.submit(sessionlessRead);
        assertEquals(1, sessionlessReadResults.size());
        assertEquals(ResponseStatusCode.SUCCESS, sessionlessReadResults.get(0).getStatus().getCode());

        // the session's transaction must still be reported as closed
        final RequestMessage txRecheck = RequestMessage.build(Tokens.OPS_EVAL)
                .processor("session")
                .addArg(Tokens.ARGS_SESSION, sessionId)
                .addArg(Tokens.ARGS_GREMLIN, "graph.tx().isOpen()")
                .create();
        final List<ResponseMessage> txRecheckResults = client.submit(txRecheck);
        assertEquals(1, txRecheckResults.size());
        assertEquals(ResponseStatusCode.SUCCESS, txRecheckResults.get(0).getStatus().getCode());
        final List<Boolean> txRecheckData = (List<Boolean>) txRecheckResults.get(0).getResult().getData();
        assertThat(txRecheckData.get(0), is(false));
    }
}
/**
 * Adds a vertex in a session with {@code manageTransaction} enabled and checks that the transaction is reported
 * as closed afterwards, that a sessionless read sees the vertex, and that the session still holds the "v"
 * binding.
 */
@Test
@SuppressWarnings("unchecked")
public void shouldExecuteInSessionWithTransactionManagement() throws Exception {
    assumeNeo4jIsPresent();

    final String sessionId = name.getMethodName();

    try (final SimpleClient client = TestClientFactory.createWebSocketClient()) {

        // add a vertex with the transaction managed for the request
        final RequestMessage addVertex = RequestMessage.build(Tokens.OPS_EVAL)
                .processor("session")
                .addArg(Tokens.ARGS_SESSION, sessionId)
                .addArg(Tokens.ARGS_GREMLIN, "v=graph.addVertex(\"name\",\"stephen\")")
                .addArg(Tokens.ARGS_MANAGE_TRANSACTION, true)
                .create();
        final List<ResponseMessage> addVertexResults = client.submit(addVertex);
        assertEquals(1, addVertexResults.size());
        assertEquals(ResponseStatusCode.SUCCESS, addVertexResults.get(0).getStatus().getCode());

        // the transaction must be reported as closed
        final RequestMessage txCheck = RequestMessage.build(Tokens.OPS_EVAL)
                .processor("session")
                .addArg(Tokens.ARGS_SESSION, sessionId)
                .addArg(Tokens.ARGS_GREMLIN, "graph.tx().isOpen()")
                .create();
        final List<ResponseMessage> txCheckResults = client.submit(txCheck);
        assertEquals(1, txCheckResults.size());
        assertEquals(ResponseStatusCode.SUCCESS, txCheckResults.get(0).getStatus().getCode());
        final List<Boolean> txCheckData = (List<Boolean>) txCheckResults.get(0).getResult().getData();
        assertThat(txCheckData.get(0), is(false));

        // a sessionless read validates that the transaction was managed
        final RequestMessage sessionlessRead = RequestMessage.build(Tokens.OPS_EVAL)
                .addArg(Tokens.ARGS_GREMLIN, "graph.traversal().V().values('name')")
                .create();
        final List<ResponseMessage> sessionlessReadResults = client.submit(sessionlessRead);
        assertEquals(1, sessionlessReadResults.size());
        assertEquals(ResponseStatusCode.SUCCESS, sessionlessReadResults.get(0).getStatus().getCode());
        final List<String> sessionlessNames = (List<String>) sessionlessReadResults.get(0).getResult().getData();
        assertEquals("stephen", sessionlessNames.get(0));

        // make sure the session is intact
        final RequestMessage readBinding = RequestMessage.build(Tokens.OPS_EVAL)
                .processor("session")
                .addArg(Tokens.ARGS_SESSION, sessionId)
                .addArg(Tokens.ARGS_GREMLIN, "v.values(\"name\")")
                .addArg(Tokens.ARGS_MANAGE_TRANSACTION, true)
                .create();
        final List<ResponseMessage> readBindingResults = client.submit(readBinding);
        assertEquals(1, readBindingResults.size());
        assertEquals(ResponseStatusCode.SUCCESS, readBindingResults.get(0).getStatus().getCode());
        final List<String> sessionNames = (List<String>) readBindingResults.get(0).getResult().getData();
        assertEquals("stephen", sessionNames.get(0));

        // the transaction must still be reported as closed
        final RequestMessage txRecheck = RequestMessage.build(Tokens.OPS_EVAL)
                .processor("session")
                .addArg(Tokens.ARGS_SESSION, sessionId)
                .addArg(Tokens.ARGS_GREMLIN, "graph.tx().isOpen()")
                .create();
        final List<ResponseMessage> txRecheckResults = client.submit(txRecheck);
        assertEquals(1, txRecheckResults.size());
        assertEquals(ResponseStatusCode.SUCCESS, txRecheckResults.get(0).getStatus().getCode());
        final List<Boolean> txRecheckData = (List<Boolean>) txRecheckResults.get(0).getResult().getData();
        assertThat(txRecheckData.get(0), is(false));
    }
}
}