blob: c7daf9114cefe6012b01c49a77b86235df7cd2d1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.executor.jpa;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import javax.persistence.EntityManager;
import javax.persistence.Query;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.StringBlob;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.DateUtils;
/**
* Query executor that provides an API to run named queries for coordinator jobs.
*/
public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, CoordJobQueryExecutor.CoordJobQuery> {
/**
* Identifiers of the queries this executor can run. The {@code name()} of a
* constant is passed unchanged to {@code EntityManager.createNamedQuery}, so
* each constant is expected to match a named query declared elsewhere
* (the declarations are not visible in this file).
*/
public enum CoordJobQuery {
// Update queries: parameters are bound from a CoordinatorJobBean in getUpdateQuery().
UPDATE_COORD_JOB,
UPDATE_COORD_JOB_STATUS,
UPDATE_COORD_JOB_BUNDLEID,
UPDATE_COORD_JOB_APPNAMESPACE,
UPDATE_COORD_JOB_STATUS_PENDING,
UPDATE_COORD_JOB_BUNDLEID_APPNAMESPACE_PAUSETIME,
UPDATE_COORD_JOB_STATUS_MODTIME,
UPDATE_COORD_JOB_STATUS_PENDING_MODTIME,
UPDATE_COORD_JOB_LAST_MODIFIED_TIME,
UPDATE_COORD_JOB_STATUS_PENDING_TIME,
UPDATE_COORD_JOB_MATERIALIZE,
UPDATE_COORD_JOB_CHANGE,
UPDATE_COORD_JOB_CONF,
UPDATE_COORD_JOB_XML,
// Select queries: parameters are bound in getSelectQuery() and each result
// is converted to a CoordinatorJobBean in constructBean().
GET_COORD_JOB,
GET_COORD_JOB_USER_APPNAME,
GET_COORD_JOB_INPUT_CHECK,
GET_COORD_JOB_ACTION_READY,
GET_COORD_JOB_ACTION_KILL,
GET_COORD_JOB_MATERIALIZE,
GET_COORD_JOB_SUSPEND_KILL,
GET_COORD_JOB_STATUS,
GET_COORD_JOB_STATUS_PARENTID,
GET_COORD_JOBS_CHANGED,
GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION,
GET_COORD_FOR_ABANDONEDCHECK,
GET_COORD_IDS_FOR_STATUS_TRANSIT,
GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID,
GET_COORD_JOBS_WITH_PARENT_ID,
GET_COORD_JOB_CONF,
GET_COORD_JOB_XML
};
/** The single shared executor; created when the class is initialized and never reassigned. */
private static final CoordJobQueryExecutor instance = new CoordJobQueryExecutor();

/** Private so that the shared instance above is the only one ever created. */
private CoordJobQueryExecutor() {
}

/**
 * Returns the shared executor instance.
 *
 * @return the singleton {@code CoordJobQueryExecutor}
 */
public static CoordJobQueryExecutor getInstance() {
    return instance;
}
/**
 * Creates the named update query and binds its parameters from the given bean.
 *
 * @param namedQuery the update query to build; its name is used as the JPA named-query name
 * @param cjBean coordinator job bean that supplies the parameter values
 * @param em entity manager used to create the query
 * @return the query with every parameter of {@code namedQuery} bound
 * @throws JPAExecutorException with {@code E0603} if {@code namedQuery} is not an update query
 */
@Override
public Query getUpdateQuery(CoordJobQuery namedQuery, CoordinatorJobBean cjBean, EntityManager em)
        throws JPAExecutorException {
    final Query update = em.createNamedQuery(namedQuery.name());
    switch (namedQuery) {
        case UPDATE_COORD_JOB:
            // Full-row update: every persisted column is rewritten from the bean.
            update.setParameter("appName", cjBean.getAppName());
            update.setParameter("appPath", cjBean.getAppPath());
            update.setParameter("concurrency", cjBean.getConcurrency());
            update.setParameter("conf", cjBean.getConfBlob());
            update.setParameter("externalId", cjBean.getExternalId());
            update.setParameter("frequency", cjBean.getFrequency());
            update.setParameter("lastActionNumber", cjBean.getLastActionNumber());
            update.setParameter("timeOut", cjBean.getTimeout());
            update.setParameter("timeZone", cjBean.getTimeZone());
            update.setParameter("createdTime", cjBean.getCreatedTimestamp());
            update.setParameter("endTime", cjBean.getEndTimestamp());
            update.setParameter("execution", cjBean.getExecution());
            update.setParameter("jobXml", cjBean.getJobXmlBlob());
            update.setParameter("lastAction", cjBean.getLastActionTimestamp());
            update.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp());
            update.setParameter("nextMaterializedTime", cjBean.getNextMaterializedTimestamp());
            update.setParameter("origJobXml", cjBean.getOrigJobXmlBlob());
            update.setParameter("slaXml", cjBean.getSlaXmlBlob());
            update.setParameter("startTime", cjBean.getStartTimestamp());
            update.setParameter("status", cjBean.getStatus().toString());
            update.setParameter("timeUnit", cjBean.getTimeUnitStr());
            update.setParameter("appNamespace", cjBean.getAppNamespace());
            update.setParameter("bundleId", cjBean.getBundleId());
            update.setParameter("matThrottling", cjBean.getMatThrottling());
            break;

        case UPDATE_COORD_JOB_STATUS:
            update.setParameter("status", cjBean.getStatus().toString());
            break;

        case UPDATE_COORD_JOB_BUNDLEID:
            update.setParameter("bundleId", cjBean.getBundleId());
            break;

        case UPDATE_COORD_JOB_APPNAMESPACE:
            update.setParameter("appNamespace", cjBean.getAppNamespace());
            break;

        case UPDATE_COORD_JOB_STATUS_PENDING:
            update.setParameter("status", cjBean.getStatus().toString());
            update.setParameter("pending", cjBean.isPending() ? 1 : 0);
            break;

        case UPDATE_COORD_JOB_BUNDLEID_APPNAMESPACE_PAUSETIME:
            update.setParameter("bundleId", cjBean.getBundleId());
            update.setParameter("appNamespace", cjBean.getAppNamespace());
            update.setParameter("pauseTime", cjBean.getPauseTimestamp());
            break;

        case UPDATE_COORD_JOB_STATUS_MODTIME:
            update.setParameter("status", cjBean.getStatus().toString());
            update.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp());
            break;

        case UPDATE_COORD_JOB_STATUS_PENDING_MODTIME:
            update.setParameter("status", cjBean.getStatus().toString());
            update.setParameter("pending", cjBean.isPending() ? 1 : 0);
            update.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp());
            break;

        case UPDATE_COORD_JOB_LAST_MODIFIED_TIME:
            update.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp());
            break;

        case UPDATE_COORD_JOB_STATUS_PENDING_TIME:
            update.setParameter("status", cjBean.getStatus().toString());
            update.setParameter("pending", cjBean.isPending() ? 1 : 0);
            update.setParameter("doneMaterialization", cjBean.isDoneMaterialization() ? 1 : 0);
            update.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp());
            update.setParameter("suspendedTime", cjBean.getSuspendedTimestamp());
            break;

        case UPDATE_COORD_JOB_MATERIALIZE:
            update.setParameter("status", cjBean.getStatus().toString());
            update.setParameter("pending", cjBean.isPending() ? 1 : 0);
            update.setParameter("doneMaterialization", cjBean.isDoneMaterialization() ? 1 : 0);
            update.setParameter("lastActionTime", cjBean.getLastActionTimestamp());
            update.setParameter("lastActionNumber", cjBean.getLastActionNumber());
            update.setParameter("nextMatdTime", cjBean.getNextMaterializedTimestamp());
            break;

        case UPDATE_COORD_JOB_CHANGE:
            update.setParameter("endTime", cjBean.getEndTimestamp());
            update.setParameter("status", cjBean.getStatus().toString());
            update.setParameter("pending", cjBean.isPending() ? 1 : 0);
            update.setParameter("doneMaterialization", cjBean.isDoneMaterialization() ? 1 : 0);
            update.setParameter("concurrency", cjBean.getConcurrency());
            update.setParameter("pauseTime", cjBean.getPauseTimestamp());
            update.setParameter("lastActionNumber", cjBean.getLastActionNumber());
            update.setParameter("lastActionTime", cjBean.getLastActionTimestamp());
            update.setParameter("nextMatdTime", cjBean.getNextMaterializedTimestamp());
            update.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp());
            break;

        case UPDATE_COORD_JOB_CONF:
            update.setParameter("conf", cjBean.getConfBlob());
            break;

        case UPDATE_COORD_JOB_XML:
            update.setParameter("jobXml", cjBean.getJobXmlBlob());
            break;

        default:
            throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
                    + namedQuery.name());
    }
    // Every supported update is keyed by job id; it is bound last, after the
    // query-specific parameters. Unsupported queries have already thrown above.
    update.setParameter("id", cjBean.getId());
    return update;
}
/**
 * Creates the named select query and binds the caller-supplied parameters.
 *
 * @param namedQuery the select query to build; its name is used as the JPA named-query name
 * @param em entity manager used to create the query
 * @param parameters positional values for the query; the count and types required
 *        depend on {@code namedQuery} (see the individual cases)
 * @return the query with its parameters bound
 * @throws JPAExecutorException with {@code E0603} if {@code namedQuery} is not a select query
 */
