blob: adf3ea75424928504bf2f54e5d98ab4f721bf739 [file] [log] [blame]
/*
* Copyright (C) 2010-2012 The University of Manchester
*
* See the file "LICENSE" for license terms.
*/
package org.taverna.server.localworker.impl;
import static java.lang.Runtime.getRuntime;
import static java.lang.System.getProperty;
import static java.lang.System.out;
import static java.lang.management.ManagementFactory.getRuntimeMXBean;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static java.util.UUID.randomUUID;
import static org.apache.commons.io.FileUtils.forceDelete;
import static org.apache.commons.io.FileUtils.forceMkdir;
import static org.apache.commons.io.FileUtils.writeByteArrayToFile;
import static org.apache.commons.io.FileUtils.writeLines;
import static org.taverna.server.localworker.api.Constants.HELIO_TOKEN_NAME;
import static org.taverna.server.localworker.api.Constants.KEYSTORE_FILE;
import static org.taverna.server.localworker.api.Constants.KEYSTORE_PASSWORD;
import static org.taverna.server.localworker.api.Constants.SECURITY_DIR_NAME;
import static org.taverna.server.localworker.api.Constants.SHARED_DIR_PROP;
import static org.taverna.server.localworker.api.Constants.SUBDIR_LIST;
import static org.taverna.server.localworker.api.Constants.SYSTEM_ENCODING;
import static org.taverna.server.localworker.api.Constants.TRUSTSTORE_FILE;
import static org.taverna.server.localworker.impl.utils.FilenameVerifier.getValidatedFile;
import static org.taverna.server.localworker.remote.RemoteStatus.Finished;
import static org.taverna.server.localworker.remote.RemoteStatus.Initialized;
import static org.taverna.server.localworker.remote.RemoteStatus.Operating;
import static org.taverna.server.localworker.remote.RemoteStatus.Stopped;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.rmi.RemoteException;
import java.rmi.server.UnicastRemoteObject;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import org.taverna.server.localworker.api.Worker;
import org.taverna.server.localworker.api.WorkerFactory;
import org.taverna.server.localworker.remote.IllegalStateTransitionException;
import org.taverna.server.localworker.remote.ImplementationException;
import org.taverna.server.localworker.remote.RemoteDirectory;
import org.taverna.server.localworker.remote.RemoteInput;
import org.taverna.server.localworker.remote.RemoteListener;
import org.taverna.server.localworker.remote.RemoteSecurityContext;
import org.taverna.server.localworker.remote.RemoteSingleRun;
import org.taverna.server.localworker.remote.RemoteStatus;
import org.taverna.server.localworker.remote.StillWorkingOnItException;
import org.taverna.server.localworker.server.UsageRecordReceiver;
/**
* This class implements one side of the connection between the Taverna Server
* master server and this process. It delegates to a {@link Worker} instance the
* handling of actually running a workflow.
*
* @author Donal Fellows
* @see DirectoryDelegate
* @see FileDelegate
* @see WorkerCore
*/
@SuppressWarnings("serial")
public class LocalWorker extends UnicastRemoteObject implements RemoteSingleRun {
// ----------------------- CONSTANTS -----------------------
/** Handle to the directory containing the security info. */
static final File SECURITY_DIR;
static final String SLASHTEMP;
static {
SLASHTEMP = getProperty("java.io.tmpdir");
File home = new File(getProperty("user.home"));
// If we can't write to $HOME (i.e., we're in an odd deployment) use
// the official version of /tmp/$PID as a fallback.
if (!home.canWrite())
home = new File(SLASHTEMP, getRuntimeMXBean().getName());
SECURITY_DIR = new File(home, SECURITY_DIR_NAME);
}
// ----------------------- VARIABLES -----------------------
/**
* Magic flag used to turn off problematic code when testing inside CI
* environment.
*/
static boolean DO_MKDIR = true;
/** What to use to run a workflow engine. */
private final String executeWorkflowCommand;
/** What workflow to run. */
private final byte[] workflow;
/** The remote access object for the working directory. */
private final DirectoryDelegate baseDir;
/** What inputs to pass as files. */
final Map<String, String> inputFiles;
/** What inputs to pass as files (as file refs). */
final Map<String, File> inputRealFiles;
/** What inputs to pass as direct values. */
final Map<String, String> inputValues;
/** What delimiters to use. */
final Map<String, String> inputDelimiters;
/** The interface to the workflow engine subprocess. */
private final Worker core;
/** Our descriptor token (UUID). */
private final String masterToken;
/**
* The root working directory for a workflow run, or <tt>null</tt> if it has
* been deleted.
*/
private File base;
/**
* When did this workflow start running, or <tt>null</tt> for
* "never/not yet".
*/
private Date start;
/**
* When did this workflow finish running, or <tt>null</tt> for
* "never/not yet".
*/
private Date finish;
/** The cached status of the workflow run. */
RemoteStatus status;
/**
* The name of the input Baclava document, or <tt>null</tt> to not do it
* that way.
*/
String inputBaclava;
/**
* The name of the output Baclava document, or <tt>null</tt> to not do it
* that way.
*/
String outputBaclava;
/**
* The file containing the input Baclava document, or <tt>null</tt> to not
* do it that way.
*/
private File inputBaclavaFile;
/**
* The file containing the output Baclava document, or <tt>null</tt> to not
* do it that way.
*/
private File outputBaclavaFile;
/**
* Registered shutdown hook so that we clean up when this process is killed
* off, or <tt>null</tt> if that is no longer necessary.
*/
Thread shutdownHook;
/** Location for security information to be written to. */
File securityDirectory;
/**
* Password to use to encrypt security information.
*/
char[] keystorePassword = KEYSTORE_PASSWORD;
/** Additional server-specified environment settings. */
Map<String, String> environment = new HashMap<>();
/** Additional server-specified java runtime settings. */
List<String> runtimeSettings = new ArrayList<>();
URL interactionFeedURL;
URL webdavURL;
URL publishURL;//FIXME
private boolean doProvenance = true;
// ----------------------- METHODS -----------------------
/**
* @param executeWorkflowCommand
* The script used to execute workflows.
* @param workflow
* The workflow to execute.
* @param workerClass
* The class to instantiate as our local representative of the
* run.
* @param urReceiver
* The remote class to report the generated usage record(s) to.
* @param id
* The UUID to use, or <tt>null</tt> if we are to invent one.
* @param seedEnvironment
* The key/value pairs to seed the worker subprocess environment
* with.
* @param javaParams
* Parameters to pass to the worker subprocess java runtime
* itself.
* @param workerFactory
* How to make instances of the low-level worker objects.
* @throws RemoteException
* If registration of the worker fails.
* @throws ImplementationException
* If something goes wrong during local setup.
*/
protected LocalWorker(String executeWorkflowCommand, byte[] workflow,
UsageRecordReceiver urReceiver, UUID id,
Map<String, String> seedEnvironment, List<String> javaParams,
WorkerFactory workerFactory) throws RemoteException,
ImplementationException {
super();
if (id == null)
id = randomUUID();
masterToken = id.toString();
this.workflow = workflow;
this.executeWorkflowCommand = executeWorkflowCommand;
String sharedDir = getProperty(SHARED_DIR_PROP, SLASHTEMP);
base = new File(sharedDir, masterToken);
out.println("about to create " + base);
try {
forceMkdir(base);
for (String subdir : SUBDIR_LIST) {
new File(base, subdir).mkdir();
}
} catch (IOException e) {
throw new ImplementationException(
"problem creating run working directory", e);
}
baseDir = new DirectoryDelegate(base, null);
inputFiles = new HashMap<>();
inputRealFiles = new HashMap<>();
inputValues = new HashMap<>();
inputDelimiters = new HashMap<>();
environment.putAll(seedEnvironment);
runtimeSettings.addAll(javaParams);
try {
core = workerFactory.makeInstance();
} catch (Exception e) {
out.println("problem when creating core worker implementation");
e.printStackTrace(out);
throw new ImplementationException(
"problem when creating core worker implementation", e);
}
core.setURReceiver(urReceiver);
Thread t = new Thread(new Runnable() {
/**
* Kill off the worker launched by the core.
*/
@Override
public void run() {
try {
shutdownHook = null;
destroy();
} catch (ImplementationException e) {
// Absolutely nothing we can do here
}
}
});
getRuntime().addShutdownHook(t);
shutdownHook = t;
status = Initialized;
}
@Override
public void destroy() throws ImplementationException {
killWorkflowSubprocess();
removeFromShutdownHooks();
// Is this it?
deleteWorkingDirectory();
deleteSecurityManagerDirectory();
core.deleteLocalResources();
}
private void killWorkflowSubprocess() {
if (status != Finished && status != Initialized)
try {
core.killWorker();
if (finish == null)
finish = new Date();
} catch (Exception e) {
out.println("problem when killing worker");
e.printStackTrace(out);
}
}
private void removeFromShutdownHooks() throws ImplementationException {
try {
if (shutdownHook != null)
getRuntime().removeShutdownHook(shutdownHook);
} catch (RuntimeException e) {
throw new ImplementationException("problem removing shutdownHook",
e);
} finally {
shutdownHook = null;
}
}
private void deleteWorkingDirectory() throws ImplementationException {
try {
if (base != null)
forceDelete(base);
} catch (IOException e) {
out.println("problem deleting working directory");
e.printStackTrace(out);
throw new ImplementationException(
"problem deleting working directory", e);
} finally {
base = null;
}
}
private void deleteSecurityManagerDirectory()
throws ImplementationException {
try {
if (securityDirectory != null)
forceDelete(securityDirectory);
} catch (IOException e) {
out.println("problem deleting security directory");
e.printStackTrace(out);
throw new ImplementationException(
"problem deleting security directory", e);
} finally {
securityDirectory = null;
}
}
@Override
public void addListener(RemoteListener listener) throws RemoteException,
ImplementationException {
throw new ImplementationException("not implemented");
}
@Override
public String getInputBaclavaFile() {
return inputBaclava;
}
@Override
public List<RemoteInput> getInputs() throws RemoteException {
ArrayList<RemoteInput> result = new ArrayList<>();
for (String name : inputFiles.keySet())
result.add(new InputDelegate(name));
return result;
}
@Override
public List<String> getListenerTypes() {
return emptyList();
}
@Override
public List<RemoteListener> getListeners() {
return singletonList(core.getDefaultListener());
}
@Override
public String getOutputBaclavaFile() {
return outputBaclava;
}
class SecurityDelegate extends UnicastRemoteObject implements
RemoteSecurityContext {
private void setPrivatePerms(File dir) {
if (!dir.setReadable(false, false) || !dir.setReadable(true, true)
|| !dir.setExecutable(false, false)
|| !dir.setExecutable(true, true)
|| !dir.setWritable(false, false)
|| !dir.setWritable(true, true)) {
out.println("warning: "
+ "failed to set permissions on security context directory");
}
}
protected SecurityDelegate(String token) throws IOException {
super();
if (DO_MKDIR) {
securityDirectory = new File(SECURITY_DIR, token);
forceMkdir(securityDirectory);
setPrivatePerms(securityDirectory);
}
}
/**
* Write some data to a given file in the context directory.
*
* @param name
* The name of the file to write.
* @param data
* The bytes to put in the file.
* @throws RemoteException
* If anything goes wrong.
* @throws ImplementationException
*/
protected void write(String name, byte[] data) throws RemoteException,
ImplementationException {
try {
File f = new File(securityDirectory, name);
writeByteArrayToFile(f, data);
} catch (IOException e) {
throw new ImplementationException("problem writing " + name, e);
}
}
/**
* Write some data to a given file in the context directory.
*
* @param name
* The name of the file to write.
* @param data
* The lines to put in the file. The
* {@linkplain LocalWorker#SYSTEM_ENCODING system encoding}
* will be used to do the writing.
* @throws RemoteException
* If anything goes wrong.
* @throws ImplementationException
*/
protected void write(String name, Collection<String> data)
throws RemoteException, ImplementationException {
try {
File f = new File(securityDirectory, name);
writeLines(f, SYSTEM_ENCODING, data);
} catch (IOException e) {
throw new ImplementationException("problem writing " + name, e);
}
}
/**
* Write some data to a given file in the context directory.
*
* @param name
* The name of the file to write.
* @param data
* The line to put in the file. The
* {@linkplain LocalWorker#SYSTEM_ENCODING system encoding}
* will be used to do the writing.
* @throws RemoteException
* If anything goes wrong.
* @throws ImplementationException
*/
protected void write(String name, char[] data) throws RemoteException,
ImplementationException {
try {
File f = new File(securityDirectory, name);
writeLines(f, SYSTEM_ENCODING, asList(new String(data)));
} catch (IOException e) {
throw new ImplementationException("problem writing " + name, e);
}
}
@Override
public void setKeystore(byte[] keystore) throws RemoteException,
ImplementationException {
if (status != Initialized)
throw new RemoteException("not initializing");
if (keystore == null)
throw new IllegalArgumentException("keystore may not be null");
write(KEYSTORE_FILE, keystore);
}
@Override
public void setPassword(char[] password) throws RemoteException {
if (status != Initialized)
throw new RemoteException("not initializing");
if (password == null)
throw new IllegalArgumentException("password may not be null");
keystorePassword = password.clone();
}
@Override
public void setTruststore(byte[] truststore) throws RemoteException,
ImplementationException {
if (status != Initialized)
throw new RemoteException("not initializing");
if (truststore == null)
throw new IllegalArgumentException("truststore may not be null");
write(TRUSTSTORE_FILE, truststore);
}
@Override
public void setUriToAliasMap(Map<URI, String> uriToAliasMap)
throws RemoteException {
if (status != Initialized)
throw new RemoteException("not initializing");
if (uriToAliasMap == null)
return;
ArrayList<String> lines = new ArrayList<>();
for (Entry<URI, String> site : uriToAliasMap.entrySet())
lines.add(site.getKey().toASCIIString() + " " + site.getValue());
// write(URI_ALIAS_MAP, lines);
}
@Override
public void setHelioToken(String helioToken) throws RemoteException {
if (status != Initialized)
throw new RemoteException("not initializing");
out.println("registering HELIO CIS token for export");
environment.put(HELIO_TOKEN_NAME, helioToken);
}
}
@Override
public RemoteSecurityContext getSecurityContext() throws RemoteException,
ImplementationException {
try {
return new SecurityDelegate(masterToken);
} catch (RemoteException e) {
if (e.getCause() != null)
throw new ImplementationException(
"problem initializing security context", e.getCause());
throw e;
} catch (IOException e) {
throw new ImplementationException(
"problem initializing security context", e);
}
}
@Override
public RemoteStatus getStatus() {
// only state that can spontaneously change to another
if (status == Operating) {
status = core.getWorkerStatus();
if (status == Finished && finish == null)
finish = new Date();
}
return status;
}
@Override
public RemoteDirectory getWorkingDirectory() {
return baseDir;
}
File validateFilename(String filename) throws RemoteException {
if (filename == null)
throw new IllegalArgumentException("filename must be non-null");
try {
return getValidatedFile(base, filename.split("/"));
} catch (IOException e) {
throw new IllegalArgumentException("failed to validate filename", e);
}
}
class InputDelegate extends UnicastRemoteObject implements RemoteInput {
private String name;
InputDelegate(String name) throws RemoteException {
super();
this.name = name;
if (!inputFiles.containsKey(name)) {
if (status != Initialized)
throw new IllegalStateException("not initializing");
inputFiles.put(name, null);
inputRealFiles.put(name, null);
inputValues.put(name, null);
inputDelimiters.put(name, null);
}
}
@Override
public String getFile() {
return inputFiles.get(name);
}
@Override
public String getName() {
return name;
}
@Override
public String getValue() {
return inputValues.get(name);
}
@Override
public String getDelimiter() throws RemoteException {
return inputDelimiters.get(name);
}
@Override
public void setFile(String file) throws RemoteException {
if (status != Initialized)
throw new IllegalStateException("not initializing");
inputRealFiles.put(name, validateFilename(file));
inputValues.put(name, null);
inputFiles.put(name, file);
inputBaclava = null;
}
@Override
public void setValue(String value) throws RemoteException {
if (status != Initialized)
throw new IllegalStateException("not initializing");
inputValues.put(name, value);
inputFiles.put(name, null);
inputRealFiles.put(name, null);
inputBaclava = null;
}
@Override
public void setDelimiter(String delimiter) throws RemoteException {
if (status != Initialized)
throw new IllegalStateException("not initializing");
if (inputBaclava != null)
throw new IllegalStateException("input baclava file set");
if (delimiter != null) {
if (delimiter.length() > 1)
throw new IllegalStateException(
"multi-character delimiter not permitted");
if (delimiter.charAt(0) == 0)
throw new IllegalStateException(
"may not use NUL for splitting");
if (delimiter.charAt(0) > 127)
throw new IllegalStateException(
"only ASCII characters supported for splitting");
}
inputDelimiters.put(name, delimiter);
}
}
@Override
public RemoteInput makeInput(String name) throws RemoteException {
return new InputDelegate(name);
}
@Override
public RemoteListener makeListener(String type, String configuration)
throws RemoteException {
throw new RemoteException("listener manufacturing unsupported");
}
@Override
public void setInputBaclavaFile(String filename) throws RemoteException {
if (status != Initialized)
throw new IllegalStateException("not initializing");
inputBaclavaFile = validateFilename(filename);
for (String input : inputFiles.keySet()) {
inputFiles.put(input, null);
inputRealFiles.put(input, null);
inputValues.put(input, null);
}
inputBaclava = filename;
}
@Override
public void setOutputBaclavaFile(String filename) throws RemoteException {
if (status != Initialized)
throw new IllegalStateException("not initializing");
if (filename != null)
outputBaclavaFile = validateFilename(filename);
else
outputBaclavaFile = null;
outputBaclava = filename;
}
@Override
public void setGenerateProvenance(boolean prov) {
doProvenance = prov;
}
@Override
public void setStatus(RemoteStatus newStatus)
throws IllegalStateTransitionException, RemoteException,
ImplementationException, StillWorkingOnItException {
if (status == newStatus)
return;
switch (newStatus) {
case Initialized:
throw new IllegalStateTransitionException(
"may not move back to start");
case Operating:
switch (status) {
case Initialized:
boolean started;
try {
started = createWorker();
} catch (Exception e) {
throw new ImplementationException(
"problem creating executing workflow", e);
}
if (!started)
throw new StillWorkingOnItException(
"workflow start in process");
break;
case Stopped:
try {
core.startWorker();
} catch (Exception e) {
throw new ImplementationException(
"problem continuing workflow run", e);
}
break;
case Finished:
throw new IllegalStateTransitionException("already finished");
default:
break;
}
status = Operating;
break;
case Stopped:
switch (status) {
case Initialized:
throw new IllegalStateTransitionException(
"may only stop from Operating");
case Operating:
try {
core.stopWorker();
} catch (Exception e) {
throw new ImplementationException(
"problem stopping workflow run", e);
}
break;
case Finished:
throw new IllegalStateTransitionException("already finished");
default:
break;
}
status = Stopped;
break;
case Finished:
switch (status) {
case Operating:
case Stopped:
try {
core.killWorker();
if (finish == null)
finish = new Date();
} catch (Exception e) {
throw new ImplementationException(
"problem killing workflow run", e);
}
default:
break;
}
status = Finished;
break;
}
}
private boolean createWorker() throws Exception {
start = new Date();
char[] pw = keystorePassword;
keystorePassword = null;
/*
* Do not clear the keystorePassword array here; its ownership is
* *transferred* to the worker core which doesn't copy it but *does*
* clear it after use.
*/
return core.initWorker(this, executeWorkflowCommand, workflow, base,
inputBaclavaFile, inputRealFiles, inputValues, inputDelimiters,
outputBaclavaFile, securityDirectory, pw, doProvenance,
environment, masterToken, runtimeSettings);
}
@Override
public Date getFinishTimestamp() {
return finish == null ? null : new Date(finish.getTime());
}
@Override
public Date getStartTimestamp() {
return start == null ? null : new Date(start.getTime());
}
@Override
public void setInteractionServiceDetails(URL feed, URL webdav, URL publish) {
interactionFeedURL = feed;
webdavURL = webdav;
publishURL = publish;
}
@Override
public void ping() {
// Do nothing here; this *should* be empty
}
}