blob: c7e0de60b34a1bc948f81453926dc31b8973035b [file] [log] [blame]
/*
*/
package org.apache.taverna.server.master.worker;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import static org.apache.taverna.server.master.worker.RunConnection.toDBform;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.jdo.annotations.PersistenceAware;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;
import org.apache.taverna.server.master.interfaces.Policy;
import org.apache.taverna.server.master.interfaces.TavernaRun;
import org.apache.taverna.server.master.utils.CallTimeLogger.PerfLogged;
import org.apache.taverna.server.master.utils.JDOSupport;
import org.apache.taverna.server.master.utils.UsernamePrincipal;
/**
* This handles storing runs, interfacing with the underlying state engine as
* necessary.
*
* @author Donal Fellows
*/
@PersistenceAware
public class RunDatabaseDAO extends JDOSupport<RunConnection> {
public RunDatabaseDAO() {
super(RunConnection.class);
}
private Log log = LogFactory.getLog("Taverna.Server.Worker.RunDB");
private RunDatabase facade;
@Required
public void setFacade(RunDatabase facade) {
this.facade = facade;
}
// -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
@SuppressWarnings("unchecked")
private List<String> names() {
if (log.isDebugEnabled())
log.debug("fetching all run names");
return (List<String>) namedQuery("names").execute();
}
/**
* @return The number of workflow runs in the database.
*/
@WithinSingleTransaction
public int countRuns() {
if (log.isDebugEnabled())
log.debug("counting the number of runs");
return count();
}
private Integer count() {
return (Integer) namedQuery("count").execute();
}
@SuppressWarnings("unchecked")
private List<String> timedout() {
return (List<String>) namedQuery("timedout").execute();
}
@SuppressWarnings("unchecked")
private List<String> unterminated() {
return (List<String>) namedQuery("unterminated").execute();
}
@Nullable
private RunConnection pickRun(@Nonnull String name) {
if (log.isDebugEnabled())
log.debug("fetching the run called " + name);
try {
RunConnection rc = getById(name);
if (rc == null)
log.warn("no result for " + name);
return rc;
} catch (RuntimeException e) {
log.warn("problem in fetch", e);
throw e;
}
}
@Nullable
@WithinSingleTransaction
public String getSecurityToken(@Nonnull String name) {
RunConnection rc = getById(name);
if (rc == null)
return null;
return rc.getSecurityToken();
}
private void persist(@Nonnull RemoteRunDelegate rrd) throws IOException {
persist(toDBform(rrd));
}
@Nonnull
private List<RunConnection> allRuns() {
try {
List<RunConnection> rcs = new ArrayList<>();
List<String> names = names();
for (String id : names) {
try {
if (id != null)
rcs.add(pickRun(id));
} catch (RuntimeException e) {
continue;
}
}
return rcs;
} catch (RuntimeException e) {
log.warn("problem in fetch", e);
throw e;
}
}
// -=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=
/**
* Obtain a workflow run handle.
*
* @param name
* The identifier of the run.
* @return The run handle, or <tt>null</tt> if there is no such run.
*/
@Nullable
@WithinSingleTransaction
public TavernaRun get(String name) {
try {
RunConnection rc = pickRun(name);
return (rc == null) ? null : rc.fromDBform(facade);
} catch (Exception e) {
return null;
}
}
/**
* Get the runs that a user can read things from.
*
* @param user
* Who is asking?
* @param p
* The policy that determines what they can see.
* @return A mapping from run IDs to run handles.
*/
@Nonnull
@WithinSingleTransaction
public Map<String, TavernaRun> listRuns(UsernamePrincipal user, Policy p) {
Map<String, TavernaRun> result = new HashMap<>();
for (String id : names())
try {
RemoteRunDelegate rrd = pickRun(id).fromDBform(facade);
if (p.permitAccess(user, rrd))
result.put(id, rrd);
} catch (Exception e) {
continue;
}
return result;
}
/**
* @return A list of the IDs for all workflow runs.
*/
@Nonnull
@WithinSingleTransaction
public List<String> listRunNames() {
List<String> runNames = new ArrayList<>();
for (RunConnection rc : allRuns())
if (rc.getId() != null)
runNames.add(rc.getId());
return runNames;
}
/**
* @return An arbitrary, representative workflow run.
* @throws Exception
* If anything goes wrong.
*/
@Nullable
@WithinSingleTransaction
public RemoteRunDelegate pickArbitraryRun() throws Exception {
for (RunConnection rc : allRuns()) {
if (rc.getId() == null)
continue;
return rc.fromDBform(facade);
}
return null;
}
/**
* Make a workflow run persistent. Must only be called once per workflow
* run.
*
* @param rrd
* The workflow run to persist.
* @throws IOException
* If anything goes wrong with serialisation of the run.
*/
@WithinSingleTransaction
public void persistRun(@Nonnull RemoteRunDelegate rrd) throws IOException {
persist(rrd);
}
/**
* Stop a workflow run from being persistent.
*
* @param name
* The ID of the run.
* @return Whether a deletion happened.
*/
@WithinSingleTransaction
public boolean unpersistRun(String name) {
RunConnection rc = pickRun(name);
if (rc != null)
delete(rc);
return rc != null;
}
/**
* Ensure that the given workflow run is synchronized with the database.
*
* @param run
* The run to synchronise.
* @throws IOException
* If serialization of anything fails.
*/
@WithinSingleTransaction
public void flushToDisk(@Nonnull RemoteRunDelegate run) throws IOException {
getById(run.id).makeChanges(run);
}
/**
* Remove all workflow runs that have expired.
*
* @return The ids of the deleted runs.
*/
@Nonnull
@PerfLogged
@WithinSingleTransaction
public List<String> doClean() {
if (log.isDebugEnabled())
log.debug("deleting runs that timed out before " + new Date());
List<String> toDelete = timedout();
if (log.isDebugEnabled())
log.debug("found " + toDelete.size() + " runs to delete");
for (String id : toDelete) {
RunConnection rc = getById(id);
try {
rc.fromDBform(facade).run.destroy();
} catch (Exception e) {
if (log.isDebugEnabled())
log.debug("failed to delete execution resource for " + id,
e);
}
delete(rc);
}
return toDelete;
}
/**
* @return A list of workflow runs that are candidates for doing
* notification of termination.
*/
@Nonnull
@PerfLogged
@WithinSingleTransaction
public List<RemoteRunDelegate> getPotentiallyNotifiable() {
List<RemoteRunDelegate> toNotify = new ArrayList<>();
for (String id : unterminated())
try {
RunConnection rc = getById(id);
toNotify.add(rc.fromDBform(facade));
} catch (Exception e) {
log.warn("failed to fetch connection token"
+ "for notification of completion check", e);
}
return toNotify;
}
@PerfLogged
@WithinSingleTransaction
public void markFinished(@Nonnull Set<String> terminated) {
for (String id : terminated) {
RunConnection rc = getById(id);
if (rc == null)
continue;
try {
rc.fromDBform(facade).doneTransitionToFinished = true;
rc.setFinished(true);
} catch (Exception e) {
log.warn("failed to note termination", e);
}
}
}
}