@Override
public Query getSelectQuery(CoordJobQuery namedQuery, EntityManager em, Object... parameters)
        throws JPAExecutorException {
    final Query select = em.createNamedQuery(namedQuery.name());
    switch (namedQuery) {
        // Lookups of a single job: parameters[0] is bound as "id".
        case GET_COORD_JOB:
        case GET_COORD_JOB_USER_APPNAME:
        case GET_COORD_JOB_INPUT_CHECK:
        case GET_COORD_JOB_ACTION_READY:
        case GET_COORD_JOB_ACTION_KILL:
        case GET_COORD_JOB_MATERIALIZE:
        case GET_COORD_JOB_SUSPEND_KILL:
        case GET_COORD_JOB_STATUS:
        case GET_COORD_JOB_STATUS_PARENTID:
        case GET_COORD_JOB_CONF:
        case GET_COORD_JOB_XML:
            select.setParameter("id", parameters[0]);
            break;

        // parameters[0] is a java.util.Date, bound as a Timestamp.
        case GET_COORD_JOBS_CHANGED:
        case GET_COORD_IDS_FOR_STATUS_TRANSIT: {
            Date modifiedTime = (Date) parameters[0];
            select.setParameter("lastModifiedTime", new Timestamp(modifiedTime.getTime()));
            break;
        }

        // parameters[0] is a Date; parameters[1] is an Integer row limit,
        // applied only when positive.
        case GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION: {
            Date matTime = (Date) parameters[0];
            select.setParameter("matTime", new Timestamp(matTime.getTime()));
            int maxRows = (Integer) parameters[1];
            if (maxRows > 0) {
                select.setMaxResults(maxRows);
            }
            break;
        }

        // This query uses positional (1-based) rather than named parameters.
        case GET_COORD_FOR_ABANDONEDCHECK:
            select.setParameter(1, (Integer) parameters[0]);
            select.setParameter(2, (Timestamp) parameters[1]);
            break;

        case GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID:
            select.setParameter("appName", parameters[0]);
            select.setParameter("bundleId", parameters[1]);
            break;

        case GET_COORD_JOBS_WITH_PARENT_ID:
            select.setParameter("parentId", parameters[0]);
            break;

        default:
            throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
                    + namedQuery.name());
    }
    return select;
}
/**
 * Builds the given update query from the bean and runs it through the JPA service.
 *
 * @param namedQuery the update query to run
 * @param jobBean coordinator job bean that supplies the parameter values
 * @return the value returned by {@code JPAService.executeUpdate}
 * @throws JPAExecutorException if the query cannot be built or executed
 */
