blob: 664395106483566e9ff1188a6162874920695546 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oodt.cas.resource.system.extern;
//JDK imports
import java.util.Date;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.Vector;
import java.util.logging.Level;
import java.util.logging.Logger;
//OODT imports
import org.apache.oodt.cas.resource.structs.Job;
import org.apache.oodt.cas.resource.structs.JobInput;
import org.apache.oodt.cas.resource.structs.JobInstance;
import org.apache.oodt.cas.resource.structs.exceptions.JobException;
import org.apache.oodt.cas.resource.structs.exceptions.JobInputException;
import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory;
import org.apache.oodt.cas.resource.util.XmlRpcStructFactory;
//APACHE imports
import org.apache.xmlrpc.WebServer;
/**
* @author woollard
* @version $Revision$
*
* <p>
* An XML RPC-based Batch Submission System.
* </p>
*
*/
public class XmlRpcBatchStub {
/* the port to run the XML RPC web server on, default is 2000 */
private int webServerPort = 2000;
/* our xml rpc web server */
private WebServer webServer = null;
/* our log stream */
private static Logger LOG = Logger.getLogger(XmlRpcBatchStub.class
.getName());
private static Map jobThreadMap = null;
public XmlRpcBatchStub(int port) throws Exception {
webServerPort = port;
// start up the web server
webServer = new WebServer(webServerPort);
webServer.addHandler("batchstub", this);
webServer.start();
jobThreadMap = new HashMap();
LOG.log(Level.INFO, "XmlRpc Batch Stub started by "
+ System.getProperty("user.name", "unknown"));
}
public boolean isAlive() {
return true;
}
public boolean executeJob(Hashtable jobHash, Hashtable jobInput)
throws JobException {
return genericExecuteJob(jobHash, jobInput);
}
public boolean executeJob(Hashtable jobHash, Date jobInput)
throws JobException {
return genericExecuteJob(jobHash, jobInput);
}
public boolean executeJob(Hashtable jobHash, double jobInput)
throws JobException {
return genericExecuteJob(jobHash, new Double(jobInput));
}
public boolean executeJob(Hashtable jobHash, int jobInput)
throws JobException {
return genericExecuteJob(jobHash, new Integer(jobInput));
}
public boolean executeJob(Hashtable jobHash, boolean jobInput)
throws JobException {
return genericExecuteJob(jobHash, new Boolean(jobInput));
}
public boolean executeJob(Hashtable jobHash, Vector jobInput)
throws JobException {
return genericExecuteJob(jobHash, jobInput);
}
public boolean executeJob(Hashtable jobHash, byte[] jobInput)
throws JobException {
return genericExecuteJob(jobHash, jobInput);
}
public synchronized boolean killJob(Hashtable jobHash) {
Job job = XmlRpcStructFactory.getJobFromXmlRpc(jobHash);
Thread jobThread = (Thread) jobThreadMap.get(job.getId());
if (jobThread == null) {
LOG.log(Level.WARNING, "Job: [" + job.getId()
+ "] not managed by this batch stub");
return false;
}
// okay, so interrupt it, which should cause it to stop
jobThread.interrupt();
return true;
}
private boolean genericExecuteJob(Hashtable jobHash, Object jobInput)
throws JobException {
JobInstance exec = null;
JobInput in = null;
try {
Job job = XmlRpcStructFactory.getJobFromXmlRpc(jobHash);
LOG.log(Level.INFO, "stub attempting to execute class: ["
+ job.getJobInstanceClassName() + "]");
exec = GenericResourceManagerObjectFactory
.getJobInstanceFromClassName(job.getJobInstanceClassName());
in = GenericResourceManagerObjectFactory
.getJobInputFromClassName(job.getJobInputClassName());
// load the input obj
in.read(jobInput);
// create threaded job
// so that it can be interrupted
RunnableJob runner = new RunnableJob(exec, in);
Thread threadRunner = new Thread(runner);
/* save this job thread in a map so we can kill it later */
jobThreadMap.put(job.getId(), threadRunner);
threadRunner.start();
try {
threadRunner.join();
} catch (InterruptedException e) {
LOG.log(Level.INFO, "Current job: [" + job.getName()
+ "]: killed: exiting gracefully");
synchronized (jobThreadMap) {
Thread endThread = (Thread) jobThreadMap.get(job.getId());
if (endThread != null)
endThread = null;
}
return false;
}
synchronized (jobThreadMap) {
Thread endThread = (Thread) jobThreadMap.get(job.getId());
if (endThread != null)
endThread = null;
}
return runner.wasSuccessful();
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
public static void main(String[] args) throws Exception {
int portNum = -1;
String usage = "XmlRpcBatchStub --portNum <port number for xml rpc service>\n";
for (int i = 0; i < args.length; i++) {
if (args[i].equals("--portNum")) {
portNum = Integer.parseInt(args[++i]);
}
}
if (portNum == -1) {
System.err.println(usage);
System.exit(1);
}
XmlRpcBatchStub stub = new XmlRpcBatchStub(portNum);
for (;;)
try {
Thread.currentThread().join();
} catch (InterruptedException ignore) {
}
}
private class RunnableJob implements Runnable {
private JobInput in;
private JobInstance job;
private boolean successful;
public RunnableJob(JobInstance job, JobInput in) {
this.job = job;
this.in = in;
this.successful = false;
}
/*
* (non-Javadoc)
*
* @see java.lang.Runnable#run()
*/
public void run() {
try {
this.successful = job.execute(in);
} catch (JobInputException e) {
e.printStackTrace();
this.successful = false;
}
}
public boolean wasSuccessful() {
return this.successful;
}
}
}