blob: 0b884f77d96c880e24ce4dc2b7ae7c51bd81d5a4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hcatalog.templeton.tool;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
/**
 * The persistent state of a job. The state is stored in one of the
 * supported storage systems; every property accessor on this class
 * reads or writes a single named field of the job identified by
 * {@link #getId()} in that storage.
 */
public class JobState {
    private static final Log LOG = LogFactory.getLog(JobState.class);

    /** Storage type under which every field of a job is kept. */
    private static final TempletonStorage.Type type = TempletonStorage.Type.JOB;

    /** Field holding the comma-separated list of child job ids. */
    private static final String CHILDREN_FIELD = "children";

    private final String id;

    // Storage is instantiated (and opened) in the constructor
    private final TempletonStorage storage;

    // Kept so that child JobState objects can be created with the same settings
    private final Configuration config;

    /**
     * Create a handle on the state of the given job and open the
     * configured storage.
     *
     * @param id   the job id
     * @param conf configuration used to select and open the storage
     * @throws IOException if the storage cannot be created or opened
     */
    public JobState(String id, Configuration conf)
        throws IOException {
        this.id = id;
        this.config = conf;
        this.storage = getStorage(conf);
    }

    /**
     * Delete this job from storage. This is best-effort: any failure is
     * logged and not propagated to the caller.
     *
     * @throws IOException declared for interface compatibility
     */
    public void delete()
        throws IOException {
        try {
            storage.delete(type, id);
        } catch (Exception e) {
            // Error getting children of node -- probably node has been deleted
            LOG.info("Couldn't delete " + id, e);
        }
    }

    /**
     * Get an instance of the selected storage class. Defaults to
     * HDFS storage if none is specified or the configured class cannot
     * be instantiated.
     *
     * @param conf configuration holding the storage class name under
     *             {@code TempletonStorage.STORAGE_CLASS}
     * @return an unopened storage instance, or {@code null} if neither the
     *         configured class nor the HDFS fallback could be created
     */
    public static TempletonStorage getStorageInstance(Configuration conf) {
        TempletonStorage storage = null;
        try {
            storage = (TempletonStorage)
                Class.forName(conf.get(TempletonStorage.STORAGE_CLASS))
                    .newInstance();
        } catch (Exception e) {
            // Covers a missing setting, an unknown class and a class of the wrong type
            LOG.warn("No storage method found: " + e.getMessage());
            try {
                storage = new HDFSStorage();
            } catch (Exception ex) {
                LOG.error("Couldn't create storage.", ex);
            }
        }
        return storage;
    }

    /**
     * Get an open instance of the selected storage class. Defaults
     * to HDFS storage if none is specified.
     *
     * @param conf configuration used to select and open the storage
     * @return an opened storage instance, never {@code null}
     * @throws IOException if no storage could be created, or if opening
     *                     the storage fails
     */
    public static TempletonStorage getStorage(Configuration conf) throws IOException {
        TempletonStorage storage = getStorageInstance(conf);
        if (storage == null) {
            // Fail with a checked, descriptive error instead of a NullPointerException
            throw new IOException("Couldn't create storage.");
        }
        storage.openStorage(conf);
        return storage;
    }

    /**
     * For storage methods that require a connection, this is a hint
     * that it's time to close the connection.
     *
     * @throws IOException if the storage reports a failure while closing
     */
    public void close() throws IOException {
        storage.closeStorage();
    }

    //
    // Properties
    //

    /**
     * This job id.
     */
    public String getId() {
        return id;
    }

    /**
     * The percent complete of a job
     */
    public String getPercentComplete()
        throws IOException {
        return getField("percentComplete");
    }

    public void setPercentComplete(String percent)
        throws IOException {
        setField("percentComplete", percent);
    }

    /**
     * The child id of TempletonControllerJob
     */
    public String getChildId()
        throws IOException {
        return getField("childid");
    }

    public void setChildId(String childid)
        throws IOException {
        setField("childid", childid);
    }

    /**
     * Add a jobid to the list of children of this job. The id is appended
     * to the stored comma-separated list; if no list is stored yet (or it
     * cannot be read) the list is started with this id.
     *
     * @param jobid the id of the child job to append
     * @throws IOException if the updated list cannot be saved
     */
    public void addChild(String jobid) throws IOException {
        String jobids = null;
        try {
            jobids = getField(CHILDREN_FIELD);
        } catch (Exception ignored) {
            // There are none or they're not readable: start a fresh list.
        }
        // The stored value may be null when the field was never written.
        if (jobids == null || jobids.isEmpty()) {
            jobids = jobid;
        } else {
            jobids = jobids + "," + jobid;
        }
        setField(CHILDREN_FIELD, jobids);
    }