@Override
public int executeUpdate(CoordJobQuery namedQuery, CoordinatorJobBean jobBean) throws JPAExecutorException {
    final JPAService jpa = Services.get().get(JPAService.class);
    final EntityManager entityManager = jpa.getEntityManager();
    final Query update = getUpdateQuery(namedQuery, jobBean, entityManager);
    return jpa.executeUpdate(namedQuery.name(), update, entityManager);
}
/**
 * Converts one raw query result into a {@code CoordinatorJobBean}.
 * <p>
 * Queries that return the entity itself are passed through with a cast. All
 * other queries return either a single column value or an {@code Object[]} of
 * column values; those are copied into a new bean by position. The positions
 * used here are assumed to follow the select order of the corresponding named
 * query, which is declared outside this file.
 *
 * @param namedQuery the select query that produced {@code ret}
 * @param ret one result of the query: an entity, a single value, or an {@code Object[]} row
 * @param parameters the parameters the query was run with; some queries take the job id
 *        from {@code parameters[0]} instead of from the result
 * @return the bean, populated only with the fields the query provides
 * @throws JPAExecutorException with {@code E0603} if {@code namedQuery} is not a supported select query
 */
private CoordinatorJobBean constructBean(CoordJobQuery namedQuery, Object ret, Object... parameters)
        throws JPAExecutorException {
    CoordinatorJobBean job;
    Object[] row;
    switch (namedQuery) {
        // The result is already the full entity.
        case GET_COORD_JOB:
        case GET_COORD_JOBS_CHANGED:
            job = (CoordinatorJobBean) ret;
            break;

        case GET_COORD_JOB_USER_APPNAME:
            job = new CoordinatorJobBean();
            row = (Object[]) ret;
            job.setUser((String) row[0]);
            job.setAppName((String) row[1]);
            break;

        case GET_COORD_JOB_INPUT_CHECK:
            job = new CoordinatorJobBean();
            row = (Object[]) ret;
            job.setUser((String) row[0]);
            job.setAppName((String) row[1]);
            job.setStatusStr((String) row[2]);
            job.setAppNamespace((String) row[3]);
            job.setExecution((String) row[4]);
            job.setFrequency((String) row[5]);
            job.setTimeUnitStr((String) row[6]);
            job.setTimeZone((String) row[7]);
            job.setStartTime(DateUtils.toDate((Timestamp) row[8]));
            job.setEndTime(DateUtils.toDate((Timestamp) row[9]));
            job.setJobXmlBlob((StringBlob) row[10]);
            break;

        case GET_COORD_JOB_ACTION_READY:
            job = new CoordinatorJobBean();
            row = (Object[]) ret;
            job.setId((String) row[0]);
            job.setUser((String) row[1]);
            job.setGroup((String) row[2]);
            job.setAppName((String) row[3]);
            job.setStatusStr((String) row[4]);
            job.setExecution((String) row[5]);
            job.setConcurrency((Integer) row[6]);
            job.setFrequency((String) row[7]);
            job.setTimeUnitStr((String) row[8]);
            job.setTimeZone((String) row[9]);
            job.setStartTime(DateUtils.toDate((Timestamp) row[10]));
            job.setEndTime(DateUtils.toDate((Timestamp) row[11]));
            job.setJobXmlBlob((StringBlob) row[12]);
            break;

        case GET_COORD_JOB_ACTION_KILL:
            job = new CoordinatorJobBean();
            row = (Object[]) ret;
            job.setId((String) row[0]);
            job.setUser((String) row[1]);
            job.setGroup((String) row[2]);
            job.setAppName((String) row[3]);
            job.setStatusStr((String) row[4]);
            break;

        case GET_COORD_JOB_MATERIALIZE:
            job = new CoordinatorJobBean();
            row = (Object[]) ret;
            job.setId((String) row[0]);
            job.setUser((String) row[1]);
            job.setGroup((String) row[2]);
            job.setAppName((String) row[3]);
            job.setStatusStr((String) row[4]);
            job.setFrequency((String) row[5]);
            job.setMatThrottling((Integer) row[6]);
            job.setTimeout((Integer) row[7]);
            job.setTimeZone((String) row[8]);
            job.setStartTime(DateUtils.toDate((Timestamp) row[9]));
            job.setEndTime(DateUtils.toDate((Timestamp) row[10]));
            job.setPauseTime(DateUtils.toDate((Timestamp) row[11]));
            job.setNextMaterializedTime(DateUtils.toDate((Timestamp) row[12]));
            job.setLastActionTime(DateUtils.toDate((Timestamp) row[13]));
            job.setLastActionNumber((Integer) row[14]);
            job.setDoneMaterialization((Integer) row[15]);
            job.setBundleId((String) row[16]);
            job.setConfBlob((StringBlob) row[17]);
            job.setJobXmlBlob((StringBlob) row[18]);
            job.setAppNamespace((String) row[19]);
            job.setTimeUnitStr((String) row[20]);
            job.setExecution((String) row[21]);
            break;

        case GET_COORD_JOB_SUSPEND_KILL:
            job = new CoordinatorJobBean();
            row = (Object[]) ret;
            job.setId((String) row[0]);
            job.setUser((String) row[1]);
            job.setGroup((String) row[2]);
            job.setAppName((String) row[3]);
            job.setStatusStr((String) row[4]);
            job.setBundleId((String) row[5]);
            job.setAppNamespace((String) row[6]);
            job.setDoneMaterialization((Integer) row[7]);
            break;

        // The id is not part of the result; it is taken from the query parameter.
        case GET_COORD_JOB_STATUS:
            job = new CoordinatorJobBean();
            job.setId((String) parameters[0]);
            job.setStatusStr((String) ret);
            break;

        case GET_COORD_JOB_STATUS_PARENTID:
            job = new CoordinatorJobBean();
            row = (Object[]) ret;
            job.setId((String) parameters[0]);
            job.setStatusStr((String) row[0]);
            job.setBundleId((String) row[1]);
            break;

        case GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION:
            job = new CoordinatorJobBean();
            row = (Object[]) ret;
            job.setId((String) row[0]);
            job.setNextMaterializedTime(DateUtils.toDate((Timestamp) row[1]));
            break;

        // The result is a single value: the job id.
        case GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID:
        case GET_COORD_JOBS_WITH_PARENT_ID:
        case GET_COORD_IDS_FOR_STATUS_TRANSIT:
            job = new CoordinatorJobBean();
            job.setId((String) ret);
            break;

        case GET_COORD_FOR_ABANDONEDCHECK:
            job = new CoordinatorJobBean();
            row = (Object[]) ret;
            job.setId((String) row[0]);
            job.setUser((String) row[1]);
            job.setGroup((String) row[2]);
            job.setAppName((String) row[3]);
            break;

        case GET_COORD_JOB_CONF:
            job = new CoordinatorJobBean();
            job.setConfBlob((StringBlob) ret);
            break;

        case GET_COORD_JOB_XML:
            job = new CoordinatorJobBean();
            job.setJobXmlBlob((StringBlob) ret);
            break;

        default:
            throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for "
                    + namedQuery.name());
    }
    return job;
}
/**
 * Runs the given select query and returns the resulting bean, failing when nothing matches.
 *
 * @param namedQuery the select query to run
 * @param parameters positional values for the query, as expected by {@code getSelectQuery}
 * @return the bean built from the query result; never {@code null}
 * @throws JPAExecutorException with {@code E0605} when the query yields no result, or any
 *         error raised while building or running the query
 */
