| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.oozie.executor.jpa; |
| |
| import java.sql.Timestamp; |
| import java.util.ArrayList; |
| import java.util.Date; |
| import java.util.List; |
| |
| import javax.persistence.EntityManager; |
| import javax.persistence.Query; |
| |
| import org.apache.oozie.CoordinatorActionBean; |
| import org.apache.oozie.ErrorCode; |
| import org.apache.oozie.StringBlob; |
| import org.apache.oozie.client.CoordinatorAction; |
| import org.apache.oozie.service.JPAService; |
| import org.apache.oozie.service.Services; |
| import org.apache.oozie.util.DateUtils; |
| |
| /** |
| * Query Executor that provides API to run query for Coordinator Action |
| */ |
| public class CoordActionQueryExecutor extends |
| QueryExecutor<CoordinatorActionBean, CoordActionQueryExecutor.CoordActionQuery> { |
| |
| public enum CoordActionQuery { |
| UPDATE_COORD_ACTION, |
| UPDATE_COORD_ACTION_STATUS_PENDING_TIME, |
| UPDATE_COORD_ACTION_FOR_INPUTCHECK, |
| UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK, |
| UPDATE_COORD_ACTION_DEPENDENCIES, |
| UPDATE_COORD_ACTION_FOR_START, |
| UPDATE_COORD_ACTION_FOR_MODIFIED_DATE, |
| UPDATE_COORD_ACTION_RERUN, |
| GET_COORD_ACTION, |
| GET_COORD_ACTION_STATUS, |
| GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID, |
| GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME, |
| GET_COORD_ACTIONS_STATUS_UNIGNORED, |
| GET_COORD_ACTIONS_PENDING_COUNT, |
| GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE, |
| GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE, |
| GET_TERMINATED_ACTIONS_FOR_DATES, |
| GET_TERMINATED_ACTION_IDS_FOR_DATES, |
| GET_ACTIVE_ACTIONS_FOR_DATES, |
| GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN, |
| GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN, |
| GET_COORD_ACTION_FOR_SLA, |
| GET_COORD_ACTION_FOR_INPUTCHECK |
| }; |
| |
| private static CoordActionQueryExecutor instance = new CoordActionQueryExecutor(); |
| |
| private CoordActionQueryExecutor() { |
| } |
| |
| public static QueryExecutor<CoordinatorActionBean, CoordActionQueryExecutor.CoordActionQuery> getInstance() { |
| return CoordActionQueryExecutor.instance; |
| } |
| |
| @Override |
| public Query getUpdateQuery(CoordActionQuery namedQuery, CoordinatorActionBean actionBean, EntityManager em) |
| throws JPAExecutorException { |
| |
| Query query = em.createNamedQuery(namedQuery.name()); |
| switch (namedQuery) { |
| case UPDATE_COORD_ACTION: |
| query.setParameter("actionNumber", actionBean.getActionNumber()); |
| query.setParameter("actionXml", actionBean.getActionXmlBlob()); |
| query.setParameter("consoleUrl", actionBean.getConsoleUrl()); |
| query.setParameter("createdConf", actionBean.getCreatedConfBlob()); |
| query.setParameter("errorCode", actionBean.getErrorCode()); |
| query.setParameter("errorMessage", actionBean.getErrorMessage()); |
| query.setParameter("externalStatus", actionBean.getExternalStatus()); |
| query.setParameter("missingDependencies", actionBean.getMissingDependenciesBlob()); |
| query.setParameter("runConf", actionBean.getRunConfBlob()); |
| query.setParameter("timeOut", actionBean.getTimeOut()); |
| query.setParameter("trackerUri", actionBean.getTrackerUri()); |
| query.setParameter("type", actionBean.getType()); |
| query.setParameter("createdTime", actionBean.getCreatedTimestamp()); |
| query.setParameter("externalId", actionBean.getExternalId()); |
| query.setParameter("jobId", actionBean.getJobId()); |
| query.setParameter("lastModifiedTime", new Date()); |
| query.setParameter("nominalTime", actionBean.getNominalTimestamp()); |
| query.setParameter("slaXml", actionBean.getSlaXmlBlob()); |
| query.setParameter("status", actionBean.getStatus().toString()); |
| query.setParameter("id", actionBean.getId()); |
| break; |
| |
| case UPDATE_COORD_ACTION_STATUS_PENDING_TIME: |
| query.setParameter("status", actionBean.getStatus().toString()); |
| query.setParameter("pending", actionBean.getPending()); |
| query.setParameter("lastModifiedTime", new Date()); |
| query.setParameter("id", actionBean.getId()); |
| break; |
| |
| case UPDATE_COORD_ACTION_FOR_INPUTCHECK: |
| query.setParameter("status", actionBean.getStatus().toString()); |
| query.setParameter("lastModifiedTime", new Date()); |
| query.setParameter("actionXml", actionBean.getActionXmlBlob()); |
| query.setParameter("missingDependencies", actionBean.getMissingDependenciesBlob()); |
| query.setParameter("id", actionBean.getId()); |
| break; |
| |
| case UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK: |
| query.setParameter("status", actionBean.getStatus().toString()); |
| query.setParameter("lastModifiedTime", new Date()); |
| query.setParameter("actionXml", actionBean.getActionXmlBlob()); |
| query.setParameter("pushMissingDependencies", actionBean.getPushMissingDependenciesBlob()); |
| query.setParameter("id", actionBean.getId()); |
| break; |
| |
| case UPDATE_COORD_ACTION_DEPENDENCIES: |
| query.setParameter("missingDependencies", actionBean.getMissingDependenciesBlob()); |
| query.setParameter("pushMissingDependencies", actionBean.getPushMissingDependenciesBlob()); |
| query.setParameter("id", actionBean.getId()); |
| break; |
| |
| case UPDATE_COORD_ACTION_FOR_START: |
| query.setParameter("status", actionBean.getStatus().toString()); |
| query.setParameter("lastModifiedTime", new Date()); |
| query.setParameter("runConf", actionBean.getRunConfBlob()); |
| query.setParameter("externalId", actionBean.getExternalId()); |
| query.setParameter("pending", actionBean.getPending()); |
| query.setParameter("errorCode", actionBean.getErrorCode()); |
| query.setParameter("errorMessage", actionBean.getErrorMessage()); |
| query.setParameter("id", actionBean.getId()); |
| break; |
| |
| case UPDATE_COORD_ACTION_FOR_MODIFIED_DATE: |
| query.setParameter("lastModifiedTime", actionBean.getLastModifiedTimestamp()); |
| query.setParameter("id", actionBean.getId()); |
| break; |
| |
| case UPDATE_COORD_ACTION_RERUN: |
| query.setParameter("actionXml", actionBean.getActionXmlBlob()); |
| query.setParameter("status", actionBean.getStatusStr()); |
| query.setParameter("externalId", actionBean.getExternalId()); |
| query.setParameter("externalStatus", actionBean.getExternalStatus()); |
| query.setParameter("rerunTime", actionBean.getRerunTimestamp()); |
| query.setParameter("lastModifiedTime", actionBean.getLastModifiedTimestamp()); |
| query.setParameter("createdTime", actionBean.getCreatedTimestamp()); |
| query.setParameter("createdConf", actionBean.getCreatedConfBlob()); |
| query.setParameter("runConf", actionBean.getRunConfBlob()); |
| query.setParameter("missingDependencies", actionBean.getMissingDependenciesBlob()); |
| query.setParameter("pushMissingDependencies", actionBean.getPushMissingDependenciesBlob()); |
| query.setParameter("errorCode", actionBean.getErrorCode()); |
| query.setParameter("errorMessage", actionBean.getErrorMessage()); |
| query.setParameter("id", actionBean.getId()); |
| break; |
| |
| default: |
| throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " |
| + namedQuery.name()); |
| } |
| return query; |
| } |
| |
| @Override |
| public Query getSelectQuery(CoordActionQuery namedQuery, EntityManager em, Object... parameters) |
| throws JPAExecutorException { |
| Query query = em.createNamedQuery(namedQuery.name()); |
| CoordActionQuery caQuery = (CoordActionQuery) namedQuery; |
| switch (caQuery) { |
| case GET_COORD_ACTION: |
| case GET_COORD_ACTION_STATUS: |
| case GET_COORD_ACTION_FOR_SLA: |
| case GET_COORD_ACTION_FOR_INPUTCHECK: |
| query.setParameter("id", parameters[0]); |
| break; |
| case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME: |
| query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime())); |
| break; |
| case GET_COORD_ACTIONS_STATUS_UNIGNORED: |
| query.setParameter("jobId", parameters[0]); |
| break; |
| case GET_COORD_ACTIONS_PENDING_COUNT: |
| query.setParameter("jobId", parameters[0]); |
| break; |
| case GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE: |
| query.setParameter("ids", parameters[0]); |
| break; |
| case GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE: |
| query.setParameter("jobId", parameters[0]); |
| break; |
| case GET_TERMINATED_ACTIONS_FOR_DATES: |
| case GET_TERMINATED_ACTION_IDS_FOR_DATES: |
| case GET_ACTIVE_ACTIONS_FOR_DATES: |
| query.setParameter("jobId", parameters[0]); |
| query.setParameter("startTime", new Timestamp(((Date) parameters[1]).getTime())); |
| query.setParameter("endTime", new Timestamp(((Date) parameters[2]).getTime())); |
| break; |
| case GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN: |
| query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime())); |
| break; |
| case GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN: |
| query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime())); |
| query.setParameter("currentTime", new Timestamp(new Date().getTime())); |
| break; |
| |
| default: |
| throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " |
| + caQuery.name()); |
| } |
| return query; |
| } |
| |
| @Override |
| public int executeUpdate(CoordActionQuery namedQuery, CoordinatorActionBean jobBean) throws JPAExecutorException { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| EntityManager em = jpaService.getEntityManager(); |
| Query query = getUpdateQuery(namedQuery, jobBean, em); |
| int ret = jpaService.executeUpdate(namedQuery.name(), query, em); |
| return ret; |
| } |
| |
| @Override |
| public CoordinatorActionBean get(CoordActionQuery namedQuery, Object... parameters) throws JPAExecutorException { |
| CoordinatorActionBean bean = getIfExist(namedQuery, parameters); |
| if (bean == null) { |
| throw new JPAExecutorException(ErrorCode.E0605, getSelectQuery(namedQuery, |
| Services.get().get(JPAService.class).getEntityManager(), parameters).toString()); |
| } |
| return bean; |
| } |
| |
| @Override |
| public CoordinatorActionBean getIfExist(CoordActionQuery namedQuery, Object... parameters) throws JPAExecutorException { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| EntityManager em = jpaService.getEntityManager(); |
| Query query = getSelectQuery(namedQuery, em, parameters); |
| Object ret = jpaService.executeGet(namedQuery.name(), query, em); |
| if (ret == null) { |
| return null; |
| } |
| CoordinatorActionBean bean = constructBean(namedQuery, ret); |
| return bean; |
| } |
| |
| @Override |
| public List<CoordinatorActionBean> getList(CoordActionQuery namedQuery, Object... parameters) |
| throws JPAExecutorException { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| EntityManager em = jpaService.getEntityManager(); |
| Query query = getSelectQuery(namedQuery, em, parameters); |
| List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em); |
| List<CoordinatorActionBean> beanList = new ArrayList<CoordinatorActionBean>(); |
| if (retList != null) { |
| for (Object ret : retList) { |
| beanList.add(constructBean(namedQuery, ret)); |
| } |
| } |
| return beanList; |
| } |
| |
| private CoordinatorActionBean constructBean(CoordActionQuery namedQuery, Object ret) throws JPAExecutorException { |
| CoordinatorActionBean bean; |
| Object[] arr; |
| switch (namedQuery) { |
| case GET_COORD_ACTION: |
| bean = (CoordinatorActionBean) ret; |
| break; |
| case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME: |
| bean = new CoordinatorActionBean(); |
| bean.setJobId((String) ret); |
| break; |
| case GET_COORD_ACTION_STATUS: |
| bean = new CoordinatorActionBean(); |
| bean.setStatusStr((String)ret); |
| break; |
| case GET_COORD_ACTIONS_STATUS_UNIGNORED: |
| arr = (Object[]) ret; |
| bean = new CoordinatorActionBean(); |
| bean.setStatusStr((String)arr[0]); |
| bean.setPending((Integer)arr[1]); |
| break; |
| case GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE: |
| case GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE: |
| arr = (Object[]) ret; |
| bean = new CoordinatorActionBean(); |
| bean.setId((String)arr[0]); |
| bean.setNominalTime((Timestamp)arr[1]); |
| bean.setCreatedTime((Timestamp)arr[2]); |
| bean.setActionXmlBlob((StringBlob)arr[3]); |
| break; |
| case GET_TERMINATED_ACTIONS_FOR_DATES: |
| bean = (CoordinatorActionBean) ret; |
| break; |
| case GET_TERMINATED_ACTION_IDS_FOR_DATES: |
| bean = new CoordinatorActionBean(); |
| bean.setId((String) ret); |
| break; |
| case GET_ACTIVE_ACTIONS_FOR_DATES: |
| arr = (Object[]) ret; |
| bean = new CoordinatorActionBean(); |
| bean.setId((String)arr[0]); |
| bean.setJobId((String)arr[1]); |
| bean.setStatusStr((String) arr[2]); |
| bean.setExternalId((String) arr[3]); |
| bean.setPending((Integer) arr[4]); |
| bean.setNominalTime((Timestamp) arr[5]); |
| bean.setCreatedTime((Timestamp) arr[6]); |
| break; |
| case GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN: |
| arr = (Object[]) ret; |
| bean = new CoordinatorActionBean(); |
| bean.setId((String)arr[0]); |
| bean.setJobId((String)arr[1]); |
| bean.setStatusStr((String) arr[2]); |
| bean.setExternalId((String) arr[3]); |
| bean.setPending((Integer) arr[4]); |
| break; |
| case GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN: |
| arr = (Object[]) ret; |
| bean = new CoordinatorActionBean(); |
| bean.setId((String)arr[0]); |
| bean.setJobId((String)arr[1]); |
| bean.setStatusStr((String) arr[2]); |
| bean.setExternalId((String) arr[3]); |
| bean.setPushMissingDependenciesBlob((StringBlob) arr[4]); |
| break; |
| case GET_COORD_ACTION_FOR_SLA: |
| arr = (Object[]) ret; |
| bean = new CoordinatorActionBean(); |
| bean.setId((String) arr[0]); |
| bean.setJobId((String) arr[1]); |
| bean.setStatusStr((String) arr[2]); |
| bean.setExternalId((String) arr[3]); |
| bean.setLastModifiedTime((Timestamp) arr[4]); |
| break; |
| case GET_COORD_ACTION_FOR_INPUTCHECK: |
| arr = (Object[]) ret; |
| bean = new CoordinatorActionBean(); |
| bean.setId((String) arr[0]); |
| bean.setActionNumber((Integer) arr[1]); |
| bean.setJobId((String) arr[2]); |
| bean.setStatus(CoordinatorAction.Status.valueOf((String) arr[3])); |
| bean.setRunConfBlob((StringBlob) arr[4]); |
| bean.setNominalTime(DateUtils.toDate((Timestamp) arr[5])); |
| bean.setCreatedTime(DateUtils.toDate((Timestamp) arr[6])); |
| bean.setActionXmlBlob((StringBlob) arr[7]); |
| bean.setMissingDependenciesBlob((StringBlob) arr[8]); |
| bean.setPushMissingDependenciesBlob((StringBlob) arr[9]); |
| bean.setTimeOut((Integer) arr[10]); |
| bean.setExternalId((String) arr[11]); |
| break; |
| |
| default: |
| throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for " |
| + namedQuery.name()); |
| } |
| return bean; |
| } |
| |
| @Override |
| public Object getSingleValue(CoordActionQuery namedQuery, Object... parameters) throws JPAExecutorException { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| EntityManager em = jpaService.getEntityManager(); |
| Query query = getSelectQuery(namedQuery, em, parameters); |
| Object ret = jpaService.executeGet(namedQuery.name(), query, em); |
| if (ret == null) { |
| throw new JPAExecutorException(ErrorCode.E0604, query.toString()); |
| } |
| return ret; |
| } |
| } |