| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hcatalog.templeton.tool; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| |
| /** |
| * The persistent state of a job. The state is stored in one of the |
| * supported storage systems. |
| */ |
| public class JobState { |
| |
| private static final Log LOG = LogFactory.getLog(JobState.class); |
| |
| private String id; |
| |
| // Storage is instantiated in the constructor |
| private TempletonStorage storage = null; |
| |
| private static TempletonStorage.Type type = TempletonStorage.Type.JOB; |
| |
| private Configuration config = null; |
| |
| public JobState(String id, Configuration conf) |
| throws IOException { |
| this.id = id; |
| config = conf; |
| storage = getStorage(conf); |
| } |
| |
| public void delete() |
| throws IOException { |
| try { |
| storage.delete(type, id); |
| } catch (Exception e) { |
| // Error getting children of node -- probably node has been deleted |
| LOG.info("Couldn't delete " + id); |
| } |
| } |
| |
| /** |
| * Get an instance of the selected storage class. Defaults to |
| * HDFS storage if none is specified. |
| */ |
| public static TempletonStorage getStorageInstance(Configuration conf) { |
| TempletonStorage storage = null; |
| try { |
| storage = (TempletonStorage) |
| Class.forName(conf.get(TempletonStorage.STORAGE_CLASS)) |
| .newInstance(); |
| } catch (Exception e) { |
| LOG.warn("No storage method found: " + e.getMessage()); |
| try { |
| storage = new HDFSStorage(); |
| } catch (Exception ex) { |
| LOG.error("Couldn't create storage."); |
| } |
| } |
| return storage; |
| } |
| |
| /** |
| * Get an open instance of the selected storage class. Defaults |
| * to HDFS storage if none is specified. |
| */ |
| public static TempletonStorage getStorage(Configuration conf) throws IOException { |
| TempletonStorage storage = getStorageInstance(conf); |
| storage.openStorage(conf); |
| return storage; |
| } |
| |
| /** |
| * For storage methods that require a connection, this is a hint |
| * that it's time to close the connection. |
| */ |
| public void close() throws IOException { |
| storage.closeStorage(); |
| } |
| |
| // |
| // Properties |
| // |
| |
| /** |
| * This job id. |
| */ |
| public String getId() { |
| return id; |
| } |
| |
| /** |
| * The percent complete of a job |
| */ |
| public String getPercentComplete() |
| throws IOException { |
| return getField("percentComplete"); |
| } |
| |
| public void setPercentComplete(String percent) |
| throws IOException { |
| setField("percentComplete", percent); |
| } |
| |
| /** |
| * The child id of TempletonControllerJob |
| */ |
| public String getChildId() |
| throws IOException { |
| return getField("childid"); |
| } |
| |
| public void setChildId(String childid) |
| throws IOException { |
| setField("childid", childid); |
| } |
| |
| /** |
| * Add a jobid to the list of children of this job. |
| * |
| * @param jobid |
| * @throws IOException |
| */ |
| public void addChild(String jobid) throws IOException { |
| String jobids = ""; |
| try { |
| jobids = getField("children"); |
| } catch (Exception e) { |
| // There are none or they're not readable. |
| } |
| if (!jobids.equals("")) { |
| jobids += ","; |
| } |
| jobids += jobid; |
| setField("children", jobids); |
| } |
| |
| /** |
| * Get a list of jobstates for jobs that are children of this job. |
| * @throws IOException |
| */ |
| public List<JobState> getChildren() throws IOException { |
| ArrayList<JobState> children = new ArrayList<JobState>(); |
| for (String jobid : getField("children").split(",")) { |
| children.add(new JobState(jobid, config)); |
| } |
| return children; |
| } |
| |
| /** |
| * Save a comma-separated list of jobids that are children |
| * of this job. |
| * @param jobids |
| * @throws IOException |
| */ |
| public void setChildren(String jobids) throws IOException { |
| setField("children", jobids); |
| } |
| |
| /** |
| * Set the list of child jobs of this job |
| * @param children |
| */ |
| public void setChildren(List<JobState> children) throws IOException { |
| String val = ""; |
| for (JobState jobstate : children) { |
| if (!val.equals("")) { |
| val += ","; |
| } |
| val += jobstate.getId(); |
| } |
| setField("children", val); |
| } |
| |
| /** |
| * The system exit value of the job. |
| */ |
| public Long getExitValue() |
| throws IOException { |
| return getLongField("exitValue"); |
| } |
| |
| public void setExitValue(long exitValue) |
| throws IOException { |
| setLongField("exitValue", exitValue); |
| } |
| |
| /** |
| * When this job was created. |
| */ |
| public Long getCreated() |
| throws IOException { |
| return getLongField("created"); |
| } |
| |
| public void setCreated(long created) |
| throws IOException { |
| setLongField("created", created); |
| } |
| |
| /** |
| * The user who started this job. |
| */ |
| public String getUser() |
| throws IOException { |
| return getField("user"); |
| } |
| |
| public void setUser(String user) |
| throws IOException { |
| setField("user", user); |
| } |
| |
| /** |
| * The url callback |
| */ |
| public String getCallback() |
| throws IOException { |
| return getField("callback"); |
| } |
| |
| public void setCallback(String callback) |
| throws IOException { |
| setField("callback", callback); |
| } |
| |
| /** |
| * The status of a job once it is completed. |
| */ |
| public String getCompleteStatus() |
| throws IOException { |
| return getField("completed"); |
| } |
| |
| public void setCompleteStatus(String complete) |
| throws IOException { |
| setField("completed", complete); |
| } |
| |
| /** |
| * The time when the callback was sent. |
| */ |
| public Long getNotifiedTime() |
| throws IOException { |
| return getLongField("notified"); |
| } |
| |
| public void setNotifiedTime(long notified) |
| throws IOException { |
| setLongField("notified", notified); |
| } |
| |
| // |
| // Helpers |
| // |
| |
| /** |
| * Fetch an integer field from the store. |
| */ |
| public Long getLongField(String name) |
| throws IOException { |
| String s = storage.getField(type, id, name); |
| if (s == null) |
| return null; |
| else { |
| try { |
| return new Long(s); |
| } catch (NumberFormatException e) { |
| LOG.error("templeton: bug " + name + " " + s + " : " + e); |
| return null; |
| } |
| } |
| } |
| |
| /** |
| * Store a String field from the store. |
| */ |
| public void setField(String name, String val) |
| throws IOException { |
| try { |
| storage.saveField(type, id, name, val); |
| } catch (NotFoundException ne) { |
| throw new IOException(ne.getMessage()); |
| } |
| } |
| |
| public String getField(String name) |
| throws IOException { |
| return storage.getField(type, id, name); |
| } |
| |
| /** |
| * Store a long field. |
| * |
| * @param name |
| * @param val |
| * @throws IOException |
| */ |
| public void setLongField(String name, long val) |
| throws IOException { |
| try { |
| storage.saveField(type, id, name, String.valueOf(val)); |
| } catch (NotFoundException ne) { |
| throw new IOException("Job " + id + " was not found: " + |
| ne.getMessage()); |
| } |
| } |
| |
| /** |
| * Get an id for each currently existing job, which can be used to create |
| * a JobState object. |
| * |
| * @param conf |
| * @throws IOException |
| */ |
| public static List<String> getJobs(Configuration conf) throws IOException { |
| try { |
| return getStorage(conf).getAllForType(type); |
| } catch (Exception e) { |
| throw new IOException("Can't get jobs", e); |
| } |
| } |
| } |