blob: 4b137f3dc269bd02b689ccb47526975669dfb442 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.server.handler;
import com.codahale.metrics.Timer;
import groovy.lang.GroovyRuntimeException;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.MessageSerializer;
import org.apache.tinkerpop.gremlin.driver.Tokens;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
import org.apache.tinkerpop.gremlin.driver.ser.MessageTextSerializer;
import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
import org.apache.tinkerpop.gremlin.groovy.jsr223.TimedInterruptTimeoutException;
import org.apache.tinkerpop.gremlin.jsr223.GremlinScriptEngine;
import org.apache.tinkerpop.gremlin.jsr223.JavaTranslator;
import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
import org.apache.tinkerpop.gremlin.process.traversal.Failure;
import org.apache.tinkerpop.gremlin.process.traversal.GraphOp;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.VerificationException;
import org.apache.tinkerpop.gremlin.process.traversal.util.BytecodeHelper;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalInterruptedException;
import org.apache.tinkerpop.gremlin.server.GraphManager;
import org.apache.tinkerpop.gremlin.server.GremlinServer;
import org.apache.tinkerpop.gremlin.server.Settings;
import org.apache.tinkerpop.gremlin.server.auth.AuthenticatedUser;
import org.apache.tinkerpop.gremlin.util.ExceptionHelper;
import org.apache.tinkerpop.gremlin.server.util.TraverserIterator;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Transaction;
import org.apache.tinkerpop.gremlin.structure.util.TemporaryException;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.codehaus.groovy.control.MultipleCompilationErrorsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.script.Bindings;
import javax.script.ScriptException;
import javax.script.SimpleBindings;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import static org.apache.tinkerpop.gremlin.process.traversal.GraphOp.TX_COMMIT;
import static org.apache.tinkerpop.gremlin.process.traversal.GraphOp.TX_ROLLBACK;
/**
* A base implementation of {@link Session} which offers some common functionality that matches typical Gremlin Server
* request response expectations for script, bytecode and graph operations. The class is designed to be extended but
* take care in understanding the way that different methods are called as they do depend on one another a bit. It
* may be best to examine the source code to determine how best to use this class or to extend from the higher order
* classes of {@link SingleTaskSession} or {@link MultiTaskSession}.
*/
public abstract class AbstractSession implements Session, AutoCloseable {
// general purpose logger for this class
private static final Logger logger = LoggerFactory.getLogger(AbstractSession.class);
// logger obtained under the Gremlin Server audit logger name
private static final Logger auditLogger = LoggerFactory.getLogger(GremlinServer.AUDIT_LOGGER_NAME);
// true when the request that constructed this session carried the "session" argument
private final boolean sessionIdOnRequest;
// the channel of the task that constructed this session - the session closes when this channel closes
private final Channel initialChannel;
// supplied at construction - when true handleIterator() always commits/rolls back for the request
private final boolean transactionManaged;
private final String sessionId;
// set at most once by setSessionCancelFuture() and cancelled in close()
private final AtomicReference<ScheduledFuture<?>> sessionCancelFuture = new AtomicReference<>();
// set at most once by setSessionFuture() and consulted by cancel() and triggerTimeout()
private final AtomicReference<Future<?>> sessionFuture = new AtomicReference<>();
// the timeout value handed to triggerTimeout() when it recorded the close reason - remains 0 otherwise
private long actualTimeoutLengthWhenClosed = 0;
/**
* The session thread is a reference to the thread that is running the session and should be set by an
* implementation as the first line of the {@link #run()} method.
*/
protected Thread sessionThread;
// read from the request argument of the constructing task and defaults to false when absent
protected final boolean maintainStateAfterException;
// holds null until a reason is recorded - the first writer wins via compareAndSet(null, ...)
protected final AtomicReference<CloseReason> closeReason = new AtomicReference<>();
protected final GraphManager graphManager;
// the shared session registry from which close() removes this session by its id
protected final ConcurrentMap<String, Session> sessions;
// populated by process() with the alias targets of each request when strictTransactionManagement is enabled
protected final Set<String> aliasesUsedBySession = new HashSet<>();
// consulted by cancel() to decide whether a timeout response must be sent for a task that never commenced
protected final AtomicBoolean sessionTaskStarted = new AtomicBoolean(false);
/**
* The reason that a particular session closed. The reason for the close is generally not important as a
* final disposition for the {@link Session} instance and is more useful in aiding flow control during the
* close process. The {@code closeReason} reference of the enclosing class holds {@code null} until one of these
* values has been recorded.
*/
protected enum CloseReason {
/**
* The session exits in a fashion that did not precipitate from some form of interruption, timeout or
* exception, i.e. it is simply allowed to process to an exit through its normal execution flow. This status
* may or may not be possible given the context of the implementation. For example, a {@link MultiTaskSession}
* needs to be interrupted to stop processing.
*/
EXIT_PROCESSING,
/**
* The session was interrupted by the channel closing, which can be something initiated by closing the
* {@link Client} or might be triggered by the server. This may not be considered an error situation and
* depending on context, might be similar to a {@link #EXIT_PROCESSING} termination.
*/
CHANNEL_CLOSED,
/**
* The session encountered an exception related to execution like a script error, traversal iteration problem,
* serialization issue, etc.
*/
PROCESSING_EXCEPTION,
/**
* The session was interrupted by the session lifetime timeout. Results in a timeout status code when
* processing is interrupted.
*/
SESSION_TIMEOUT,
/**
* The session was interrupted by the request timeout. Results in a timeout status code when processing is
* interrupted.
*/
REQUEST_TIMEOUT
}
/**
 * Initializes the state common to all sessions from the task that triggers the session's creation.
 *
 * @param sessionTask the first task of the session, which supplies the request message, the channel the session
 *                    is bound to and the {@link GraphManager}
 * @param sessionId the identifier under which the session is registered in {@code sessions}
 * @param transactionManaged whether every request in the session has its transaction managed by the server
 * @param sessions the shared registry of sessions that this session removes itself from on {@link #close()}
 */
AbstractSession(final SessionTask sessionTask, final String sessionId,
                final boolean transactionManaged,
                final ConcurrentMap<String, Session> sessions) {
    // this only applies to sessions
    this.maintainStateAfterException = (boolean) sessionTask.getRequestMessage().
            optionalArgs(Tokens.ARGS_MAINTAIN_STATE_AFTER_EXCEPTION).orElse(false);
    this.sessionIdOnRequest = sessionTask.getRequestMessage().optionalArgs(Tokens.ARGS_SESSION).isPresent();
    this.transactionManaged = transactionManaged;
    this.sessionId = sessionId;
    this.sessions = sessions;
    this.graphManager = sessionTask.getGraphManager();
    this.initialChannel = sessionTask.getChannelHandlerContext().channel();

    // close session if the channel closes to cleanup and close transactions. the listener is registered only
    // after every field has been assigned because a close future that is already complete may notify the
    // listener right away (or from another thread) and close() relies on "sessions" being set.
    this.initialChannel.closeFuture().addListener(f -> {
        if (closeReason.compareAndSet(null, CloseReason.CHANNEL_CLOSED)) {
            close();
        }
    });
}
/**
 * Cancels the future that is running this session if it has not already completed. If the session task never
 * started, the implementation specific timeout response is sent as nothing else will respond to the client.
 *
 * @param mayInterruptIfRunning passed through to {@link Future#cancel(boolean)}
 */
protected synchronized void cancel(final boolean mayInterruptIfRunning) {
    // only Future methods are needed here, so there is no reason to cast to FutureTask and risk a
    // ClassCastException for other Future implementations
    final Future<?> sf = sessionFuture.get();
    if (sf != null && !sf.isDone()) {
        sf.cancel(mayInterruptIfRunning);

        if (!sessionTaskStarted.get()) {
            sendTimeoutResponseForUncommencedTask();
        }
    }
}
/**
 * Determines if the transactions of this session are managed as configured at construction.
 */
public boolean isTransactionManaged() {
    return this.transactionManaged;
}
/**
 * Gets the identifier assigned to this session at construction.
 */
@Override
public String getSessionId() {
    return this.sessionId;
}
/**
 * Determines if the supplied channel is the very same instance as the channel that the session was created on.
 */
public boolean isBoundTo(final Channel channel) {
    return this.initialChannel == channel;
}
/**
 * Gets the timeout value recorded by {@link #triggerTimeout(long, boolean)}, which is zero if no timeout was
 * recorded.
 */
public long getActualTimeoutLengthWhenClosed() {
    return this.actualTimeoutLengthWhenClosed;
}
/**
 * Gets the reason the session closed, which is empty if no reason has been recorded yet.
 */
public Optional<CloseReason> getCloseReason() {
    final CloseReason current = closeReason.get();
    return Optional.ofNullable(current);
}
/**
 * Looks up the script engine for the named language using the script engine manager of the
 * {@link GremlinExecutor} carried by the task.
 *
 * @param sessionTask the task that supplies the {@link GremlinExecutor}
 * @param language the name of the language the engine is registered under
 */
public GremlinScriptEngine getScriptEngine(final SessionTask sessionTask, final String language) {
    final GremlinExecutor executor = sessionTask.getGremlinExecutor();
    return executor.getScriptEngineManager().getEngineByName(language);
}
/**
* Respond to the client with the specific timeout response for this Session implementation.
* This is for situations where the Session hasn't started running. It is invoked by {@link #cancel(boolean)}
* after cancelling a session future that was not yet done while {@code sessionTaskStarted} is still
* {@code false}.
*/
protected abstract void sendTimeoutResponseForUncommencedTask();
/**
 * Assigns the future that manages cancellation of the session. It may only be assigned once.
 *
 * @throws IllegalStateException if the future was already assigned
 */
@Override
public void setSessionCancelFuture(final ScheduledFuture<?> f) {
    final boolean assigned = sessionCancelFuture.compareAndSet(null, f);
    if (!assigned) {
        throw new IllegalStateException("Session cancellation future is already set");
    }
}
/**
 * Assigns the future that represents the running session. It may only be assigned once.
 *
 * @throws IllegalStateException if the future was already assigned
 */
@Override
public void setSessionFuture(final Future<?> f) {
    final boolean assigned = sessionFuture.compareAndSet(null, f);
    if (!assigned) {
        throw new IllegalStateException("Session future is already set");
    }
}
/**
 * Records a timeout as the reason for closing and stops the running session, either by cancelling its future
 * or by interrupting the session thread. Nothing happens if the session future is unset or done, or if a close
 * reason was already recorded.
 *
 * @param timeout the timeout length that was exceeded, retained for error reporting
 * @param causedBySession {@code true} for the session lifetime timeout and {@code false} for a request timeout
 */
@Override
public synchronized void triggerTimeout(final long timeout, final boolean causedBySession) {
    // stopping the session here ultimately leads to close() for the final cleanup
    final Future<?> running = sessionFuture.get();
    if (running == null || running.isDone()) return;

    final CloseReason timeoutReason = causedBySession ? CloseReason.SESSION_TIMEOUT : CloseReason.REQUEST_TIMEOUT;
    if (!closeReason.compareAndSet(null, timeoutReason)) return;

    actualTimeoutLengthWhenClosed = timeout;

    // a session timeout on a MultiTaskSession or a request timeout on a SingleTaskSession request can
    // simply cancel() the session instance outright
    if (causedBySession || !sessionIdOnRequest) {
        cancel(true);
        return;
    }

    // both MultiTaskSession and SingleTaskSession assign the thread at the very start of run(). since a
    // request timeout on a MultiTaskSession can only fire once run() is well underway, the thread is
    // expected to be available here without any race.
    if (sessionThread != null) {
        sessionThread.interrupt();
        return;
    }

    logger.debug("{} is a {} which cannot be interrupted as the thread running the session has not " +
                    "been set - please check the implementation if this is not desirable",
            sessionId, this.getClass().getSimpleName());
}
/**
 * Evaluates the Gremlin of the task, whether script or {@link Bytecode}, and streams any results back to the
 * client. Failures are routed through {@link #handleException(SessionTask, Throwable)}.
 *
 * @param sessionTask the task holding the request to evaluate
 * @throws SessionException if evaluation or iteration of the results fails
 */
protected void process(final SessionTask sessionTask) throws SessionException {
    final RequestMessage request = sessionTask.getRequestMessage();
    final Map<String, Object> requestArgs = request.getArgs();
    final Object gremlin = requestArgs.get(Tokens.ARGS_GREMLIN);

    // strict transactions need the aliases remembered so that close() commits those and nothing else
    if (sessionTask.getSettings().strictTransactionManagement) {
        request.optionalArgs(Tokens.ARGS_ALIASES).ifPresent(
                aliasMap -> aliasesUsedBySession.addAll(((Map<String,String>) aliasMap).values()));
    }

    final Timer.Context timerContext = getMetricsTimer(sessionTask);
    try {
        // bytecode may hold a "graph operation" instead of a traversal, in which case there is nothing to
        // iterate as such operations manage their own lifecycle
        final Optional<Iterator<?>> results;
        if (gremlin instanceof Bytecode) {
            results = fromBytecode(sessionTask, (Bytecode) gremlin);
        } else {
            results = Optional.of(fromScript(sessionTask, (String) gremlin));
        }

        processAuditLog(sessionTask.getSettings(), sessionTask.getChannelHandlerContext(), gremlin);

        if (results.isPresent()) {
            handleIterator(sessionTask, results.get());
        }
    } catch (Throwable failure) {
        handleException(sessionTask, failure);
    } finally {
        timerContext.stop();
    }
}
/**
 * Translates a failure raised while processing a task into a {@link SessionException} that carries the
 * {@link ResponseMessage} to return to the client. This method never returns normally.
 * <p>
 * The checks are applied in order: an existing {@link SessionException} is rethrown as-is, then
 * {@link TemporaryException} or {@link Failure} anywhere in the cause chain, and then the root cause is examined
 * for script timeouts, interruption, "method too large" compilation failures and general evaluation errors
 * before falling back to a generic server error.
 *
 * @param sessionTask the task that was being processed when the failure occurred
 * @param t the failure to translate
 * @throws SessionException always
 */
protected void handleException(final SessionTask sessionTask, final Throwable t) throws SessionException {
    if (t instanceof SessionException) throw (SessionException) t;

    final Optional<Throwable> possibleSpecialException = determineIfSpecialException(t);
    if (possibleSpecialException.isPresent()) {
        final Throwable special = possibleSpecialException.get();
        final ResponseMessage.Builder specialResponseMsg = ResponseMessage.build(sessionTask.getRequestMessage()).
                statusMessage(special.getMessage()).
                statusAttributeException(special);
        if (special instanceof TemporaryException) {
            specialResponseMsg.code(ResponseStatusCode.SERVER_ERROR_TEMPORARY);
        } else if (special instanceof Failure) {
            final Failure failure = (Failure) special;
            specialResponseMsg.code(ResponseStatusCode.SERVER_ERROR_FAIL_STEP).
                    statusAttribute(Tokens.STATUS_ATTRIBUTE_FAIL_STEP_MESSAGE, failure.format());
        }
        throw new SessionException(special.getMessage(), specialResponseMsg.create());
    }

    final Throwable root = ExceptionHelper.getRootCause(t);

    if (root instanceof TimedInterruptTimeoutException) {
        // occurs when the TimedInterruptCustomizerProvider is in play
        final String msg = String.format("A timeout occurred within the script during evaluation of [%s] - consider increasing the limit given to TimedInterruptCustomizerProvider",
                sessionTask.getRequestMessage().getRequestId());
        throw new SessionException(msg, root, ResponseMessage.build(sessionTask.getRequestMessage())
                .code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
                .statusMessage("Timeout during script evaluation triggered by TimedInterruptCustomizerProvider")
                .create());
    }

    if (root instanceof TimeoutException) {
        final String errorMessage = String.format("Script evaluation exceeded the configured threshold for request [%s]",
                sessionTask.getRequestMessage().getRequestId());
        throw new SessionException(errorMessage, root, ResponseMessage.build(sessionTask.getRequestMessage())
                .code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
                .statusMessage(t.getMessage())
                .create());
    }

    if (root instanceof InterruptedException ||
            root instanceof TraversalInterruptedException ||
            root instanceof InterruptedIOException) {
        // read the reason once so that the message and the status code are derived from the same value. the
        // reason is null when the interruption did not come from a timeout or channel close and a switch on a
        // null enum would throw a NullPointerException, so keep the default message in that case.
        final CloseReason reason = closeReason.get();
        String msg = "Processing interrupted but the reason why was not known";
        if (reason != null) {
            switch (reason) {
                case CHANNEL_CLOSED:
                    msg = "Processing interrupted because the channel was closed";
                    break;
                case SESSION_TIMEOUT:
                    msg = String.format("Session closed - %s - sessionLifetimeTimeout of %s ms exceeded", sessionId, actualTimeoutLengthWhenClosed);
                    break;
                case REQUEST_TIMEOUT:
                    msg = String.format("Evaluation exceeded timeout threshold of %s ms", actualTimeoutLengthWhenClosed);
                    break;
                default:
                    // other reasons have no dedicated message
                    break;
            }
        }

        final ResponseStatusCode code = reason == CloseReason.SESSION_TIMEOUT || reason == CloseReason.REQUEST_TIMEOUT ?
                ResponseStatusCode.SERVER_ERROR_TIMEOUT : ResponseStatusCode.SERVER_ERROR;
        throw new SessionException(msg, root, ResponseMessage.build(sessionTask.getRequestMessage())
                .code(code)
                .statusMessage(msg).create());
    }

    if (root instanceof MultipleCompilationErrorsException && root.getMessage().contains("Method too large") &&
            ((MultipleCompilationErrorsException) root).getErrorCollector().getErrorCount() == 1) {
        final String errorMessage = String.format("The Gremlin statement that was submitted exceeds the maximum compilation size allowed by the JVM, please split it into multiple smaller statements - %s", trimMessage(sessionTask.getRequestMessage()));
        logger.warn(errorMessage);
        throw new SessionException(errorMessage, root, ResponseMessage.build(sessionTask.getRequestMessage())
                .code(ResponseStatusCode.SERVER_ERROR_EVALUATION)
                .statusMessage(errorMessage)
                .statusAttributeException(root).create());
    }

    // GroovyRuntimeException will hit a pretty wide range of eval type errors, like MissingPropertyException,
    // CompilationFailedException, MissingMethodException, etc. If more specific handling is required then
    // try to catch it earlier above.
    if (root instanceof GroovyRuntimeException ||
            root instanceof VerificationException ||
            root instanceof ScriptException) {
        throw new SessionException(root.getMessage(), root, ResponseMessage.build(sessionTask.getRequestMessage())
                .code(ResponseStatusCode.SERVER_ERROR_EVALUATION)
                .statusMessage(root.getMessage())
                .statusAttributeException(root).create());
    }

    throw new SessionException(root.getClass().getSimpleName() + ": " + root.getMessage(), root,
            ResponseMessage.build(sessionTask.getRequestMessage())
                    .code(ResponseStatusCode.SERVER_ERROR)
                    .statusAttributeException(root)
                    .statusMessage(root.getMessage()).create());
}
/**
 * Used to decrease the size of a Gremlin script that triggered a "method too large" exception so that it
 * doesn't log a massive text string nor return a large error message. The supplied message is not modified as
 * the trimming is applied to a copy. Scripts that are not longer than the limit are left as they are.
 *
 * @param msg the request whose Gremlin argument may need trimming
 * @return a copy of the request with its Gremlin argument cut to 1021 characters followed by "..." when longer
 */
private RequestMessage trimMessage(final RequestMessage msg) {
    final int maxLength = 1021;
    final RequestMessage trimmedMsg = RequestMessage.from(msg).create();
    final Map<String, Object> trimmedArgs = trimmedMsg.getArgs();
    final Object gremlin = trimmedArgs.get(Tokens.ARGS_GREMLIN);
    if (gremlin != null) {
        // guard the length as substring() throws if the end index exceeds the length of the text
        final String text = gremlin.toString();
        if (text.length() > maxLength)
            trimmedArgs.put(Tokens.ARGS_GREMLIN, text.substring(0, maxLength) + "...");
    }

    return trimmedMsg;
}
/**
 * Walks the exception chain looking for the first {@link TemporaryException} or {@link Failure} so that the
 * caller can respond with the error code that tells the client how to react, e.g. to retry.
 *
 * @param ex the exception whose chain is searched, starting with the exception itself
 * @return the first matching exception in the chain or empty if there is none
 */
protected static Optional<Throwable> determineIfSpecialException(final Throwable ex) {
    for (final Throwable candidate : ExceptionUtils.getThrowables(ex)) {
        if (candidate instanceof TemporaryException || candidate instanceof Failure) {
            return Optional.of(candidate);
        }
    }
    return Optional.empty();
}
/**
 * Removes the session from the session list and cancels the future that manages the lifetime of the session.
 * Calling this method on a session that is no longer registered does nothing.
 */
@Override
public synchronized void close() {
    // the removal is also the "already closing/closed" check - doing both with a single call on the shared
    // map avoids a gap between checking for the key and removing it
    if (sessions.remove(sessionId) == null) return;

    // read the reference once so that the null check and the cancel apply to the same future
    final ScheduledFuture<?> f = sessionCancelFuture.get();
    if (f != null && !f.isDone()) f.cancel(true);
}
/**
 * Evaluates the script of the {@link SessionTask} and exposes the outcome as an {@code Iterator}. The language
 * comes from the request arguments and falls back to "gremlin-groovy" when the argument is not supplied.
 *
 * @param sessionTask The session task which can be used as a context in constructing the {@code Iterator}
 * @param script The script extracted by the calling method from the {@code sessionTask}
 */
protected Iterator<?> fromScript(final SessionTask sessionTask, final String script) throws Exception {
    final Map<String, Object> requestArgs = sessionTask.getRequestMessage().getArgs();

    final String language;
    if (requestArgs.containsKey(Tokens.ARGS_LANGUAGE)) {
        language = (String) requestArgs.get(Tokens.ARGS_LANGUAGE);
    } else {
        language = "gremlin-groovy";
    }

    // resolve the engine before the bindings to keep the order in which failures can surface
    final GremlinScriptEngine engine = getScriptEngine(sessionTask, language);
    final Bindings evalBindings = mergeBindingsFromRequest(sessionTask, getWorkerBindings());
    return IteratorUtils.asIterator(engine.eval(script, evalBindings));
}
/**
 * Constructs an {@code Iterator} from {@link Bytecode} provided in the {@link SessionTask}. If the {@link Bytecode}
 * is found to evaluate to a {@link GraphOp} then it is processed and an empty {@code Optional} is returned.
 * The {@link TraversalSource} is taken from the first entry of the aliases of the request.
 *
 * @param sessionTask The session task which can be used as a context in constructing the {@code Iterator}
 * @param bytecode The {@link Bytecode} extracted by the calling method from the {@code sessionTask}
 */
protected Optional<Iterator<?>> fromBytecode(final SessionTask sessionTask, final Bytecode bytecode) throws Exception {
    final RequestMessage request = sessionTask.getRequestMessage();
    final Map<String, String> aliases = (Map<String, String>) request.optionalArgs(Tokens.ARGS_ALIASES).get();
    final GraphManager manager = sessionTask.getGraphManager();
    final String traversalSourceName = aliases.entrySet().iterator().next().getValue();
    final TraversalSource g = manager.getTraversalSource(traversalSourceName);

    // commit/rollback style commands are graph operations that are not iterated
    if (BytecodeHelper.isGraphOperation(bytecode)) {
        handleGraphOperation(sessionTask, bytecode, g.getGraph());
        return Optional.empty();
    }

    final Traversal.Admin<?, ?> traversal;
    final Optional<String> lambdaLanguage = BytecodeHelper.getLambdaLanguage(bytecode);
    if (lambdaLanguage.isPresent()) {
        // lambdas require the script engine of their language for translation
        final SimpleBindings lambdaBindings = new SimpleBindings();
        lambdaBindings.put(traversalSourceName, g);
        final GremlinScriptEngine engine = sessionTask.getGremlinExecutor().getScriptEngineManager().
                getEngineByName(lambdaLanguage.get());
        traversal = engine.eval(bytecode, lambdaBindings, traversalSourceName);
    } else {
        traversal = JavaTranslator.of(g).translate(bytecode);
    }

    // strategies must be applied or else getEndStep() has nothing in it
    traversal.applyStrategies();
    return Optional.of(new TraverserIterator(traversal));
}
/**
 * Creates a new set of bindings from the bindings of the {@link GraphManager}.
 */
protected Bindings getWorkerBindings() throws SessionException {
    final Bindings workerBindings = new SimpleBindings(graphManager.getAsBindings());
    return workerBindings;
}
/**
 * Adds the aliases and the bindings of the request to the supplied bindings. Each alias is bound to the
 * {@link Graph} its target names or, failing that, to the {@link TraversalSource} of that name. Bindings given
 * on the request are applied last and therefore override anything already present.
 *
 * @param sessionTask the task that supplies the request, the settings and the {@link GraphManager}
 * @param bindings the bindings to add to, which is also the instance that is returned
 * @throws SessionException if an alias target is neither a known {@link Graph} nor {@link TraversalSource} or
 * if aliases are missing while strict transaction management is enabled
 */
protected Bindings mergeBindingsFromRequest(final SessionTask sessionTask, final Bindings bindings) throws SessionException {
    final RequestMessage request = sessionTask.getRequestMessage();
    final Map<String, Object> requestArgs = request.getArgs();

    if (requestArgs.containsKey(Tokens.ARGS_ALIASES)) {
        // alias any global bindings to a different variable
        final Map<String, String> aliases = (Map<String, String>) requestArgs.get(Tokens.ARGS_ALIASES);
        for (Map.Entry<String,String> alias : aliases.entrySet()) {
            final String aliasName = alias.getKey();
            final String target = alias.getValue();

            // a Graph takes precedence and the TraversalSource is only consulted if there is no such Graph
            Object resolved = sessionTask.getGraphManager().getGraph(target);
            if (null == resolved) {
                resolved = sessionTask.getGraphManager().getTraversalSource(target);
            }

            // GraphManager.commit() and rollback() both rely on the aliases having been validated here
            if (null == resolved) {
                final String error = String.format("Could not alias [%s] to [%s] as [%s] not in the Graph or TraversalSource global bindings",
                        aliasName, target, target);
                throw new SessionException(error, ResponseMessage.build(request)
                        .code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(error).create());
            }

            bindings.put(aliasName, resolved);
        }
    } else if (sessionTask.getSettings().strictTransactionManagement) {
        // no aliases were given, which is not acceptable under strict transaction management
        final String error = "Gremlin Server is configured with strictTransactionManagement as 'true' - the 'aliases' arguments must be provided";
        throw new SessionException(error, ResponseMessage.build(request)
                .code(ResponseStatusCode.REQUEST_ERROR_INVALID_REQUEST_ARGUMENTS).statusMessage(error).create());
    }

    // bindings from the request go in last so that they override any other supplied
    Optional.ofNullable((Map<String, Object>) requestArgs.get(Tokens.ARGS_BINDINGS)).ifPresent(bindings::putAll);
    return bindings;
}
/**
* Provides a generic way of iterating a result set back to the client. Results are gathered into batches and a
* batch is written once it is full or the iterator is exhausted, but only while the channel is active and
* writable - otherwise the method sleeps briefly and tries again. An iterator that is empty from the start
* produces a single {@code NO_CONTENT} response. When transactions are managed for the request, a commit is
* performed before the final response is written and a rollback is performed if a frame cannot be created.
*
* @param sessionTask The Gremlin Server {@link SessionTask} object containing settings, request message, etc.
* @param itty The result to iterate
* @throws InterruptedException if the thread is found to be interrupted at the start of a pass through the loop
* or while sleeping to wait for the channel to become writable
*/
protected void handleIterator(final SessionTask sessionTask, final Iterator<?> itty) throws InterruptedException {
final ChannelHandlerContext nettyContext = sessionTask.getChannelHandlerContext();
final RequestMessage msg = sessionTask.getRequestMessage();
final Settings settings = sessionTask.getSettings();
boolean warnOnce = false;
// sessionless requests are always transaction managed, but in-session requests are configurable.
final boolean managedTransactionsForRequest = transactionManaged ?
true : (Boolean) msg.getArgs().getOrDefault(Tokens.ARGS_MANAGE_TRANSACTION, false);
// we have an empty iterator - happens on stuff like: g.V().iterate()
if (!itty.hasNext()) {
final Map<String, Object> attributes = generateStatusAttributes(sessionTask,ResponseStatusCode.NO_CONTENT, itty);
// as there is nothing left to iterate if we are transaction managed then we should execute a
// commit here before we send back a NO_CONTENT which implies success
if (managedTransactionsForRequest)
closeTransaction(sessionTask, Transaction.Status.COMMIT);
sessionTask.writeAndFlush(ResponseMessage.build(msg)
.code(ResponseStatusCode.NO_CONTENT)
.statusAttributes(attributes)
.create());
return;
}
// the batch size can be overridden by the request
final int resultIterationBatchSize = (Integer) msg.optionalArgs(Tokens.ARGS_BATCH_SIZE)
.orElse(settings.resultIterationBatchSize);
List<Object> aggregate = new ArrayList<>(resultIterationBatchSize);
// use an external control to manage the loop as opposed to just checking hasNext() in the while. this
// prevents situations where auto transactions create a new transaction after calls to commit() within
// the loop on calls to hasNext().
boolean hasMore = itty.hasNext();
while (hasMore) {
if (Thread.interrupted()) throw new InterruptedException();
// check if an implementation needs to force flush the aggregated results before the iteration batch
// size is reached.
// todo: what implementation does this?! can we kill it going forward - seems always false
// final boolean forceFlush = isForceFlushed(nettyContext, msg, itty);
final boolean forceFlush = false;
// have to check the aggregate size because it is possible that the channel is not writeable (below)
// so iterating next() if the message is not written and flushed would bump the aggregate size beyond
// the expected resultIterationBatchSize. Total serialization time for the response remains in
// effect so if the client is "slow" it may simply timeout.
//
// there is a need to check hasNext() on the iterator because if the channel is not writeable the
// previous pass through the while loop will have next()'d the iterator and if it is "done" then a
// NoSuchElementException will raise its head. also need a check to ensure that this iteration doesn't
// require a forced flush which can be forced by sub-classes.
//
// this could be placed inside the isWriteable() portion of the if-then below but it seems better to
// allow iteration to continue into a batch if that is possible rather than just doing nothing at all
// while waiting for the client to catch up
if (aggregate.size() < resultIterationBatchSize && itty.hasNext() && !forceFlush) aggregate.add(itty.next());
// send back a page of results if batch size is met or if it's the end of the results being iterated.
// also check writeability of the channel to prevent OOME for slow clients.
//
// clients might decide to close the Netty channel to the server with a CloseWebsocketFrame after errors
// like CorruptedFrameException. On the server, although the channel gets closed, there might be some
// executor threads waiting for watermark to clear which will not clear in these cases since client has
// already given up on these requests. This leads to these executors waiting for the client to consume
// results till the timeout. checking for isActive() should help prevent that.
if (nettyContext.channel().isActive() && nettyContext.channel().isWritable()) {
if (forceFlush || aggregate.size() == resultIterationBatchSize || !itty.hasNext()) {
final ResponseStatusCode code = itty.hasNext() ? ResponseStatusCode.PARTIAL_CONTENT : ResponseStatusCode.SUCCESS;
Frame frame = null;
try {
frame = makeFrame(sessionTask, aggregate, code, itty);
} catch (Exception ex) {
// a frame may use a Bytebuf which is a countable release - if it does not get written
// downstream it needs to be released here
// NOTE(review): the only assignment to frame is the makeFrame() call that just threw, so frame
// is still null at this point and the release below does not execute
if (frame != null) frame.tryRelease();
// exception is handled in makeFrame() - serialization error gets written back to driver
// at that point
if (managedTransactionsForRequest)
closeTransaction(sessionTask, Transaction.Status.ROLLBACK);
break;
}
// track whether there is anything left in the iterator because it needs to be accessed after
// the transaction could be closed - in that case a call to hasNext() could open a new transaction
// unintentionally
final boolean moreInIterator = itty.hasNext();
try {
// only need to reset the aggregation list if there's more stuff to write
if (moreInIterator)
aggregate = new ArrayList<>(resultIterationBatchSize);
else {
// iteration and serialization are both complete which means this finished successfully. note that
// errors internal to script eval or timeout will rollback given GremlinServer's global configurations.
// local errors will get rolledback below because the exceptions aren't thrown in those cases to be
// caught by the GremlinExecutor for global rollback logic. this only needs to be committed if
// there are no more items to iterate and serialization is complete
if (managedTransactionsForRequest)
closeTransaction(sessionTask, Transaction.Status.COMMIT);
// exit the result iteration loop as there are no more results left. using this external control
// because of the above commit. some graphs may open a new transaction on the call to
// hasNext()
hasMore = false;
}
} catch (Exception ex) {
// a frame may use a Bytebuf which is a countable release - if it does not get written
// downstream it needs to be released here
if (frame != null) frame.tryRelease();
throw ex;
}
if (!moreInIterator) iterateComplete(sessionTask, itty);
// the flush is called after the commit has potentially occurred. in this way, if a commit was
// required then it will be 100% complete before the client receives it. the "frame" at this point
// should have completely detached objects from the transaction (i.e. serialization has occurred)
// so a new one should not be opened on the flush down the netty pipeline
sessionTask.writeAndFlush(code, frame);
}
} else {
// don't keep triggering this warning over and over again for the same request
if (!warnOnce) {
logger.warn("Pausing response writing as writeBufferHighWaterMark exceeded on {} - writing will continue once client has caught up", msg);
warnOnce = true;
}
// since the client is lagging we can hold here for a period of time for the client to catch up.
// this isn't blocking the IO thread - just a worker.
TimeUnit.MILLISECONDS.sleep(10);
}
}
}
/**
 * Processes {@link Bytecode} that was detected to contain a {@link GraphOp}. Only commit and rollback are
 * recognized and each is answered with a {@code NO_CONTENT} response once the transaction is closed.
 *
 * @param sessionTask the task that supplies the request and receives the response
 * @param bytecode the graph operation to execute
 * @param graph the graph the operation applies to, which must support transactions
 * @throws IllegalStateException if the bytecode is neither a commit nor a rollback
 */
protected void handleGraphOperation(final SessionTask sessionTask, final Bytecode bytecode, final Graph graph) throws Exception {
    final RequestMessage request = sessionTask.getRequestMessage();

    if (!graph.features().graph().supportsTransactions()) {
        throw Graph.Exceptions.transactionsNotSupported();
    }

    final boolean commit = TX_COMMIT.equals(bytecode);
    if (!commit && !TX_ROLLBACK.equals(bytecode)) {
        throw new IllegalStateException(String.format(
                "Bytecode in request is not a recognized graph operation: %s", bytecode.toString()));
    }

    final Transaction.Status status = commit ? Transaction.Status.COMMIT : Transaction.Status.ROLLBACK;
    closeTransaction(sessionTask, status);

    // success is reported as a no-op response
    final Map<String, Object> attributes = generateStatusAttributes(sessionTask,
            ResponseStatusCode.NO_CONTENT, Collections.emptyIterator());
    final ResponseMessage response = ResponseMessage.build(request)
            .code(ResponseStatusCode.NO_CONTENT)
            .statusAttributes(attributes)
            .create();
    sessionTask.writeAndFlush(response);
}
/**
 * A hook invoked by {@link #handleIterator(SessionTask, Iterator)} on its final pass, just before the last
 * frame is sent back to the client. It is only reached when the entire result was iterated successfully.
 * The default implementation is empty.
 *
 * @param sessionTask the task whose results were iterated
 * @param itty the iterator that has been exhausted
 */
protected void iterateComplete(final SessionTask sessionTask, final Iterator<?> itty) {
    // intentionally empty - available for implementations to override
}
/**
 * Generates response status meta-data to put on a {@link ResponseMessage}. Meta-data is only produced for the
 * final message, where it holds the remote address of the channel, and is empty for all others.
 *
 * @param itty a reference to the current {@link Iterator} of results - it is not meant to be forwarded in
 *             this method
 */
protected Map<String, Object> generateStatusAttributes(final SessionTask sessionTask,
                                                       final ResponseStatusCode code, final Iterator<?> itty) {
    // server metadata is reserved for the last message
    if (itty.hasNext()) {
        return Collections.emptyMap();
    }

    final Map<String, Object> attributes = new HashMap<>();
    final String host = sessionTask.getChannelHandlerContext().channel().remoteAddress().toString();
    attributes.put(Tokens.ARGS_HOST, host);
    return attributes;
}
/**
 * Generates response result meta-data to put on a {@link ResponseMessage}. This default implementation has no
 * meta-data to contribute and always supplies an empty map.
 *
 * @param sessionTask the task for the request being answered - not used by this implementation
 * @param code the status code of the response being generated - not used by this implementation
 * @param itty a reference to the current {@link Iterator} of results - it is not meant to be forwarded in
 *             this method
 * @return an empty map
 */
protected Map<String, Object> generateResponseMetaData(final SessionTask sessionTask,
                                                       final ResponseStatusCode code, final Iterator<?> itty) {
    final Map<String, Object> noMetaData = Collections.emptyMap();
    return noMetaData;
}
/**
 * Builds a {@link ResponseMessage} for the supplied batch of results and serializes it into a {@link Frame}.
 * The serializer and the binary/text choice are read from the channel attributes {@link StateKey#SERIALIZER}
 * and {@link StateKey#USE_BINARY}. If anything fails while building or serializing the response, a warning is
 * logged, a {@link ResponseStatusCode#SERVER_ERROR_SERIALIZATION} response is written to the client and the
 * original exception is rethrown.
 *
 * @param sessionTask the task that supplies the request and the channel context
 * @param aggregate the batch of results to place in the response
 * @param code the status code for the response
 * @param itty a reference to the current {@link Iterator} of results, handed to the meta-data generators
 * @return the frame holding the serialized response
 * @throws Exception the exception raised during serialization, after the error response has been written
 */
protected Frame makeFrame(final SessionTask sessionTask, final List<Object> aggregate,
                          final ResponseStatusCode code, final Iterator<?> itty) throws Exception {
    final RequestMessage request = sessionTask.getRequestMessage();
    final ChannelHandlerContext ctx = sessionTask.getChannelHandlerContext();
    final MessageSerializer serializer = ctx.channel().attr(StateKey.SERIALIZER).get();
    final boolean useBinary = ctx.channel().attr(StateKey.USE_BINARY).get();

    // result meta-data is generated before the status attributes
    final Map<String, Object> resultMetaData = generateResponseMetaData(sessionTask, code, itty);
    final Map<String, Object> attributes = generateStatusAttributes(sessionTask, code, itty);

    try {
        if (!useBinary) {
            // the expectation is that the GremlinTextRequestDecoder will have placed a MessageTextSerializer
            // instance on the channel.
            final MessageTextSerializer asText = (MessageTextSerializer) serializer;
            final ResponseMessage textResponse = ResponseMessage.build(request)
                    .code(code)
                    .statusAttributes(attributes)
                    .responseMetaData(resultMetaData)
                    .result(aggregate).create();
            return new Frame(asText.serializeResponseAsString(textResponse, ctx.alloc()));
        }

        final ResponseMessage binaryResponse = ResponseMessage.build(request)
                .code(code)
                .statusAttributes(attributes)
                .responseMetaData(resultMetaData)
                .result(aggregate).create();
        return new Frame(serializer.serializeResponseAsBinary(binaryResponse, ctx.alloc()));
    } catch (Exception ex) {
        logger.warn("The result [{}] in the request {} could not be serialized and returned.", aggregate, request.getRequestId(), ex);

        // let the client know why no result is coming before handing the failure to the caller
        final String errorMessage = String.format("Error during serialization: %s", ExceptionHelper.getMessageFromExceptionOrCause(ex));
        final ResponseMessage serializationError = ResponseMessage.build(request.getRequestId())
                .statusMessage(errorMessage)
                .statusAttributeException(ex)
                .code(ResponseStatusCode.SERVER_ERROR_SERIALIZATION).create();
        sessionTask.writeAndFlush(serializationError);
        throw ex;
    }
}
/**
 * Called right before a transaction starts within {@link #run()}. The default implementation asks the graph
 * manager whether any transactions are open and throws if so. It generally assumes auto-transactions are
 * enabled on graphs (i.e. transactions automatically start on read/write).
 * <p>
 * Providers who do not follow these sorts of transaction semantics should provide an override to this method.
 *
 * @param sessionTask the task whose request id is reported if a transaction is already open
 * @throws IllegalStateException if the graph manager reports any open transactions
 */
protected void startTransaction(final SessionTask sessionTask) {
    if (!graphManager.hasAnyOpenTransactions()) return;

    // logger and handling in calling method
    final String alreadyOpen = String.format(
            "Attempted to start transaction for %s but the transaction was already open",
            sessionTask.getRequestMessage().getRequestId());
    throw new IllegalStateException(alreadyOpen);
}
/**
 * Closes the transaction without a {@link SessionTask} by supplying {@code null} for that argument to
 * {@link #closeTransaction(SessionTask, Transaction.Status)}. This method is idempotent.
 *
 * @param status whether to commit or roll back
 */
protected void closeTransaction(final Transaction.Status status) {
    // no task is available in this form, which the two-argument overload treats as "not strict"
    closeTransaction((SessionTask) null, status);
}
/**
 * Tries to close the transaction without a {@link SessionTask}, delegating to
 * {@link #closeTransactionSafely(SessionTask, Transaction.Status)} with {@code null} for the task. Exceptions
 * are caught and logged by that overload. This method is idempotent.
 *
 * @param status whether to commit or roll back
 */
protected void closeTransactionSafely(final Transaction.Status status) {
    closeTransactionSafely((SessionTask) null, status);
}
/**
 * Tries to close the transaction through {@link #closeTransaction(SessionTask, Transaction.Status)}. Any
 * {@link Exception} raised by that call is logged as an error and not propagated. This method is idempotent.
 *
 * @param sessionTask the task for the current request, or {@code null} when none is available
 * @param status whether to commit or roll back
 */
protected void closeTransactionSafely(final SessionTask sessionTask, final Transaction.Status status) {
    try {
        closeTransaction(sessionTask, status);
    } catch (Exception closeFailure) {
        // deliberately not rethrown - callers of the "safely" form only want the failure recorded
        logger.error("Failed to close transaction", closeFailure);
    }
}
/**
 * Writes an entry to the audit logger describing who requested what, but only when
 * {@link Settings#enableAuditLog} is set. The user is read from the channel attribute
 * {@link StateKey#AUTHENTICATED_USER} and falls back to {@link AuthenticatedUser#ANONYMOUS_USER} when absent.
 * The remote address has a leading "/" removed, and is reported as "unknown" when the channel no longer
 * supplies one.
 *
 * @param settings the server settings that determine whether audit logging is enabled
 * @param ctx the channel context the request arrived on
 * @param gremlinToExecute the script or bytecode of the request, logged as-is
 */
private void processAuditLog(final Settings settings, final ChannelHandlerContext ctx, final Object gremlinToExecute) {
    if (!settings.enableAuditLog) return;

    AuthenticatedUser user = ctx.channel().attr(StateKey.AUTHENTICATED_USER).get();
    if (null == user) { // This is expected when using the AllowAllAuthenticator
        user = AuthenticatedUser.ANONYMOUS_USER;
    }

    // netty documents remoteAddress() as returning null when the channel is not connected. an audit record
    // should still be written in that case rather than being lost to a NullPointerException
    final java.net.SocketAddress remoteAddress = ctx.channel().remoteAddress();
    String address = null == remoteAddress ? "unknown" : remoteAddress.toString();

    // drop the leading slash from the address text
    if (address.startsWith("/") && address.length() > 1) address = address.substring(1);

    auditLogger.info("User {} with address {} requested: {}", user.getName(), address, gremlinToExecute);
}
/**
 * Closes a transaction with commit or rollback. When a {@link SessionTask} is supplied and
 * {@link Settings#strictTransactionManagement} is enabled on its settings, only the graphs named by the aliases
 * used by this session are committed or rolled back. Otherwise every graph known to the graph manager is
 * committed or rolled back. Passing {@code null} for the {@link SessionTask} therefore bypasses "strict" and
 * must be done with care. Bypassing is often useful to ensure that all transactions are cleaned up when
 * multiple graphs are referenced. Prefer calling {@link #closeTransaction(Transaction.Status)} in that case.
 * This method is idempotent.
 *
 * @param sessionTask the task for the current request, or {@code null} to bypass strict handling
 * @param status {@link Transaction.Status#COMMIT} or {@link Transaction.Status#ROLLBACK}
 * @throws IllegalStateException if {@code status} is anything other than commit or rollback
 */
protected void closeTransaction(final SessionTask sessionTask, final Transaction.Status status) {
    final boolean commit = status == Transaction.Status.COMMIT;
    if (!commit && status != Transaction.Status.ROLLBACK)
        throw new IllegalStateException(String.format("Transaction.Status not supported: %s", status));

    final boolean strict = sessionTask != null && sessionTask.getSettings().strictTransactionManagement;
    if (!strict) {
        // no task or strict management disabled: act on every graph
        if (commit) {
            graphManager.commitAll();
        } else {
            graphManager.rollbackAll();
        }
        return;
    }

    // strict: act only on a copy of the aliases this session has used
    if (commit) {
        graphManager.commit(new HashSet<>(aliasesUsedBySession));
    } else {
        graphManager.rollback(new HashSet<>(aliasesUsedBySession));
    }
}
/**
 * Obtains a {@link Timer.Context} from the timer that matches the content type of the request:
 * {@code Session.traversalOpTimer} for bytecode and {@code Session.evalOpTimer} for scripts.
 *
 * @param sessionTask the task whose request content type selects the timer
 * @return the context returned by {@code time()} on the selected timer
 * @throws IllegalStateException if the content type is neither bytecode nor script
 */
private Timer.Context getMetricsTimer(final SessionTask sessionTask) {
    final Timer.Context timing;

    // getting something other than bytecode or script at this point is unlikely as earlier validations
    // should have picked this up.
    switch (sessionTask.getRequestContentType()) {
        case BYTECODE:
            timing = Session.traversalOpTimer.time();
            break;
        case SCRIPT:
            timing = Session.evalOpTimer.time();
            break;
        default:
            throw new IllegalStateException("Unrecognized content of the 'gremlin' argument in the request");
    }

    return timing;
}
}