blob: 1113eb57b0496a1fdbc2100e9be1c6340ca47788 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.groovy.engine;
import org.apache.tinkerpop.gremlin.groovy.CompilerCustomizerProvider;
import org.apache.tinkerpop.gremlin.groovy.DefaultImportCustomizerProvider;
import org.apache.tinkerpop.gremlin.groovy.jsr223.DependencyManager;
import org.apache.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngine;
import org.apache.tinkerpop.gremlin.groovy.jsr223.GremlinGroovyScriptEngineFactory;
import org.apache.tinkerpop.gremlin.groovy.plugin.GremlinPlugin;
import org.apache.tinkerpop.gremlin.groovy.plugin.IllegalEnvironmentException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.script.Bindings;
import javax.script.Compilable;
import javax.script.CompiledScript;
import javax.script.ScriptContext;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import javax.script.SimpleBindings;
import java.io.Reader;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * Holds a batch of the configured {@code ScriptEngine} objects for the server.
 * <p>
 * Operations come in two flavors. "Evaluation" operations ({@code eval} and {@code compile}) may run
 * concurrently with one another. "Control" operations ({@link #reload}, {@link #addImports}, {@link #use},
 * {@link #loadPlugins}, {@link #reset} and {@link #close}) are serialized against each other in FIFO order, and
 * any evaluation operation that arrives while a control operation is executing or queued is parked until the
 * control work has drained. The control lock is <em>not</em> reentrant.
 *
 * @author Stephen Mallette (http://stephen.genoprime.com)
 */
public class ScriptEngines implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(ScriptEngines.class);

    private static final ScriptEngineManager SCRIPT_ENGINE_MANAGER = new ScriptEngineManager();
    private static final GremlinGroovyScriptEngineFactory gremlinGroovyScriptEngineFactory = new GremlinGroovyScriptEngineFactory();

    /**
     * {@code ScriptEngine} objects configured for the server keyed on the language name.
     */
    private final Map<String, ScriptEngine> scriptEngines = new ConcurrentHashMap<>();

    /**
     * {@code true} while a control operation holds the "control" lock.
     */
    private final AtomicBoolean controlOperationExecuting = new AtomicBoolean(false);

    /**
     * Threads queued (FIFO) to execute a control operation.
     */
    private final Queue<Thread> controlWaiters = new ConcurrentLinkedQueue<>();

    /**
     * Threads parked in an evaluation operation until control operations have drained.
     */
    private final Queue<Thread> evalWaiters = new ConcurrentLinkedQueue<>();

    private final Consumer<ScriptEngines> initializer;

    /**
     * Creates a new object and immediately hands it to the {@code initializer}.
     *
     * @param initializer allows for external initialization of the newly created {@code ScriptEngines} object
     */
    public ScriptEngines(final Consumer<ScriptEngines> initializer) {
        this.initializer = initializer;
        this.initializer.accept(this);
    }

    /**
     * Evaluate a script with {@code Bindings} for a particular language.
     *
     * @throws IllegalArgumentException if no {@code ScriptEngine} is registered for {@code language}
     */
    public Object eval(final String script, final Bindings bindings, final String language) throws ScriptException {
        final ScriptEngine engine = awaitEngine(language);
        final Bindings all = mergeBindings(bindings, engine);
        return engine.eval(script, all);
    }

    /**
     * Evaluate a script, supplied by a {@code Reader}, with {@code Bindings} for a particular language.
     *
     * @throws IllegalArgumentException if no {@code ScriptEngine} is registered for {@code language}
     */
    public Object eval(final Reader reader, final Bindings bindings, final String language)
            throws ScriptException {
        final ScriptEngine engine = awaitEngine(language);
        final Bindings all = mergeBindings(bindings, engine);
        return engine.eval(reader, all);
    }

    /**
     * Compiles a script without executing it.
     *
     * @throws IllegalArgumentException if no {@code ScriptEngine} is registered for {@code language}
     * @throws UnsupportedOperationException if the {@link ScriptEngine} implementation does not implement
     *                                       the {@link javax.script.Compilable} interface.
     */
    public CompiledScript compile(final String script, final String language) throws ScriptException {
        return asCompilable(awaitEngine(language), language).compile(script);
    }

    /**
     * Compiles a script, supplied by a {@code Reader}, without executing it.
     *
     * @throws IllegalArgumentException if no {@code ScriptEngine} is registered for {@code language}
     * @throws UnsupportedOperationException if the {@link ScriptEngine} implementation does not implement
     *                                       the {@link javax.script.Compilable} interface.
     */
    public CompiledScript compile(final Reader script, final String language) throws ScriptException {
        return asCompilable(awaitEngine(language), language).compile(script);
    }

    /**
     * Reload a {@code ScriptEngine} with fresh imports. Evaluations that arrive while the reload is in progress
     * are held until it completes. The existing engine for the language is dropped before the new one is created,
     * so a failed reload leaves the language unregistered.
     *
     * @throws IllegalArgumentException if no {@code ScriptEngine} can be created for {@code language}
     */
    public void reload(final String language, final Set<String> imports, final Set<String> staticImports,
                       final Map<String, Object> config) {
        signalControlOpStart();
        try {
            scriptEngines.remove(language);

            final ScriptEngine scriptEngine = createScriptEngine(language, imports, staticImports, config)
                    .orElseThrow(() -> new IllegalArgumentException(String.format("Language [%s] not supported", language)));
            scriptEngines.put(language, scriptEngine);

            logger.info("Loaded {} ScriptEngine", language);
        } finally {
            signalControlOpEnd();
        }
    }

    /**
     * Perform append to the existing import list for all {@code ScriptEngine} instances that implement the
     * {@link DependencyManager} interface. Evaluations that arrive while this runs are held until it completes.
     */
    public void addImports(final Set<String> imports) {
        signalControlOpStart();
        try {
            getDependencyManagers().forEach(dm -> dm.addImports(imports));
        } finally {
            signalControlOpEnd();
        }
    }

    /**
     * Pull in dependencies given some Maven coordinates. Cycle through each {@code ScriptEngine} and determine if it
     * implements {@link DependencyManager}. For those that do call the {@link DependencyManager#use} method to fire
     * it up. A failure in one {@link DependencyManager} is logged and does not stop the others. Evaluations that
     * arrive while this runs are held until it completes.
     *
     * @return the plugins reported by the {@link DependencyManager} instances that succeeded
     */
    public List<GremlinPlugin> use(final String group, final String artifact, final String version) {
        final List<GremlinPlugin> pluginsToLoad = new ArrayList<>();
        signalControlOpStart();
        try {
            getDependencyManagers().forEach(dm -> {
                try {
                    pluginsToLoad.addAll(dm.use(group, artifact, version));
                } catch (Exception ex) {
                    logger.warn("Could not get dependency for [{}, {}, {}] - {}", group, artifact, version, ex.getMessage());
                }
            });
        } finally {
            signalControlOpEnd();
        }

        return pluginsToLoad;
    }

    /**
     * For each {@link DependencyManager} try to load the specified plugins. Failures are logged per
     * {@link DependencyManager} and do not propagate.
     */
    public void loadPlugins(final List<GremlinPlugin> plugins) {
        signalControlOpStart();
        try {
            getDependencyManagers().forEach(dm -> {
                try {
                    dm.loadPlugins(plugins);
                } catch (IllegalEnvironmentException iee) {
                    logger.warn("Some plugins may not have been loaded to {} - {}", dm.getClass().getSimpleName(), iee.getMessage());
                } catch (Exception ex) {
                    logger.error(String.format("Some plugins may not have been loaded to %s", dm.getClass().getSimpleName()), ex);
                }
            });
        } finally {
            signalControlOpEnd();
        }
    }

    /**
     * Iterate through all the {@link ScriptEngine} implementations and if they implement {@link AutoCloseable}
     * then call the {@link AutoCloseable#close()} method. After that is complete, the script engine cache will be
     * cleared. Closing is best-effort: a failure to close one engine is logged and the remaining engines are still
     * closed.
     */
    @Override
    public void close() throws Exception {
        signalControlOpStart();
        try {
            scriptEngines.values().stream()
                    .filter(se -> se instanceof AutoCloseable)
                    .map(se -> (AutoCloseable) se).forEach(c -> {
                try {
                    c.close();
                } catch (Exception ex) {
                    logger.warn("Could not close {} - {}", c.getClass().getSimpleName(), ex.getMessage());
                }
            });
            scriptEngines.clear();
        } finally {
            signalControlOpEnd();
        }
    }

    /**
     * Resets every {@link DependencyManager} and then re-runs the initializer supplied at construction.
     * Evaluations that arrive while the reset is in progress are held until the control lock is released.
     */
    public void reset() {
        signalControlOpStart();
        try {
            getDependencyManagers().forEach(DependencyManager::reset);
        } finally {
            signalControlOpEnd();

            // the control lock is not reentrant, so the initializer (which receives this object and may invoke
            // control operations on it) has to run after the lock is released. it runs even if a reset failed.
            this.initializer.accept(this);
        }
    }

    /**
     * List dependencies for those {@code ScriptEngine} objects that implement the {@link DependencyManager} interface.
     *
     * @return a new map keyed on language name
     */
    public Map<String, List<Map>> dependencies() {
        final Map<String, List<Map>> m = new HashMap<>();
        scriptEngines.forEach((language, engine) -> {
            if (engine instanceof DependencyManager)
                m.put(language, Arrays.asList(((DependencyManager) engine).dependencies()));
        });
        return m;
    }

    /**
     * List the imports for those {@code ScriptEngine} objects that implement the {@link DependencyManager} interface.
     *
     * @return a new map keyed on language name
     */
    public Map<String, List<Map>> imports() {
        final Map<String, List<Map>> m = new HashMap<>();
        scriptEngines.forEach((language, engine) -> {
            if (engine instanceof DependencyManager)
                m.put(language, Arrays.asList(((DependencyManager) engine).imports()));
        });
        return m;
    }

    /**
     * Get the set of {@code ScriptEngine} that implement {@link DependencyManager} interface.
     */
    private Set<DependencyManager> getDependencyManagers() {
        return scriptEngines.values().stream()
                .filter(se -> se instanceof DependencyManager)
                .map(se -> (DependencyManager) se)
                .collect(Collectors.<DependencyManager>toSet());
    }

    /**
     * Validates the language, waits out any control operations and then returns the {@code ScriptEngine}
     * registered for it.
     *
     * @throws IllegalArgumentException if no {@code ScriptEngine} is registered for {@code language}, either on
     *                                  entry or once control operations have completed
     */
    private ScriptEngine awaitEngine(final String language) {
        // fail fast before potentially parking the thread
        if (!scriptEngines.containsKey(language))
            throw new IllegalArgumentException(String.format("Language [%s] not supported", language));

        awaitControlOp();

        // a control operation may have removed the engine while this thread was waiting (e.g. a failed reload)
        final ScriptEngine engine = scriptEngines.get(language);
        if (null == engine)
            throw new IllegalArgumentException(String.format("Language [%s] not supported", language));

        return engine;
    }

    /**
     * Casts the engine to {@link Compilable}.
     *
     * @throws UnsupportedOperationException if the engine does not implement {@link Compilable}
     */
    private static Compilable asCompilable(final ScriptEngine scriptEngine, final String language) {
        if (!(scriptEngine instanceof Compilable))
            throw new UnsupportedOperationException(String.format("ScriptEngine for %s does not implement %s", language, Compilable.class.getName()));

        return (Compilable) scriptEngine;
    }

    /**
     * Called when a control operation starts. Attempts to grab a lock on the "control" process and parks the
     * current thread if it cannot. Parked threads are queued until released by {@link #signalControlOpEnd()}.
     * An interrupt received while waiting does not abort the wait; the interrupt status is restored on return.
     */
    private void signalControlOpStart() {
        boolean wasInterrupted = false;
        final Thread current = Thread.currentThread();
        controlWaiters.add(current);

        // Block while not first in queue or cannot acquire lock
        while (controlWaiters.peek() != current || !controlOperationExecuting.compareAndSet(false, true)) {
            LockSupport.park(this);

            // clear the flag so that the next park() actually blocks, but remember that it happened
            if (Thread.interrupted())
                wasInterrupted = true;
        }

        controlWaiters.remove();
        if (wasInterrupted)
            current.interrupt();
    }

    /**
     * Called when a control operation is finished. Releases the lock for the next thread trying to call a
     * control operation. If there are no additional threads trying to call control operations then unpark
     * any evaluation threads that are waiting.
     */
    private void signalControlOpEnd() {
        controlOperationExecuting.set(false);

        // unpark(null) is a no-op, so an empty queue is fine here
        LockSupport.unpark(controlWaiters.peek());

        // let the eval threads proceed as long as the control functions waiting are all complete
        if (controlWaiters.isEmpty()) {
            Thread t = evalWaiters.poll();
            while (t != null) {
                LockSupport.unpark(t);
                t = evalWaiters.poll();
            }
        }
    }

    /**
     * If a control operation is executing or there are some in the queue to be executed, then block the current
     * thread until that process completes. An interrupt received while waiting does not abort the wait; the
     * interrupt status is restored on return.
     */
    private void awaitControlOp() {
        if (!isControlOpPending()) return;

        final Thread current = Thread.currentThread();
        boolean wasInterrupted = false;

        // register first and only then re-check the condition - a signalControlOpEnd() that runs in between
        // either makes the re-check fail (no park) or finds this thread in the queue and unparks it
        evalWaiters.add(current);
        while (isControlOpPending()) {
            // park() may return spuriously or because of an interrupt, hence the loop
            LockSupport.park(this);
            if (Thread.interrupted())
                wasInterrupted = true;

            // signalControlOpEnd() polls threads off the queue before unparking them, so get back in line if
            // another control operation showed up in the meantime
            if (isControlOpPending() && !evalWaiters.contains(current))
                evalWaiters.add(current);
        }

        // no-op if signalControlOpEnd() already polled this thread
        evalWaiters.remove(current);
        if (wasInterrupted)
            current.interrupt();
    }

    /**
     * Determines if a control operation is executing or queued for execution.
     */
    private boolean isControlOpPending() {
        return !controlWaiters.isEmpty() || controlOperationExecuting.get();
    }

    /**
     * Creates the {@code ScriptEngine} for a language.
     *
     * @param config engine configuration; for gremlin-groovy the {@code compilerCustomizerProviders} key is read.
     *               A {@code null} map or a missing/{@code null} entry means no additional providers.
     * @return the engine or an empty {@code Optional} if the {@link ScriptEngineManager} knows no engine by
     *         that name
     */
    private static synchronized Optional<ScriptEngine> createScriptEngine(final String language,
                                                                          final Set<String> imports,
                                                                          final Set<String> staticImports,
                                                                          final Map<String, Object> config) {
        // gremlin-groovy gets special initialization for mapper imports and such. could implement this more
        // generically with the DependencyManager interface, but going to wait to see how other ScriptEngines
        // develop for TinkerPop3 before committing too deeply here to any specific way of doing this.
        if (language.equals(gremlinGroovyScriptEngineFactory.getLanguageName())) {
            final List<CompilerCustomizerProvider> providers = new ArrayList<>();
            providers.add(new DefaultImportCustomizerProvider(imports, staticImports));

            // the key to the config of the compilerCustomizerProvider is the fully qualified classname of a
            // CompilerCustomizerProvider. the value is a list of arguments to pass to an available constructor.
            // the arguments must match in terms of type, so given that configuration typically comes from yaml
            // or properties file, it is best to stick to primitive values when possible here for simplicity.
            final Object configured = null == config ? null : config.get("compilerCustomizerProviders");
            @SuppressWarnings("unchecked")
            final Map<String, Object> compilerCustomizerProviders = null == configured
                    ? Collections.<String, Object>emptyMap() : (Map<String, Object>) configured;

            compilerCustomizerProviders.forEach((k, v) -> {
                try {
                    final Class<?> providerClass = Class.forName(k);
                    if (v instanceof List && !((List<?>) v).isEmpty()) {
                        // look up the constructor by the runtime types of the configured arguments
                        final Object[] args = ((List<?>) v).toArray();
                        final Class<?>[] argClasses = Arrays.stream(args).map(Object::getClass).toArray(Class<?>[]::new);

                        final Constructor<?> constructor = providerClass.getConstructor(argClasses);
                        providers.add((CompilerCustomizerProvider) constructor.newInstance(args));
                    } else {
                        providers.add((CompilerCustomizerProvider) providerClass.newInstance());
                    }
                } catch (Exception ex) {
                    logger.warn(String.format("Could not instantiate CompilerCustomizerProvider implementation [%s]. It will not be applied.", k), ex);
                }
            });

            final CompilerCustomizerProvider[] providerArray = new CompilerCustomizerProvider[providers.size()];
            return Optional.<ScriptEngine>of(new GremlinGroovyScriptEngine(providers.toArray(providerArray)));
        } else {
            return Optional.ofNullable(SCRIPT_ENGINE_MANAGER.getEngineByName(language));
        }
    }

    /**
     * Takes the bindings from a request for eval and merges them with the {@code GLOBAL_SCOPE} bindings of the
     * engine. The engine's global bindings are never modified.
     *
     * @return the {@code bindings} themselves if the engine has no global bindings, otherwise a new
     *         {@code Bindings} holding the globals overlaid with {@code bindings}
     */
    private static Bindings mergeBindings(final Bindings bindings, final ScriptEngine engine) {
        // plugins place "globals" here - see ScriptEnginePluginAcceptor
        final Bindings global = engine.getBindings(ScriptContext.GLOBAL_SCOPE);
        if (null == global) return bindings;

        // SimpleBindings(Map) uses the supplied map as its backing store, so copy the globals first - otherwise
        // the putAll() below would leak request-scoped bindings into the engine's global scope
        final Bindings all = new SimpleBindings(new HashMap<>(global));

        // merge the globals with the incoming bindings where local bindings "win"
        if (null != bindings) all.putAll(bindings);
        return all;
    }
}