    /**
     * Get a list of jobstates for jobs that are children of this job.
     * Each returned JobState is created with this job's configuration.
     *
     * @return the children, or an empty list when none are recorded
     * @throws IOException if the list cannot be read or a child state
     *                     cannot be created
     */
    public List<JobState> getChildren() throws IOException {
        List<JobState> children = new ArrayList<JobState>();
        String jobids = getField(CHILDREN_FIELD);
        if (jobids == null) {
            return children;
        }
        for (String jobid : jobids.split(",")) {
            // "".split(",") yields a single empty token; it is not a job id
            if (jobid.isEmpty()) {
                continue;
            }
            children.add(new JobState(jobid, config));
        }
        return children;
    }

    /**
     * Save a comma-separated list of jobids that are children
     * of this job.
     *
     * @param jobids the comma-separated child job ids
     * @throws IOException if the list cannot be saved
     */
    public void setChildren(String jobids) throws IOException {
        setField(CHILDREN_FIELD, jobids);
    }

    /**
     * Set the list of child jobs of this job. The ids of the given
     * job states are stored as a comma-separated list.
     *
     * @param children the child job states
     * @throws IOException if the list cannot be saved
     */
    public void setChildren(List<JobState> children) throws IOException {
        StringBuilder val = new StringBuilder();
        for (JobState jobstate : children) {
            if (val.length() > 0) {
                val.append(',');
            }
            val.append(jobstate.getId());
        }
        setField(CHILDREN_FIELD, val.toString());
    }

    /**
     * The system exit value of the job.
     */
    public Long getExitValue()
        throws IOException {
        return getLongField("exitValue");
    }

    public void setExitValue(long exitValue)
        throws IOException {
        setLongField("exitValue", exitValue);
    }

    /**
     * When this job was created.
     */
    public Long getCreated()
        throws IOException {
        return getLongField("created");
    }

    public void setCreated(long created)
        throws IOException {
        setLongField("created", created);
    }

    /**
     * The user who started this job.
     */
    public String getUser()
        throws IOException {
        return getField("user");
    }

    public void setUser(String user)
        throws IOException {
        setField("user", user);
    }

    /**
     * The url callback
     */
    public String getCallback()
        throws IOException {
        return getField("callback");
    }

    public void setCallback(String callback)
        throws IOException {
        setField("callback", callback);
    }

    /**
     * The status of a job once it is completed.
     */
    public String getCompleteStatus()
        throws IOException {
        return getField("completed");
    }

    public void setCompleteStatus(String complete)
        throws IOException {
        setField("completed", complete);
    }

    /**
     * The time when the callback was sent.
     */
    public Long getNotifiedTime()
        throws IOException {
        return getLongField("notified");
    }

    public void setNotifiedTime(long notified)
        throws IOException {
        setLongField("notified", notified);
    }

    //
    // Helpers
    //

    /**
     * Fetch a long field from the store.
     *
     * @param name the field name
     * @return the parsed value, or {@code null} if the field has no stored
     *         value or the stored value is not a valid long (the latter is
     *         logged as an error)
     * @throws IOException declared for the storage access
     */
    public Long getLongField(String name)
        throws IOException {
        String s = storage.getField(type, id, name);
        if (s == null) {
            return null;
        }
        try {
            return Long.valueOf(s);
        } catch (NumberFormatException e) {
            LOG.error("templeton: bug " + name + " " + s + " : " + e);
            return null;
        }
    }

    /**
     * Store a String field in the store.
     *
     * @param name the field name
     * @param val  the value to save
     * @throws IOException if the storage reports that the target was not
     *                     found; the original exception is kept as the cause
     */
    public void setField(String name, String val)
        throws IOException {
        try {
            storage.saveField(type, id, name, val);
        } catch (NotFoundException ne) {
            throw new IOException(ne.getMessage(), ne);
        }
    }

    /**
     * Fetch a String field from the store.
     *
     * @param name the field name
     * @return the stored value as returned by the storage; may be
     *         {@code null} (see the null check in {@link #getLongField})
     * @throws IOException declared for the storage access
     */
    public String getField(String name)
        throws IOException {
        return storage.getField(type, id, name);
    }

    /**
     * Store a long field. The value is saved in its decimal string form.
     *
     * @param name the field name
     * @param val  the value to save
     * @throws IOException if the storage reports that the job was not
     *                     found; the original exception is kept as the cause
     */
    public void setLongField(String name, long val)
        throws IOException {
        try {
            storage.saveField(type, id, name, String.valueOf(val));
        } catch (NotFoundException ne) {
            throw new IOException("Job " + id + " was not found: " +
                ne.getMessage(), ne);
        }
    }

    /**
     * Get an id for each currently existing job, which can be used to create
     * a JobState object.
     *
     * NOTE(review): the storage opened here is never closed; confirm whether
     * the storage implementations need closeStorage() to be called.
     *
     * @param conf configuration used to select and open the storage
     * @return the ids of all stored jobs
     * @throws IOException if the storage cannot be opened or queried; the
     *                     original exception is kept as the cause
     */
    public static List<String> getJobs(Configuration conf) throws IOException {
        try {
            return getStorage(conf).getAllForType(type);
        } catch (Exception e) {
            throw new IOException("Can't get jobs", e);
        }
    }
}