blob: e7485a4481c42e3b8670346552624d1def72f32a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.scripting;
import org.apache.giraph.conf.JsonStringConfOption;
import org.apache.giraph.graph.Language;
import org.apache.giraph.jython.JythonUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.codehaus.jackson.type.TypeReference;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import static org.apache.giraph.utils.DistributedCacheUtils.getLocalCacheFile;
/**
* Loads scripts written by user in other languages, for example Jython.
*/
public class ScriptLoader {
/** Option for scripts to load on workers */
public static final JsonStringConfOption SCRIPTS_TO_LOAD =
new JsonStringConfOption("giraph.scripts.to.load",
"Scripts to load on workers");
/** Scripts that were loaded */
private static final List<DeployedScript> LOADED_SCRIPTS =
Lists.newArrayList();
/** Logger */
private static final Logger LOG = Logger.getLogger(ScriptLoader.class);
/** Don't construct */
private ScriptLoader() { }
/**
* Deploy a script
*
* @param conf {@link Configuration}
* @param scriptPath Path to script
* @param deployType type of deployment
* @param language programming language
*/
public static void setScriptsToLoad(Configuration conf,
String scriptPath, DeployType deployType, Language language) {
DeployedScript deployedScript = new DeployedScript(scriptPath,
deployType, language);
setScriptsToLoad(conf, deployedScript);
}
/**
* Deploy pair of scripts
*
* @param conf {@link Configuration}
* @param script1 Path to script
* @param deployType1 type of deployment
* @param language1 programming language
* @param script2 Path to script
* @param deployType2 type of deployment
* @param language2 programming language
*/
public static void setScriptsToLoad(Configuration conf,
String script1, DeployType deployType1, Language language1,
String script2, DeployType deployType2, Language language2) {
DeployedScript deployedScript1 = new DeployedScript(script1,
deployType1, language1);
DeployedScript deployedScript2 = new DeployedScript(script2,
deployType2, language2);
setScriptsToLoad(conf, deployedScript1, deployedScript2);
}
/**
* Deploy scripts
*
* @param conf Configuration
* @param scripts the scripts to deploy
*/
public static void setScriptsToLoad(Configuration conf,
DeployedScript... scripts) {
List<DeployedScript> scriptsToLoad = Lists.newArrayList(scripts);
SCRIPTS_TO_LOAD.set(conf, scriptsToLoad);
}
/**
* Add a script to load on workers
*
* @param conf {@link Configuration}
* @param script Path to script
* @param deployType type of deployment
* @param language programming language
*/
public static void addScriptToLoad(Configuration conf,
String script, DeployType deployType, Language language) {
addScriptToLoad(conf, new DeployedScript(script, deployType, language));
}
/**
* Add a script to load on workers
*
* @param conf {@link Configuration}
* @param script the script to load
*/
public static void addScriptToLoad(Configuration conf,
DeployedScript script) {
List<DeployedScript> scriptsToLoad = getScriptsToLoad(conf);
if (scriptsToLoad == null) {
scriptsToLoad = Lists.<DeployedScript>newArrayList();
}
scriptsToLoad.add(script);
SCRIPTS_TO_LOAD.set(conf, scriptsToLoad);
}
/**
* Get the list of scripts to load on workers
*
* @param conf {@link Configuration}
* @return list of {@link DeployedScript}s
*/
public static List<DeployedScript> getScriptsToLoad(Configuration conf) {
TypeReference<List<DeployedScript>> jsonType =
new TypeReference<List<DeployedScript>>() { };
return SCRIPTS_TO_LOAD.get(conf, jsonType);
}
/**
* Load all the scripts deployed in Configuration
*
* @param conf Configuration
* @throws IOException
*/
public static void loadScripts(Configuration conf) throws IOException {
List<DeployedScript> deployedScripts = getScriptsToLoad(conf);
if (deployedScripts == null) {
return;
}
for (DeployedScript deployedScript : deployedScripts) {
loadScript(conf, deployedScript);
}
}
/**
* Load a single deployed script
*
* @param conf Configuration
* @param deployedScript the deployed script
* @throws IOException
*/
public static void loadScript(Configuration conf,
DeployedScript deployedScript) throws IOException {
InputStream stream = openScriptInputStream(conf, deployedScript);
switch (deployedScript.getLanguage()) {
case JYTHON:
loadJythonScript(stream);
break;
default:
LOG.fatal("Don't know how to load script " + deployedScript);
throw new IllegalStateException("Don't know how to load script " +
deployedScript);
}
LOADED_SCRIPTS.add(deployedScript);
Closeables.close(stream, true);
}
/**
* Load a Jython deployed script
*
* @param stream InputStream with Jython code to load
*/
private static void loadJythonScript(InputStream stream) {
JythonUtils.getInterpreter().execfile(stream);
}
/**
* Get list of scripts already loaded.
*
* @return list of loaded scripts
*/
public static List<DeployedScript> getLoadedScripts() {
return LOADED_SCRIPTS;
}
/**
* Get an {@link java.io.InputStream} for the deployed script.
*
* @param conf Configuration
* @param deployedScript the deployed script
* @return {@link java.io.InputStream} for reading script
*/
private static InputStream openScriptInputStream(Configuration conf,
DeployedScript deployedScript) {
DeployType deployType = deployedScript.getDeployType();
String path = deployedScript.getPath();
InputStream stream;
switch (deployType) {
case RESOURCE:
if (LOG.isInfoEnabled()) {
LOG.info("getScriptStream: Reading script from resource at " +
deployedScript.getPath());
}
stream = ScriptLoader.class.getClassLoader().getResourceAsStream(path);
if (stream == null) {
throw new IllegalStateException("getScriptStream: Failed to " +
"open script from resource at " + path);
}
break;
case DISTRIBUTED_CACHE:
if (LOG.isInfoEnabled()) {
LOG.info("getScriptStream: Reading script from DistributedCache at " +
path);
}
Optional<Path> localPath = getLocalCacheFile(conf, path);
if (!localPath.isPresent()) {
throw new IllegalStateException("getScriptStream: Failed to " +
"find script in local DistributedCache matching " + path);
}
String pathStr = localPath.get().toString();
try {
stream = new BufferedInputStream(new FileInputStream(pathStr));
} catch (IOException e) {
throw new IllegalStateException("getScriptStream: Failed open " +
"script from DistributedCache at " + localPath);
}
break;
default:
throw new IllegalArgumentException("getScriptStream: Unknown " +
"script deployment type: " + deployType);
}
return stream;
}
}