| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.lens.server.scheduler; |
| |
| import java.sql.ResultSet; |
| import java.sql.ResultSetMetaData; |
| import java.sql.SQLException; |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import javax.xml.bind.JAXBElement; |
| |
| import org.apache.lens.api.LensSessionHandle; |
| import org.apache.lens.api.ToXMLString; |
| import org.apache.lens.api.query.QueryHandle; |
| import org.apache.lens.api.scheduler.*; |
| import org.apache.lens.server.api.LensConfConstants; |
| import org.apache.lens.server.api.error.LensException; |
| import org.apache.lens.server.util.UtilityMethods; |
| |
| import org.apache.commons.dbcp.BasicDataSource; |
| import org.apache.commons.dbutils.QueryRunner; |
| import org.apache.commons.dbutils.ResultSetHandler; |
| import org.apache.hadoop.conf.Configuration; |
| |
| import lombok.extern.slf4j.Slf4j; |
| |
| /** |
| * SchedulerDAO class defines scheduler store operations. |
| */ |
| @Slf4j |
| public class SchedulerDAO { |
| private Configuration conf; |
| private SchedulerDBStore store; |
| |
| public SchedulerDAO(Configuration conf) throws LensException { |
| this.conf = conf; |
| try { |
| Class dbStoreClass = Class |
| .forName(conf.get(LensConfConstants.SCHEDULER_STORE_CLASS, SchedulerHsqlDBStore.class.getName())); |
| this.store = (SchedulerDBStore) dbStoreClass.newInstance(); |
| this.store.init(UtilityMethods.getDataSourceFromConfForScheduler(conf)); |
| this.store.createJobTable(); |
| this.store.createJobInstanceTable(); |
| this.store.createJobInstanceRunTable(); |
| } catch (SQLException e) { |
| log.error("Error creating job tables", e); |
| throw new LensException("Error creating job tables ", e); |
| } catch (ClassNotFoundException e) { |
| log.error("No class found ", e); |
| throw new LensException("No class found ", e); |
| } catch (InstantiationException | IllegalAccessException e) { |
| log.error("Illegal access exception", e); |
| throw new LensException("Illegal access exception ", e); |
| } |
| } |
| |
| /** |
| * Saves the SchedulerJobInfo object into the store. |
| * |
| * @param jobInfo object |
| * @return the number of records stored |
| */ |
| public int storeJob(SchedulerJobInfo jobInfo) { |
| try { |
| return store.insertIntoJobTable(jobInfo); |
| } catch (SQLException e) { |
| log.error("Error while storing the jobInfo for " + jobInfo.getId().getHandleIdString(), e); |
| return 0; |
| } |
| } |
| |
| /** |
| * Fetches the SchedulerJobInfo object corresponding to handle id. |
| * |
| * @param id: Job handle id. |
| * @return SchedulerJobInfo |
| */ |
| public SchedulerJobInfo getSchedulerJobInfo(SchedulerJobHandle id) { |
| try { |
| return store.getSchedulerJobInfo(id.getHandleIdString()); |
| } catch (SQLException e) { |
| log.error("Error while getting the job detail for " + id.getHandleIdString(), e); |
| return null; |
| } |
| } |
| |
| /** |
| * Gets the job definition from the store |
| * |
| * @param id : Job handle id. |
| * @return XJob definition |
| */ |
| public XJob getJob(SchedulerJobHandle id) { |
| try { |
| return store.getJob(id.getHandleIdString()); |
| } catch (SQLException e) { |
| log.error("Error while getting the job for " + id.getHandleIdString(), e); |
| return null; |
| } |
| } |
| |
| /** |
| * Get the user of the job who submitted the job. |
| * |
| * @param id |
| * @return |
| */ |
| public String getUser(SchedulerJobHandle id) { |
| try { |
| return store.getUser(id.getHandleIdString()); |
| } catch (SQLException e) { |
| log.error("Error while getting the user for the job with handle " + id.getHandleIdString(), e); |
| return null; |
| } |
| } |
| |
| /** |
| * Gets the Job status |
| * |
| * @param id : Job handle id. |
| * @return SchedulerJobStatus of the job. |
| */ |
| public SchedulerJobState getJobState(SchedulerJobHandle id) { |
| try { |
| return store.getJobState(id.getHandleIdString()); |
| } catch (SQLException e) { |
| log.error("Error while getting the job status for " + id.getHandleIdString(), e); |
| return null; |
| } |
| } |
| |
| /** |
| * Updates the job definition from the new SchedulerJobInfo |
| * |
| * @param info: Updated info object. |
| * @return number of rows updated. |
| */ |
| public int updateJob(SchedulerJobInfo info) { |
| try { |
| return store.updateJob(info.getId().getHandleIdString(), info.getJob(), info.getModifiedOn()); |
| } catch (SQLException e) { |
| log.error("Error while updating job for " + info.getId().getHandleIdString(), e); |
| return 0; |
| } |
| } |
| |
| /** |
| * Updates the job status form the new SchedulerJobInfo |
| * |
| * @param info: Updated info objects |
| * @return number of rows updated. |
| */ |
| public int updateJobStatus(SchedulerJobInfo info) { |
| try { |
| return store.updateJobStatus(info.getId().getHandleIdString(), info.getJobState().name(), info.getModifiedOn()); |
| } catch (SQLException e) { |
| log.error("Error while updating job status for " + info.getId().getHandleIdString(), e); |
| return 0; |
| } |
| } |
| |
| public int storeJobInstance(SchedulerJobInstanceInfo instanceInfo) { |
| try { |
| return store.insertIntoJobInstanceTable(instanceInfo); |
| } catch (SQLException e) { |
| log.error("Error while storing job instance for " + instanceInfo.getId()); |
| return 0; |
| } |
| } |
| |
| public int storeJobInstanceRun(SchedulerJobInstanceRun instanceRun) { |
| try { |
| return store.insertIntoJobInstanceRunTable(instanceRun); |
| } catch (SQLException e) { |
| log.error( |
| "Error while storing job instance run for " + instanceRun.getRunId() + " and instance handle " + instanceRun |
| .getHandle().getHandleIdString(), e); |
| return 0; |
| } |
| } |
| |
| /** |
| * Gets the SchedulerJobInstanceInfo corresponding instance handle id. |
| * |
| * @param id : Job instance id |
| * @return ShedulerJobInstanceInfo |
| */ |
| public SchedulerJobInstanceInfo getSchedulerJobInstanceInfo(SchedulerJobInstanceHandle id) { |
| try { |
| return store.getJobInstanceInfo(id.getHandleIdString()); |
| } catch (SQLException e) { |
| log.error("Error while getting the job instance info for " + id.getHandleIdString(), e); |
| return null; |
| } |
| } |
| |
| /** |
| * Updates the instance status |
| * |
| * @param instanceRun : instance Run object |
| * @return number of rows updated. |
| */ |
| public int updateJobInstanceRun(SchedulerJobInstanceRun instanceRun) { |
| try { |
| return store.updateJobInstanceRun(instanceRun); |
| } catch (SQLException e) { |
| log.error("Error while updating the job instance status for " + instanceRun.getHandle().getHandleIdString() |
| + " and run: " + instanceRun.getRunId(), e); |
| return 0; |
| } |
| } |
| |
| /** |
| * Gets all the instance handle id for a job. |
| * |
| * @param id: Job handle id. |
| * @return List of instance handles. |
| */ |
| public List<SchedulerJobInstanceInfo> getJobInstances(SchedulerJobHandle id) { |
| // TODO: Add number of results to be fetched |
| try { |
| return store.getAllJobInstances(id.getHandleIdString()); |
| } catch (SQLException e) { |
| log.error("Error while getting instances of a job with id " + id.getHandleIdString(), e); |
| return null; |
| } |
| } |
| |
| /** |
| * Gets all jobs which match the filter requirements. |
| * |
| * @param username : User name of the job |
| * @param jobState : state of the job |
| * @param startTime : Created on should be greater than this start time. |
| * @param endTime : Created on should be less than the end time. |
| * @return List of Job handles |
| */ |
| public List<SchedulerJobHandle> getJobs(String username, SchedulerJobState jobState, Long startTime, Long endTime) { |
| try { |
| return store.getJobs(username, jobState == null ? null : jobState.name(), startTime, endTime); |
| } catch (SQLException e) { |
| log.error("Error while getting jobs ", e); |
| return null; |
| } |
| } |
| |
| /** |
| * Get the handle given the job name. |
| * |
| * @param jobName |
| * @return |
| */ |
| public List<SchedulerJobHandle> getJob(String jobName) { |
| try { |
| return store.getJobsByName(jobName); |
| } catch (SQLException e) { |
| log.error("Error while getting jobs ", e); |
| return null; |
| } |
| } |
| |
| public abstract static class SchedulerDBStore { |
| protected static final String JOB_TABLE = "job_table"; |
| protected static final String JOB_INSTANCE_TABLE = "job_instance_table"; |
| protected static final String JOB_INSTANCE_RUN_TABLE = "job_instance_run_table"; |
| protected static final String COLUMN_ID = "id"; |
| protected static final String COLUMN_RUN_ID = "runid"; |
| protected static final String COLUMN_JOB = "job"; |
| protected static final String COLUMN_USER = "username"; |
| protected static final String COLUMN_STATUS = "status"; |
| protected static final String COLUMN_CREATED_ON = "createdon"; |
| protected static final String COLUMN_SCHEDULE_TIME = "scheduledtime"; |
| protected static final String COLUMN_MODIFIED_ON = "modifiedon"; |
| protected static final String COLUMN_JOB_ID = "jobid"; |
| protected static final String COLUMN_SESSION_HANDLE = "sessionhandle"; |
| protected static final String COLUMN_START_TIME = "starttime"; |
| protected static final String COLUMN_END_TIME = "endtime"; |
| protected static final String COLUMN_RESULT_PATH = "resultpath"; |
| protected static final String COLUMN_QUERY_HANDLE = "queryhandle"; |
| protected static final String COLUMN_JOB_NAME = "jobname"; |
| protected QueryRunner runner; |
| protected ObjectFactory jobFactory = new ObjectFactory(); |
| // Generic multiple row handler for the fetch query. |
| private ResultSetHandler<List<Object[]>> multipleRowsHandler = new ResultSetHandler<List<Object[]>>() { |
| @Override |
| public List<Object[]> handle(ResultSet resultSet) throws SQLException { |
| List<Object[]> output = new ArrayList<>(); |
| while (resultSet.next()) { |
| ResultSetMetaData meta = resultSet.getMetaData(); |
| int cols = meta.getColumnCount(); |
| Object[] result = new Object[cols]; |
| for (int i = 0; i < cols; i++) { |
| result[i] = resultSet.getObject(i + 1); |
| } |
| output.add(result); |
| } |
| return output; |
| } |
| }; |
| |
| /** |
| * Init the store. |
| * |
| * @param ds |
| */ |
| public void init(BasicDataSource ds) { |
| runner = new QueryRunner(ds); |
| } |
| |
/**
 * Creates the job table ({@code JOB_TABLE}). The implementations in this file issue a
 * {@code CREATE TABLE IF NOT EXISTS} statement through {@code runner}.
 *
 * @throws SQLException if the statement fails
 */
public abstract void createJobTable() throws SQLException;
| |
/**
 * Creates the job instance table ({@code JOB_INSTANCE_TABLE}). The implementations in this file issue a
 * {@code CREATE TABLE IF NOT EXISTS} statement through {@code runner}.
 *
 * @throws SQLException if the statement fails
 */
public abstract void createJobInstanceTable() throws SQLException;
| |
/**
 * Creates the job instance run table ({@code JOB_INSTANCE_RUN_TABLE}). The implementations in this file issue
 * a {@code CREATE TABLE IF NOT EXISTS} statement through {@code runner}.
 *
 * @throws SQLException if the statement fails
 */
public abstract void createJobInstanceRunTable() throws SQLException;
| |
| /** |
| * Inserts the Job info object into job table |
| * |
| * @param jobInfo |
| * @return number of rows inserted. |
| * @throws SQLException |
| */ |
| public int insertIntoJobTable(SchedulerJobInfo jobInfo) throws SQLException { |
| String insertSQL = "INSERT INTO " + JOB_TABLE + " VALUES(?,?,?,?,?,?,?)"; |
| JAXBElement<XJob> xmlJob = jobFactory.createJob(jobInfo.getJob()); |
| return runner.update(insertSQL, jobInfo.getId().toString(), ToXMLString.toString(xmlJob), jobInfo.getUserName(), |
| jobInfo.getJobState().name(), jobInfo.getCreatedOn(), jobInfo.getModifiedOn(), jobInfo.getJob().getName()); |
| } |
| |
| /** |
| * Inserts the job instance info object into job instance table |
| * |
| * @param instanceInfo |
| * @return number of rows inserted. |
| * @throws SQLException |
| */ |
| public int insertIntoJobInstanceTable(SchedulerJobInstanceInfo instanceInfo) throws SQLException { |
| String insertSQL = "INSERT INTO " + JOB_INSTANCE_TABLE + " VALUES(?,?,?)"; |
| return runner |
| .update(insertSQL, instanceInfo.getId().getHandleIdString(), instanceInfo.getJobId().getHandleIdString(), |
| instanceInfo.getScheduleTime()); |
| } |
| |
| public int insertIntoJobInstanceRunTable(SchedulerJobInstanceRun instanceRun) throws SQLException { |
| String insetSQL = "INSERT INTO " + JOB_INSTANCE_RUN_TABLE + " VALUES(?,?,?,?,?,?,?,?)"; |
| return runner.update(insetSQL, instanceRun.getHandle().getHandleIdString(), instanceRun.getRunId(), |
| instanceRun.getSessionHandle() == null ? "" : instanceRun.getSessionHandle().toString(), |
| instanceRun.getStartTime(), instanceRun.getEndTime(), instanceRun.getResultPath(), |
| instanceRun.getQueryHandle() == null ? "" : instanceRun.getQueryHandle().getHandleIdString(), |
| instanceRun.getInstanceState().name()); |
| } |
| |
| /** |
| * Gets the Job info object |
| * |
| * @param idStr |
| * @return SchedulerJobInfo object corresponding to the job handle. |
| * @throws SQLException |
| */ |
| public SchedulerJobInfo getSchedulerJobInfo(String idStr) throws SQLException { |
| String fetchSQL = "SELECT * FROM " + JOB_TABLE + " WHERE " + COLUMN_ID + "=?"; |
| List<Object[]> result = runner.query(fetchSQL, multipleRowsHandler, idStr); |
| if (result.size() == 0) { |
| return null; |
| } else { |
| Object[] jobInfo = result.get(0); |
| SchedulerJobHandle id = SchedulerJobHandle.fromString((String) jobInfo[0]); |
| XJob xJob = ToXMLString.valueOf((String) jobInfo[1], ObjectFactory.class); |
| String userName = (String) jobInfo[2]; |
| String status = (String) jobInfo[3]; |
| long createdOn = (Long) jobInfo[4]; |
| long modifiedOn = (Long) jobInfo[5]; |
| SchedulerJobState jobState = SchedulerJobState.valueOf(status); |
| return new SchedulerJobInfo(id, xJob, userName, jobState, createdOn, modifiedOn); |
| } |
| } |
| |
| /** |
| * Gets the job definition |
| * |
| * @param id |
| * @return Xjob corresponding to the job handle |
| * @throws SQLException |
| */ |
| public XJob getJob(String id) throws SQLException { |
| String fetchSQL = "SELECT " + COLUMN_JOB + " FROM " + JOB_TABLE + " WHERE " + COLUMN_ID + "=?"; |
| List<Object[]> result = runner.query(fetchSQL, multipleRowsHandler, id); |
| if (result.size() == 0) { |
| return null; |
| } else { |
| return ToXMLString.valueOf((String) result.get(0)[0], ObjectFactory.class); |
| } |
| } |
| |
| /** |
| * Get the job handles given the job name |
| * |
| * @param jobname |
| * @return A list of handles |
| * @throws SQLException |
| */ |
| public List<SchedulerJobHandle> getJobsByName(String jobname) throws SQLException { |
| String fetchSQL = "SELCET " + COLUMN_ID + " FROM " + JOB_TABLE + " WHERE " + COLUMN_JOB_NAME + "=?"; |
| List<Object[]> result = runner.query(fetchSQL, multipleRowsHandler, jobname); |
| List<SchedulerJobHandle> resOut = new ArrayList<>(); |
| for (int i = 0; i < result.size(); i++) { |
| Object[] row = result.get(i); |
| resOut.add(SchedulerJobHandle.fromString((String) row[0])); |
| } |
| return resOut; |
| } |
| |
| /** |
| * Gets the job status |
| * |
| * @param id |
| * @return SchedulerJobStatus |
| * @throws SQLException |
| */ |
| public SchedulerJobState getJobState(String id) throws SQLException { |
| String fetchSQL = "SELECT " + COLUMN_STATUS + " FROM " + JOB_TABLE + " WHERE " + COLUMN_ID + "=?"; |
| List<Object[]> result = runner.query(fetchSQL, multipleRowsHandler, id); |
| if (result.size() == 0) { |
| return null; |
| } else { |
| return SchedulerJobState.valueOf((String) result.get(0)[0]); |
| } |
| } |
| |
| /** |
| * Gets all the jobs which match the filter requirements. |
| * |
| * @param username |
| * @param status |
| * @param starttime |
| * @param endtime |
| * @return the list of job handles. |
| * @throws SQLException |
| */ |
| public List<SchedulerJobHandle> getJobs(String username, String status, Long starttime, Long endtime) |
| throws SQLException { |
| String whereClause = ""; |
| if (username != null && !username.isEmpty()) { |
| whereClause += ((whereClause.isEmpty()) ? " WHERE " : " AND ") + COLUMN_USER + " = '" + username + "'"; |
| } |
| if (status != null && !status.isEmpty()) { |
| whereClause += ((whereClause.isEmpty()) ? " WHERE " : " AND ") + COLUMN_STATUS + " = '" + status + "'"; |
| } |
| if (starttime != null && starttime > 0) { |
| whereClause += ((whereClause.isEmpty()) ? " WHERE " : " AND ") + COLUMN_CREATED_ON + " >= " + starttime; |
| } |
| if (endtime != null && endtime > 0) { |
| whereClause += ((whereClause.isEmpty()) ? " WHERE " : " AND ") + COLUMN_CREATED_ON + " < " + endtime; |
| } |
| String fetchSQL = "SELECT " + COLUMN_ID + " FROM " + JOB_TABLE + whereClause; |
| List<Object[]> result = runner.query(fetchSQL, multipleRowsHandler); |
| List<SchedulerJobHandle> resOut = new ArrayList<>(); |
| for (int i = 0; i < result.size(); i++) { |
| Object[] row = result.get(i); |
| resOut.add(SchedulerJobHandle.fromString((String) row[0])); |
| } |
| return resOut; |
| } |
| |
| /** |
| * Update the XJob into the job table |
| * |
| * @param id |
| * @param job |
| * @param modifiedOn |
| * @return the number of rows updated |
| * @throws SQLException |
| */ |
| public int updateJob(String id, XJob job, long modifiedOn) throws SQLException { |
| String updateSQL = |
| "UPDATE " + JOB_TABLE + " SET " + COLUMN_JOB + "=?, " + COLUMN_MODIFIED_ON + "=? " + " WHERE " + COLUMN_ID |
| + "=?"; |
| JAXBElement<XJob> xmlJob = jobFactory.createJob(job); |
| return runner.update(updateSQL, ToXMLString.toString(xmlJob), modifiedOn, id); |
| } |
| |
| /** |
| * Updates the job status into the job table |
| * |
| * @param id |
| * @param status |
| * @param modifiedOn |
| * @return number of rows updated. |
| * @throws SQLException |
| */ |
| public int updateJobStatus(String id, String status, long modifiedOn) throws SQLException { |
| String updateSQL = |
| "UPDATE " + JOB_TABLE + " SET " + COLUMN_STATUS + "=?, " + COLUMN_MODIFIED_ON + "=? " + " WHERE " + COLUMN_ID |
| + "=?"; |
| return runner.update(updateSQL, status, modifiedOn, id); |
| } |
| |
| /** |
| * Gets the Job instance info corresponding to handle id. |
| * |
| * @param idStr |
| * @return SchedulerJobInstanceInfo |
| * @throws SQLException |
| */ |
| public SchedulerJobInstanceInfo getJobInstanceInfo(String idStr) throws SQLException { |
| String fetchSQL = "SELECT * FROM " + JOB_INSTANCE_TABLE + " WHERE " + COLUMN_ID + "=?"; |
| List<Object[]> result = runner.query(fetchSQL, multipleRowsHandler, idStr); |
| if (result.size() == 0) { |
| return null; |
| } else { |
| Object[] instanceInfo = result.get(0); |
| return parseSchedulerInstance(instanceInfo); |
| } |
| } |
| |
| private SchedulerJobInstanceInfo parseSchedulerInstance(Object[] instanceInfo) throws SQLException { |
| SchedulerJobInstanceHandle id = SchedulerJobInstanceHandle.fromString((String) instanceInfo[0]); |
| SchedulerJobHandle jobId = SchedulerJobHandle.fromString((String) instanceInfo[1]); |
| long createdOn = (Long) instanceInfo[2]; |
| // Get the Runs |
| String fetchSQL = "SELECT * FROM " + JOB_INSTANCE_RUN_TABLE + " WHERE " + COLUMN_ID + "=?"; |
| List<Object[]> instanceRuns = runner.query(fetchSQL, multipleRowsHandler, (String) instanceInfo[0]); |
| List<SchedulerJobInstanceRun> runList = new ArrayList<>(); |
| for (Object[] run : instanceRuns) { |
| // run[0] will contain the instanceID |
| int runId = (Integer) run[1]; |
| LensSessionHandle sessionHandle = LensSessionHandle.valueOf((String) run[2]); |
| long starttime = (Long) run[3]; |
| long endtime = (Long) run[4]; |
| String resultPath = (String) run[5]; |
| String queryHandleString = (String) run[6]; |
| QueryHandle queryHandle = null; |
| if (!queryHandleString.isEmpty()) { |
| queryHandle = QueryHandle.fromString((String) run[6]); |
| } |
| SchedulerJobInstanceState instanceStatus = SchedulerJobInstanceState.valueOf((String) run[7]); |
| SchedulerJobInstanceRun instanceRun = new SchedulerJobInstanceRun(id, runId, sessionHandle, starttime, endtime, |
| resultPath, queryHandle, instanceStatus); |
| runList.add(instanceRun); |
| } |
| return new SchedulerJobInstanceInfo(id, jobId, createdOn, runList); |
| } |
| |
| /** |
| * Updates the status of a job instance. |
| * |
| * @param instanceRun |
| * @return number of rows updated. |
| * @throws SQLException |
| */ |
| public int updateJobInstanceRun(SchedulerJobInstanceRun instanceRun) throws SQLException { |
| String updateSQL = |
| "UPDATE " + JOB_INSTANCE_RUN_TABLE + " SET " + COLUMN_END_TIME + "=?, " + COLUMN_RESULT_PATH + "=?, " |
| + COLUMN_QUERY_HANDLE + "=?, " + COLUMN_STATUS + "=?" + " WHERE " + COLUMN_ID + "=? AND " + COLUMN_RUN_ID |
| + "=?"; |
| |
| return runner.update(updateSQL, instanceRun.getEndTime(), instanceRun.getResultPath(), |
| instanceRun.getQueryHandle() == null ? "" : instanceRun.getQueryHandle().getHandleIdString(), |
| instanceRun.getInstanceState().name(), instanceRun.getHandle().getHandleIdString(), instanceRun.getRunId()); |
| } |
| |
| /** |
| * Gets all the instance handle of a job |
| * |
| * @param jobId |
| * @return List of SchedulerJobInstanceHandle |
| * @throws SQLException |
| */ |
| public List<SchedulerJobInstanceInfo> getAllJobInstances(String jobId) throws SQLException { |
| String fetchSQL = "SELECT * FROM " + JOB_INSTANCE_TABLE + " WHERE " + COLUMN_JOB_ID + "=?"; |
| List<Object[]> result = runner.query(fetchSQL, multipleRowsHandler, jobId); |
| List<SchedulerJobInstanceInfo> resOut = new ArrayList<>(); |
| for (int i = 0; i < result.size(); i++) { |
| Object[] row = result.get(i); |
| resOut.add(parseSchedulerInstance(row)); |
| } |
| return resOut; |
| } |
| |
| /** |
| * Get the user who submitted the job. |
| * |
| * @param id |
| * @return |
| * @throws SQLException |
| */ |
| public String getUser(String id) throws SQLException { |
| String fetchSQL = "SELECT " + COLUMN_USER + " FROM " + JOB_TABLE + " WHERE " + COLUMN_ID + "=?"; |
| List<Object[]> result = runner.query(fetchSQL, multipleRowsHandler, id); |
| if (result.size() == 0) { |
| return null; |
| } else { |
| return (String) result.get(0)[0]; |
| } |
| } |
| } |
| |
| /** |
| * MySQL based DB Store. |
| */ |
| public static class SchedulerMySQLDBStore extends SchedulerDBStore { |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void createJobTable() throws SQLException { |
| String createSQL = |
| "CREATE TABLE IF NOT EXISTS " + JOB_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL," + COLUMN_JOB |
| + " TEXT," + COLUMN_USER + " VARCHAR(255)," + COLUMN_STATUS + " VARCHAR(20)," + COLUMN_CREATED_ON |
| + " BIGINT, " + COLUMN_MODIFIED_ON + " BIGINT, " + COLUMN_JOB_NAME + " VARCHAR(255), " + " PRIMARY KEY ( " |
| + COLUMN_ID + ")" + ")"; |
| runner.update(createSQL); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void createJobInstanceTable() throws SQLException { |
| String createSQL = |
| "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL, " |
| + COLUMN_JOB_ID + " VARCHAR(255) NOT NULL, " + COLUMN_SCHEDULE_TIME + " BIGINT, " + " PRIMARY KEY ( " |
| + COLUMN_ID + ")" + ")"; |
| runner.update(createSQL); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void createJobInstanceRunTable() throws SQLException { |
| String createSQL = |
| "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_RUN_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL, " |
| + COLUMN_RUN_ID + " INT NOT NULL, " + COLUMN_SESSION_HANDLE + " VARCHAR(255), " + COLUMN_START_TIME |
| + " BIGINT, " + COLUMN_END_TIME + " BIGINT, " + COLUMN_RESULT_PATH + " TEXT, " + COLUMN_QUERY_HANDLE |
| + " VARCHAR(255), " + COLUMN_STATUS + " VARCHAR(20), " + " PRIMARY KEY ( " + COLUMN_ID + ", " + COLUMN_RUN_ID |
| + ")" + ")"; |
| runner.update(createSQL); |
| } |
| } |
| |
| /** |
| * HSQL Based DB store. This class is used in testing. |
| */ |
| public static class SchedulerHsqlDBStore extends SchedulerDBStore { |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void createJobTable() throws SQLException { |
| String createSQL = |
| "CREATE TABLE IF NOT EXISTS " + JOB_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL," + COLUMN_JOB |
| + " VARCHAR(1024)," + COLUMN_USER + " VARCHAR(255)," + COLUMN_STATUS + " VARCHAR(20)," + COLUMN_CREATED_ON |
| + " BIGINT, " + COLUMN_MODIFIED_ON + " BIGINT, " + COLUMN_JOB_NAME + " VARCHAR(255), " + " PRIMARY KEY ( " |
| + COLUMN_ID + ")" + ")"; |
| runner.update(createSQL); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void createJobInstanceTable() throws SQLException { |
| String createSQL = |
| "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL, " |
| + COLUMN_JOB_ID + " VARCHAR(255) NOT NULL, " + COLUMN_SCHEDULE_TIME + " BIGINT, " + " PRIMARY KEY ( " |
| + COLUMN_ID + ")" + ")"; |
| runner.update(createSQL); |
| } |
| |
| /** |
| * {@inheritDoc} |
| */ |
| @Override |
| public void createJobInstanceRunTable() throws SQLException { |
| String createSQL = |
| "CREATE TABLE IF NOT EXISTS " + JOB_INSTANCE_RUN_TABLE + "( " + COLUMN_ID + " VARCHAR(255) NOT NULL, " |
| + COLUMN_RUN_ID + " INT NOT NULL, " + COLUMN_SESSION_HANDLE + " VARCHAR(255), " + COLUMN_START_TIME |
| + " BIGINT, " + COLUMN_END_TIME + " BIGINT, " + COLUMN_RESULT_PATH + " VARCHAR(1024)," |
| + COLUMN_QUERY_HANDLE + " VARCHAR(255), " + COLUMN_STATUS + " VARCHAR(20), " + " PRIMARY KEY ( " |
| + COLUMN_ID + ", " + COLUMN_RUN_ID + " )" + ")"; |
| runner.update(createSQL); |
| } |
| } |
| } |