@Override
public CoordinatorJobBean get(CoordJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
    CoordinatorJobBean bean = getIfExist(namedQuery, parameters);
    if (bean != null) {
        return bean;
    }
    // Nothing matched. The query is rebuilt only so it can be rendered into the
    // error message, which requires another EntityManager. That manager is never
    // handed to the JPA service, so nothing else would release it: close it here.
    EntityManager em = Services.get().get(JPAService.class).getEntityManager();
    try {
        throw new JPAExecutorException(ErrorCode.E0605,
                getSelectQuery(namedQuery, em, parameters).toString());
    }
    finally {
        if (em != null && em.isOpen()) {
            em.close();
        }
    }
}
/**
 * Runs the given select query and converts every result into a bean.
 *
 * @param namedQuery the select query to run
 * @param parameters positional values for the query, as expected by {@code getSelectQuery}
 * @return one bean per result, in result order; an empty list when the JPA service
 *         returns {@code null} or no results
 * @throws JPAExecutorException if the query cannot be built or executed, or a result
 *         cannot be converted
 */
@Override
public List<CoordinatorJobBean> getList(CoordJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
    final JPAService jpa = Services.get().get(JPAService.class);
    final EntityManager entityManager = jpa.getEntityManager();
    final Query select = getSelectQuery(namedQuery, entityManager, parameters);
    final List<?> rows = (List<?>) jpa.executeGetList(namedQuery.name(), select, entityManager);

    final List<CoordinatorJobBean> jobs = new ArrayList<CoordinatorJobBean>();
    if (rows == null) {
        return jobs;
    }
    for (Object row : rows) {
        CoordinatorJobBean job = constructBean(namedQuery, row, parameters);
        jobs.add(job);
    }
    return jobs;
}
/**
 * Not supported by this executor.
 *
 * @param namedQuery the query that was requested; used only in the error message
 * @param parameters ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
@Override
public Object getSingleValue(CoordJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
    // Say what was attempted so the failure can be diagnosed from the stack trace alone.
    throw new UnsupportedOperationException("getSingleValue is not supported by CoordJobQueryExecutor (query: "
            + namedQuery + ")");
}
/**
 * Runs the given select query and returns the resulting bean, or {@code null} when the
 * JPA service returns no result.
 *
 * @param namedQuery the select query to run
 * @param parameters positional values for the query, as expected by {@code getSelectQuery}
 * @return the bean built from the result, or {@code null} if there is none
 * @throws JPAExecutorException if the query cannot be built or executed, or the result
 *         cannot be converted
 */
@Override
public CoordinatorJobBean getIfExist(CoordJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
    final JPAService jpa = Services.get().get(JPAService.class);
    final EntityManager entityManager = jpa.getEntityManager();
    final Query select = getSelectQuery(namedQuery, entityManager, parameters);
    final Object row = jpa.executeGet(namedQuery.name(), select, entityManager);
    return (row == null) ? null : constructBean(namedQuery, row, parameters);
}
}