blob: cc554d643ef4ce6b01431c77d452c832c947b9a7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.executor.jpa;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import javax.persistence.EntityManager;
import javax.persistence.Query;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.StringBlob;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.DateUtils;
/**
* Query Executor that provides API to run query for Bundle Job
*/
public class BundleJobQueryExecutor extends QueryExecutor<BundleJobBean, BundleJobQueryExecutor.BundleJobQuery> {
public enum BundleJobQuery {
UPDATE_BUNDLE_JOB,
UPDATE_BUNDLE_JOB_STATUS,
UPDATE_BUNDLE_JOB_STATUS_PENDING,
UPDATE_BUNDLE_JOB_STATUS_PENDING_MODTIME,
UPDATE_BUNDLE_JOB_STATUS_PENDING_SUSP_MOD_TIME,
UPDATE_BUNDLE_JOB_STATUS_PAUSE_ENDTIME,
UPDATE_BUNDLE_JOB_PAUSE_KICKOFF,
GET_BUNDLE_JOB,
GET_BUNDLE_JOB_STATUS,
GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME,
GET_BUNDLE_JOB_ID_JOBXML_CONF,
GET_BUNDLE_IDS_FOR_STATUS_TRANSIT
};
private static BundleJobQueryExecutor instance = new BundleJobQueryExecutor();
private BundleJobQueryExecutor() {
}
public static QueryExecutor<BundleJobBean, BundleJobQueryExecutor.BundleJobQuery> getInstance() {
return BundleJobQueryExecutor.instance;
}
@Override
public Query getUpdateQuery(BundleJobQuery namedQuery, BundleJobBean bjBean, EntityManager em)
throws JPAExecutorException {
Query query = em.createNamedQuery(namedQuery.name());
switch (namedQuery) {
case UPDATE_BUNDLE_JOB:
query.setParameter("appName", bjBean.getAppName());
query.setParameter("appPath", bjBean.getAppPath());
query.setParameter("conf", bjBean.getConfBlob());
query.setParameter("timeOut", bjBean.getTimeout());
query.setParameter("createdTime", bjBean.getCreatedTimestamp());
query.setParameter("endTime", bjBean.getEndTimestamp());
query.setParameter("jobXml", bjBean.getJobXmlBlob());
query.setParameter("lastModifiedTime", bjBean.getLastModifiedTimestamp());
query.setParameter("origJobXml", bjBean.getOrigJobXmlBlob());
query.setParameter("startTime", bjBean.getstartTimestamp());
query.setParameter("status", bjBean.getStatus().toString());
query.setParameter("timeUnit", bjBean.getTimeUnitStr());
query.setParameter("pending", bjBean.isPending() ? 1 : 0);
query.setParameter("id", bjBean.getId());
break;
case UPDATE_BUNDLE_JOB_STATUS:
query.setParameter("status", bjBean.getStatus().toString());
query.setParameter("lastModifiedTime", bjBean.getLastModifiedTimestamp());
query.setParameter("pending", bjBean.getPending());
query.setParameter("id", bjBean.getId());
break;
case UPDATE_BUNDLE_JOB_STATUS_PENDING:
query.setParameter("status", bjBean.getStatus().toString());
query.setParameter("pending", bjBean.getPending());
query.setParameter("id", bjBean.getId());
break;
case UPDATE_BUNDLE_JOB_STATUS_PENDING_SUSP_MOD_TIME:
query.setParameter("status", bjBean.getStatus().toString());
query.setParameter("lastModifiedTime", bjBean.getLastModifiedTimestamp());
query.setParameter("pending", bjBean.getPending());
query.setParameter("suspendedTime", bjBean.getSuspendedTimestamp());
query.setParameter("id", bjBean.getId());
break;
case UPDATE_BUNDLE_JOB_STATUS_PENDING_MODTIME:
query.setParameter("status", bjBean.getStatus().toString());
query.setParameter("lastModifiedTime", bjBean.getLastModifiedTimestamp());
query.setParameter("pending", bjBean.getPending());
query.setParameter("id", bjBean.getId());
break;
case UPDATE_BUNDLE_JOB_STATUS_PAUSE_ENDTIME:
query.setParameter("status", bjBean.getStatus().toString());
query.setParameter("pauseTime", bjBean.getPauseTimestamp());
query.setParameter("endTime", bjBean.getEndTimestamp());
query.setParameter("id", bjBean.getId());
break;
case UPDATE_BUNDLE_JOB_PAUSE_KICKOFF:
query.setParameter("pauseTime", bjBean.getPauseTimestamp());
query.setParameter("kickoffTime", bjBean.getKickoffTimestamp());
query.setParameter("id", bjBean.getId());
break;
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
+ namedQuery.name());
}
return query;
}
@Override
public Query getSelectQuery(BundleJobQuery namedQuery, EntityManager em, Object... parameters)
throws JPAExecutorException {
Query query = em.createNamedQuery(namedQuery.name());
switch (namedQuery) {
case GET_BUNDLE_JOB:
case GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME:
case GET_BUNDLE_JOB_ID_JOBXML_CONF:
case GET_BUNDLE_JOB_STATUS:
query.setParameter("id", parameters[0]);
break;
case GET_BUNDLE_IDS_FOR_STATUS_TRANSIT:
query.setParameter("lastModifiedTime", DateUtils.convertDateToTimestamp((Date)parameters[0]));
break;
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
+ namedQuery.name());
}
return query;
}
@Override
public int executeUpdate(BundleJobQuery namedQuery, BundleJobBean jobBean) throws JPAExecutorException {
JPAService jpaService = Services.get().get(JPAService.class);
EntityManager em = jpaService.getEntityManager();
Query query = getUpdateQuery(namedQuery, jobBean, em);
int ret = jpaService.executeUpdate(namedQuery.name(), query, em);
return ret;
}
@Override
public BundleJobBean get(BundleJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
BundleJobBean bean = getIfExist(namedQuery, parameters);
if (bean == null) {
throw new JPAExecutorException(ErrorCode.E0605, getSelectQuery(namedQuery,
Services.get().get(JPAService.class).getEntityManager(), parameters).toString());
}
return bean;
}
@Override
public BundleJobBean getIfExist(BundleJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
JPAService jpaService = Services.get().get(JPAService.class);
EntityManager em = jpaService.getEntityManager();
Query query = getSelectQuery(namedQuery, em, parameters);
Object ret = jpaService.executeGet(namedQuery.name(), query, em);
if (ret == null) {
return null;
}
BundleJobBean bean = constructBean(namedQuery, ret, parameters);
return bean;
}
@Override
public List<BundleJobBean> getList(BundleJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
JPAService jpaService = Services.get().get(JPAService.class);
EntityManager em = jpaService.getEntityManager();
Query query = getSelectQuery(namedQuery, em, parameters);
List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em);
List<BundleJobBean> beanList = new ArrayList<BundleJobBean>();
if (retList != null) {
for (Object ret : retList) {
beanList.add(constructBean(namedQuery, ret));
}
}
return beanList;
}
private BundleJobBean constructBean(BundleJobQuery namedQuery, Object ret, Object... parameters)
throws JPAExecutorException {
BundleJobBean bean;
Object[] arr;
switch (namedQuery) {
case GET_BUNDLE_JOB:
bean = (BundleJobBean) ret;
break;
case GET_BUNDLE_JOB_STATUS:
bean = new BundleJobBean();
bean.setId((String) parameters[0]);
bean.setStatus((String) ret);
break;
case GET_BUNDLE_JOB_ID_STATUS_PENDING_MOD_PAUSE_SUSPEND_TIME:
bean = new BundleJobBean();
arr = (Object[]) ret;
bean.setId((String) arr[0]);
bean.setStatus((String) arr[1]);
bean.setPending((Integer) arr[2]);
bean.setLastModifiedTime(DateUtils.toDate((Timestamp) arr[3]));
bean.setPauseTime(DateUtils.toDate((Timestamp) arr[4]));
bean.setSuspendedTime(DateUtils.toDate((Timestamp) arr[5]));
break;
case GET_BUNDLE_JOB_ID_JOBXML_CONF:
bean = new BundleJobBean();
arr = (Object[]) ret;
bean.setId((String) arr[0]);
bean.setJobXmlBlob((StringBlob) arr[1]);
bean.setConfBlob((StringBlob) arr[2]);
break;
case GET_BUNDLE_IDS_FOR_STATUS_TRANSIT:
bean = new BundleJobBean();
bean.setId((String) ret);
break;
default:
throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for "
+ namedQuery.name());
}
return bean;
}
@Override
public Object getSingleValue(BundleJobQuery namedQuery, Object... parameters) throws JPAExecutorException {
throw new UnsupportedOperationException();
}
}