blob: afdae1556d42f1dc3b4498491565f2aadea7f148 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.server.handler;
import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor;
import org.apache.tinkerpop.gremlin.groovy.jsr223.GroovyCompilerGremlinPlugin;
import org.apache.tinkerpop.gremlin.jsr223.GremlinScriptEngine;
import org.apache.tinkerpop.gremlin.jsr223.GremlinScriptEngineManager;
import org.apache.tinkerpop.gremlin.server.Settings;
import org.apache.tinkerpop.gremlin.structure.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.script.Bindings;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.tinkerpop.gremlin.server.op.session.SessionOpProcessor.CONFIG_GLOBAL_FUNCTION_CACHE_ENABLED;
/**
* A {@link Session} implementation that queues tasks given to it and executes them in a serial fashion within the
* same thread which thus allows multiple tasks to be executed in the same transaction. The first {@link SessionTask}
* to execute is supplied on the constructor and additional ones may be added as they arrive with
* {@link #submitTask(SessionTask)} where they will be added to a queue where they will await execution in the thread
* bound to this session.
*/
public class MultiTaskSession extends AbstractSession {
private static final Logger logger = LoggerFactory.getLogger(MultiTaskSession.class);
protected final BlockingQueue<SessionTask> queue;
private final AtomicBoolean ending = new AtomicBoolean(false);
private final ScheduledExecutorService scheduledExecutorService;
private final GremlinScriptEngineManager scriptEngineManager;
private ScheduledFuture<?> requestCancelFuture;
private Bindings bindings;
/**
* Creates a new {@code MultiTaskSession} object providing the initial starting {@link SessionTask} that gets
* executed by the session when it starts.
*
* @param initialSessionTask The tasks that starts the session.
* @param sessionId The id of the session
* @param sessions The session id to {@link Session} instances mapping
*/
public MultiTaskSession(final SessionTask initialSessionTask, final String sessionId,
final ConcurrentMap<String, Session> sessions) {
super(initialSessionTask, sessionId, false, sessions);
queue = new LinkedBlockingQueue<>(initialSessionTask.getSettings().maxSessionTaskQueueSize);
// using a global function cache is cheaper than creating a new on per session especially if you have to
// create a lot of sessions. it will generate a ton of throw-away objects. mostly keeping the option open
// to not use it to preserve the ability to use the old functionality if wanted or if there is some specific
// use case with sessions that needs it. if we wanted this could eventually become a per-request option
// so that the client could control it as necessary and get scriptengine isolation if they need it.
if (initialSessionTask.getSettings().useCommonEngineForSessions)
scriptEngineManager = initialSessionTask.getGremlinExecutor().getScriptEngineManager();
else
scriptEngineManager = initializeGremlinExecutor(initialSessionTask).getScriptEngineManager();
scheduledExecutorService = initialSessionTask.getScheduledExecutorService();
// would be odd if this tossed an exception because on construction the queue should be empty,
// but let's handle it in the same fashion as if it had been rejected by the queue itself but put up
// an additional log because something would be rather off here if we hit this condition
if (!submitTask(initialSessionTask)) {
logger.error("Task {} rejected on creation of the {} for session {}",
initialSessionTask.getRequestMessage().getRequestId(), this.getClass().getSimpleName(),
getSessionId());
final String msg = String.format("Task %s rejected from session %s",
initialSessionTask.getRequestMessage().getRequestId(), getSessionId());
throw new RejectedExecutionException(msg);
}
}
/**
* Gets the script engine specific to this session which is dependent on the
* {@link Settings#useCommonEngineForSessions} configuration.
*/
@Override
public GremlinScriptEngine getScriptEngine(final SessionTask sessionTask, final String language) {
return scriptEngineManager.getEngineByName(language);
}
@Override
public boolean isAcceptingTasks() {
return !ending.get();
}
@Override
public boolean submitTask(final SessionTask sessionTask) throws RejectedExecutionException {
try {
return isAcceptingTasks() && queue.add(sessionTask);
} catch (IllegalStateException ise) {
final String msg = String.format("Task %s rejected from session %s",
sessionTask.getRequestMessage().getRequestId(), getSessionId());
throw new RejectedExecutionException(msg);
}
}
@Override
public void run() {
// allow the Session to know about the thread that is running it which will be a thread from the gremlinPool
// the thread really only has relevance once the session has started. this thread is then held below in the
// while() loop awaiting items to arrive on the request queue. it will hold until the session is properly
// closed by the client or through interruption/error
this.sessionThread = Thread.currentThread();
// there must be one item in the queue at least since addTask() gets called before the worker
// is ever started
SessionTask sessionTask = queue.poll();
if (null == sessionTask)
throw new IllegalStateException(String.format("Worker has no initial context for session: %s", getSessionId()));
try {
startTransaction(sessionTask);
try {
while (true) {
// schedule timeout for the current request from the queue
final long seto = sessionTask.getRequestTimeout();
requestCancelFuture = scheduledExecutorService.schedule(
() -> this.triggerTimeout(seto, false),
seto, TimeUnit.MILLISECONDS);
// only stop processing stuff in the queue if this session isn't configured to hold state between
// exceptions (i.e. the old OpProcessor way) or if this session is closing down by certain death
// (i.e. channel close or lifetime session timeout)
try {
process(sessionTask);
} catch (SessionException ex) {
if (!maintainStateAfterException || closeReason.get() == CloseReason.CHANNEL_CLOSED ||
closeReason.get() == CloseReason.SESSION_TIMEOUT) {
throw ex;
}
// reset the close reason as we are maintaining state
closeReason.set(null);
logger.warn(ex.getMessage(), ex);
sessionTask.writeAndFlush(ex.getResponseMessage());
}
// work is done within the timeout period so cancel it
cancelRequestTimeout();
sessionTask = queue.take();
}
} catch (Exception ex) {
stopAcceptingRequests();
// the current context gets its exception handled...
handleException(sessionTask, ex);
}
} catch (SessionException rexex) {
// if the close reason isn't already set then things stopped during gremlin execution somewhere and not
// more external issues like channel close or timeouts.
closeReason.compareAndSet(null, CloseReason.PROCESSING_EXCEPTION);
// remaining work items in the queue are ignored since this worker is closing. must send
// back some sort of response to satisfy the client. writeAndFlush code is different than
// the ResponseMessage as we don't want the message to be "final" for the Context. that
// status must be reserved for the message that caused the error
for (SessionTask st : queue) {
st.write(ResponseStatusCode.PARTIAL_CONTENT, ResponseMessage.build(st.getRequestMessage())
.code(ResponseStatusCode.SERVER_ERROR)
.statusMessage(String.format(
"An earlier request [%s] failed prior to this one having a chance to execute",
sessionTask.getRequestMessage().getRequestId())).create());
}
// exception should trigger a rollback in the session. a more focused rollback may have occurred
// during process() and the related result iteration IF transaction management was enabled on
// the request
closeTransactionSafely(Transaction.Status.ROLLBACK);
// the current context could already be completed with SUCCESS and we're just waiting for another
// one to show up while a timeout occurs or the channel closes. in these cases, this would be a valid
// close in all likelihood so there's no reason to log or alert the client as the client already has
// the best answer
if (!sessionTask.isFinalResponseWritten()) {
logger.warn(rexex.getMessage(), rexex);
sessionTask.write(rexex.getResponseMessage());
}
} finally {
// if this is a normal end to the session or if there is some general processing exception which tanks
// the entire session or if the session life timeout is exceeded then the session needs to be removed
// and everything cleaned up
if (closeReason.compareAndSet(null, CloseReason.EXIT_PROCESSING) ||
closeReason.get() == CloseReason.PROCESSING_EXCEPTION ||
closeReason.get() == CloseReason.SESSION_TIMEOUT) {
close();
// the session is now in a state where it is no longer in the set of current sessions so flush
// remaining messages to the transport, if any. the remaining message should be failures from
// the SessionException in the catch. this prevents an unlikely case where a fast client can
// get ahead of the server and start to send messages to a technically errored out session. that
// in itself is not necessarily bad but it makes tests fail sometimes because the tests are
// designed to assert in the same fashion for OpProcessor and UnifiedChannelizer infrastructure
// and they are slightly at odds with each other in certain situations.
sessionTask.flush();
}
}
}
/**
* This method stops incoming requests from being added to the session queue. It then cancels the session lifetime
* timeout and then calls {@link super#close()} which will then interrupt this {@link #run()} which will clear out
* remaining requests in the queue. This latter part of the close, where the interruption is concerned, is an
* asynchronous operation and will not block as it finishes its processing in the "gremlinPool" thread that was
* originally handling the work.
*/
@Override
public void close() {
stopAcceptingRequests();
cancelRequestTimeout();
super.close();
logger.debug("Session {} closed", getSessionId());
}
private void cancelRequestTimeout() {
if (requestCancelFuture != null && !requestCancelFuture.isDone())
requestCancelFuture.cancel(true);
else
logger.debug("Could not cancel request timeout for {} - {}", getSessionId(), requestCancelFuture);
}
private void stopAcceptingRequests() {
if (ending.compareAndSet(false, true))
cancel(true);
}
@Override
protected Bindings getWorkerBindings() throws SessionException {
if (null == bindings)
bindings = super.getWorkerBindings();
return this.bindings;
}
protected GremlinExecutor initializeGremlinExecutor(final SessionTask sessionTask) {
final Settings settings = sessionTask.getSettings();
final ExecutorService executor = sessionTask.getGremlinExecutor().getExecutorService();
final boolean useGlobalFunctionCache = settings.useGlobalFunctionCacheForSessions;
// these initial settings don't matter so much as we don't really execute things through the
// GremlinExecutor directly. Just doing all this setup to make GremlinExecutor do the work of
// rigging up the GremlinScriptEngineManager.
final GremlinExecutor.Builder gremlinExecutorBuilder = GremlinExecutor.build()
.evaluationTimeout(settings.getEvaluationTimeout())
.executorService(executor)
.globalBindings(graphManager.getAsBindings())
.scheduledExecutorService(scheduledExecutorService);
settings.scriptEngines.forEach((k, v) -> {
// use plugins if they are present
if (!v.plugins.isEmpty()) {
// make sure that server related classes are available at init. the LifeCycleHook stuff will be
// added explicitly via configuration using GremlinServerGremlinModule in the yaml. need to override
// scriptengine settings with SessionOpProcessor specific ones as the processing for sessions is
// different and a global setting may not make sense for a session
if (v.plugins.containsKey(GroovyCompilerGremlinPlugin.class.getName())) {
v.plugins.get(GroovyCompilerGremlinPlugin.class.getName()).put(CONFIG_GLOBAL_FUNCTION_CACHE_ENABLED, useGlobalFunctionCache);
} else {
final Map<String,Object> pluginConf = new HashMap<>();
pluginConf.put(CONFIG_GLOBAL_FUNCTION_CACHE_ENABLED, useGlobalFunctionCache);
v.plugins.put(GroovyCompilerGremlinPlugin.class.getName(), pluginConf);
}
gremlinExecutorBuilder.addPlugins(k, v.plugins);
}
});
return gremlinExecutorBuilder.create();
}
@Override
public String toString() {
return String.format("%s - session: %s", MultiTaskSession.class.getSimpleName(), getSessionId());
}
}