| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.oozie.executor.jpa; |
| |
| import java.sql.Timestamp; |
| import java.text.ParseException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| |
| import javax.persistence.EntityManager; |
| import javax.persistence.Query; |
| |
| import org.apache.oozie.BulkResponseInfo; |
| import org.apache.oozie.BundleJobBean; |
| import org.apache.oozie.CoordinatorActionBean; |
| import org.apache.oozie.CoordinatorJobBean; |
| import org.apache.oozie.ErrorCode; |
| import org.apache.oozie.StringBlob; |
| import org.apache.oozie.client.BundleJob; |
| import org.apache.oozie.client.CoordinatorAction; |
| import org.apache.oozie.client.CoordinatorJob; |
| import org.apache.oozie.client.rest.BulkResponseImpl; |
| import org.apache.oozie.service.Services; |
| import org.apache.oozie.util.DateUtils; |
| import org.apache.oozie.util.ParamChecker; |
| |
| /** |
| * The query executor class for bulk monitoring queries i.e. debugging bundle coord actions directly |
| */ |
| public class BulkJPAExecutor implements JPAExecutor<BulkResponseInfo> { |
| private Map<String, List<String>> bulkFilter; |
| // defaults |
| private int start = 1; |
| private int len = 50; |
| private enum PARAM_TYPE { |
| ID, NAME |
| } |
| |
| public BulkJPAExecutor(Map<String, List<String>> bulkFilter, int start, int len) { |
| ParamChecker.notNull(bulkFilter, "bulkFilter"); |
| this.bulkFilter = bulkFilter; |
| this.start = start; |
| this.len = len; |
| } |
| |
| @Override |
| public String getName() { |
| return "BulkJPAExecutor"; |
| } |
| |
| @Override |
| public BulkResponseInfo execute(EntityManager em) throws JPAExecutorException { |
| List<BulkResponseImpl> responseList = new ArrayList<BulkResponseImpl>(); |
| Map<String, Timestamp> actionTimes = new HashMap<String, Timestamp>(); |
| |
| try { |
| List<String> coords = bulkFilter.get(BulkResponseImpl.BULK_FILTER_COORD); |
| List<String> statuses = bulkFilter.get(BulkResponseImpl.BULK_FILTER_STATUS); |
| List<String> params = new ArrayList<String>(); |
| |
| // Lightweight Query 1 on Bundle level to fetch the bundle job(s) |
| // corresponding to names or ids |
| List<BundleJobBean> bundleBeans = bundleQuery(em); |
| |
| // Join query between coordinator job and coordinator action tables |
| // to get entries for specific bundleId only |
| String conditions = actionQuery(coords, params, statuses, em, bundleBeans, actionTimes, responseList); |
| |
| // Query to get the count of records |
| long total = countQuery(statuses, params, conditions, em, bundleBeans, actionTimes); |
| |
| BulkResponseInfo bulk = new BulkResponseInfo(responseList, start, len, total); |
| return bulk; |
| } |
| catch (Exception e) { |
| throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e); |
| } |
| } |
| |
| /** |
| * build the bundle level query to get bundle beans for the specified ids or appnames |
| * @param em |
| * @return List BundleJobBeans |
| * @throws JPAExecutorException |
| */ |
| @SuppressWarnings("unchecked") |
| private List<BundleJobBean> bundleQuery(EntityManager em) throws JPAExecutorException { |
| Query q = em.createNamedQuery("BULK_MONITOR_BUNDLE_QUERY"); |
| StringBuilder bundleQuery = new StringBuilder(q.toString()); |
| |
| StringBuilder whereClause = null; |
| List<String> bundles = bulkFilter.get(BulkResponseImpl.BULK_FILTER_BUNDLE); |
| if (bundles != null) { |
| PARAM_TYPE type = getParamType(bundles.get(0), 'B'); |
| if (type == PARAM_TYPE.NAME) { |
| whereClause = inClause(bundles.size(), "appName", 'b', "bundles"); |
| } |
| else if (type == PARAM_TYPE.ID) { |
| whereClause = inClause(bundles.size(), "id", 'b', "bundles"); |
| } |
| |
| // Query: select <columns> from BundleJobBean b where b.id IN (...) _or_ b.appName IN (...) |
| bundleQuery.append(whereClause.replace(whereClause.indexOf("AND"), whereClause.indexOf("AND") + 3, "WHERE")); |
| Query tmp = em.createQuery(bundleQuery.toString()); |
| |
| fillParameters(tmp, "bundles", bundles); |
| |
| List<Object[]> bundleObjs = (List<Object[]>) tmp.getResultList(); |
| if (bundleObjs.isEmpty()) { |
| throw new JPAExecutorException(ErrorCode.E0603, "No entries found for given bundle(s)"); |
| } |
| |
| List<BundleJobBean> bundleBeans = new ArrayList<BundleJobBean>(); |
| for (Object[] bundleElem : bundleObjs) { |
| bundleBeans.add(constructBundleBean(bundleElem)); |
| } |
| return bundleBeans; |
| } |
| return null; |
| } |
| |
| /** |
| * Validate and determine whether passed param is job-id or appname |
| * @param id |
| * @param job |
| * @return PARAM_TYPE |
| */ |
| private PARAM_TYPE getParamType(String id, char job) { |
| Pattern p = Pattern.compile("\\d{7}-\\d{15}-" + Services.get().getSystemId() + "-" + job); |
| Matcher m = p.matcher(id); |
| if (m.matches()) { |
| return PARAM_TYPE.ID; |
| } |
| return PARAM_TYPE.NAME; |
| } |
| |
| /** |
| * Compose the coord action level query comprising bundle id/appname filter and coord action |
| * status filter (if specified) and start-time or nominal-time filter (if specified) |
| * @param em |
| * @param bundles |
| * @param times |
| * @param responseList |
| * @return Query string |
| * @throws ParseException |
| */ |
| @SuppressWarnings("unchecked") |
| private String actionQuery(final List<String> coords, final List<String> params, List<String> statuses, EntityManager em, |
| List<BundleJobBean> bundles, Map<String, Timestamp> times, List<BulkResponseImpl> responseList) |
| throws ParseException { |
| Query q = em.createNamedQuery("BULK_MONITOR_ACTIONS_QUERY"); |
| StringBuilder getActions = new StringBuilder(q.toString()); |
| int offset = getActions.indexOf("ORDER"); |
| StringBuilder conditionClause = new StringBuilder(); |
| |
| // Query: Select <columns> from CoordinatorActionBean a, CoordinatorJobBean c WHERE a.jobId = c.id |
| // AND c.bundleId = :bundleId AND c.appName/id IN (...) |
| |
| if (coords != null) { |
| PARAM_TYPE type = getParamType(coords.get(0), 'C'); |
| if (type == PARAM_TYPE.NAME) { |
| conditionClause.append(inClause(coords.size(), "appName", 'c', "param")); |
| params.addAll(coords); |
| } |
| else if (type == PARAM_TYPE.ID) { |
| conditionClause.append(inClause(coords.size(), "id", 'c', "param")); |
| params.addAll(coords); |
| } |
| } |
| // Query: Select <columns> from CoordinatorActionBean a, CoordinatorJobBean c WHERE a.jobId = c.id |
| // AND c.bundleId = :bundleId AND c.appName/id IN (...) AND a.statusStr IN (...) |
| conditionClause.append(statusClause(statuses)); |
| |
| offset = getActions.indexOf("ORDER"); |
| getActions.insert(offset - 1, conditionClause); |
| |
| // Query: Select <columns> from CoordinatorActionBean a, CoordinatorJobBean c WHERE a.jobId = c.id |
| // AND c.bundleId = :bundleId AND c.appName/id IN (...) AND a.statusStr IN (...) |
| // AND a.createdTimestamp >= startCreated _or_ a.createdTimestamp <= endCreated |
| // AND a.nominalTimestamp >= startNominal _or_ a.nominalTimestamp <= endNominal |
| timesClause(getActions, offset, times); |
| q = em.createQuery(getActions.toString()); |
| Iterator<Entry<String, Timestamp>> iter = times.entrySet().iterator(); |
| while (iter.hasNext()) { |
| Entry<String, Timestamp> time = iter.next(); |
| q.setParameter(time.getKey(), time.getValue()); |
| } |
| // pagination |
| q.setFirstResult(start - 1); |
| q.setMaxResults(len); |
| |
| if (coords != null) { |
| fillParameters(q, "param", coords); |
| } |
| |
| if (statuses != null) { |
| fillParameters(q, "status", statuses); |
| } |
| |
| // repeatedly execute above query for each bundle |
| for (BundleJobBean bundle : bundles) { |
| q.setParameter("bundleId", bundle.getId()); |
| List<Object[]> response = q.getResultList(); |
| for (Object[] r : response) { |
| BulkResponseImpl br = getResponseFromObject(bundle, r); |
| responseList.add(br); |
| } |
| } |
| return q.toString(); |
| } |
| |
| /** |
| * Get total number of records for use with offset and len in API |
| * @param clause |
| * @param em |
| * @param bundles |
| * @return total count of coord actions |
| */ |
| private long countQuery(List<String> statuses, List<String> params, String clause, EntityManager em, |
| List<BundleJobBean> bundles, Map<String, Timestamp> times) { |
| Query q = em.createNamedQuery("BULK_MONITOR_COUNT_QUERY"); |
| StringBuilder getTotal = new StringBuilder(q.toString() + " "); |
| // Query: select COUNT(a) from CoordinatorActionBean a, CoordinatorJobBean c |
| // get entire WHERE clause from above i.e. actionQuery() for all conditions on coordinator job |
| // and action status and times |
| getTotal.append(clause.substring(clause.indexOf("WHERE"), clause.indexOf("ORDER"))); |
| int offset = getTotal.indexOf("bundleId"); |
| List<String> bundleIds = new ArrayList<String>(); |
| for (BundleJobBean bundle : bundles) { |
| bundleIds.add(bundle.getId()); |
| } |
| // Query: select COUNT(a) from CoordinatorActionBean a, CoordinatorJobBean c WHERE ... |
| // AND c.bundleId IN (... list of bundle ids) i.e. replace single :bundleId with list |
| getTotal = getTotal.replace(offset - 6, offset + 20, inClause(bundleIds.size(), "bundleId", 'c', "count").toString()); |
| q = em.createQuery(getTotal.toString()); |
| |
| fillParameters(q, "count", bundleIds); |
| |
| if (statuses != null) { |
| fillParameters(q, "status", statuses); |
| } |
| |
| if (params != null) { |
| fillParameters(q, "param", params); |
| } |
| |
| Iterator<Entry<String, Timestamp>> iter = times.entrySet().iterator(); |
| while (iter.hasNext()) { |
| Entry<String, Timestamp> time = iter.next(); |
| q.setParameter(time.getKey(), time.getValue()); |
| } |
| long total = ((Long) q.getSingleResult()).longValue(); |
| return total; |
| } |
| |
| // Form the where clause to filter by coordinator appname/id |
| private StringBuilder inClause(int noOfValues, String col, char type, String paramPrefix) { |
| StringBuilder sb = new StringBuilder(); |
| boolean firstVal = true; |
| |
| for (int i = 0; i < noOfValues; i++) { |
| if (firstVal) { |
| sb.append(" AND " + type + "." + col + " IN (:" + paramPrefix + i); |
| firstVal = false; |
| } |
| else { |
| sb.append(", :" + paramPrefix + i); |
| } |
| } |
| if (!firstVal) { |
| sb.append(") "); |
| } |
| |
| return sb; |
| } |
| |
| // Form the where clause to filter by coord action status |
| private StringBuilder statusClause(List<String> statuses) { |
| StringBuilder sb = new StringBuilder(); |
| |
| if (statuses != null) { |
| sb = inClause(statuses.size(), "statusStr", 'a', "status"); |
| } |
| |
| if (sb.length() == 0) { // statuses was null. adding default |
| sb.append(" AND a.statusStr IN ('KILLED', 'FAILED') "); |
| } |
| |
| return sb; |
| } |
| |
| private void timesClause(StringBuilder sb, int offset, Map<String, Timestamp> eachTime) throws ParseException { |
| Timestamp ts = null; |
| List<String> times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_START_CREATED_EPOCH); |
| if (times != null) { |
| ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime()); |
| sb.insert(offset - 1, " AND a.createdTimestamp >= :startCreated"); |
| eachTime.put("startCreated", ts); |
| } |
| times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_END_CREATED_EPOCH); |
| if (times != null) { |
| ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime()); |
| sb.insert(offset - 1, " AND a.createdTimestamp <= :endCreated"); |
| eachTime.put("endCreated", ts); |
| } |
| times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_START_NOMINAL_EPOCH); |
| if (times != null) { |
| ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime()); |
| sb.insert(offset - 1, " AND a.nominalTimestamp >= :startNominal"); |
| eachTime.put("startNominal", ts); |
| } |
| times = bulkFilter.get(BulkResponseImpl.BULK_FILTER_END_NOMINAL_EPOCH); |
| if (times != null) { |
| ts = new Timestamp(DateUtils.parseDateUTC(times.get(0)).getTime()); |
| sb.insert(offset - 1, " AND a.nominalTimestamp <= :endNominal"); |
| eachTime.put("endNominal", ts); |
| } |
| } |
| |
| private BulkResponseImpl getResponseFromObject(BundleJobBean bundleBean, Object arr[]) { |
| BulkResponseImpl bean = new BulkResponseImpl(); |
| CoordinatorJobBean coordBean = new CoordinatorJobBean(); |
| CoordinatorActionBean actionBean = new CoordinatorActionBean(); |
| if (arr[0] != null) { |
| actionBean.setId((String) arr[0]); |
| } |
| if (arr[1] != null) { |
| actionBean.setActionNumber((Integer) arr[1]); |
| } |
| if (arr[2] != null) { |
| actionBean.setErrorCode((String) arr[2]); |
| } |
| if (arr[3] != null) { |
| actionBean.setErrorMessage((String) arr[3]); |
| } |
| if (arr[4] != null) { |
| actionBean.setExternalId((String) arr[4]); |
| } |
| if (arr[5] != null) { |
| actionBean.setExternalStatus((String) arr[5]); |
| } |
| if (arr[6] != null) { |
| actionBean.setStatus(CoordinatorAction.Status.valueOf((String) arr[6])); |
| } |
| if (arr[7] != null) { |
| actionBean.setCreatedTime(DateUtils.toDate((Timestamp) arr[7])); |
| } |
| if (arr[8] != null) { |
| actionBean.setNominalTime(DateUtils.toDate((Timestamp) arr[8])); |
| } |
| if (arr[9] != null) { |
| actionBean.setMissingDependenciesBlob((StringBlob) arr[9]); |
| } |
| if (arr[10] != null) { |
| coordBean.setId((String) arr[10]); |
| actionBean.setJobId((String) arr[10]); |
| } |
| if (arr[11] != null) { |
| coordBean.setAppName((String) arr[11]); |
| } |
| if (arr[12] != null) { |
| coordBean.setStatus(CoordinatorJob.Status.valueOf((String) arr[12])); |
| } |
| bean.setBundle(bundleBean); |
| bean.setCoordinator(coordBean); |
| bean.setAction(actionBean); |
| return bean; |
| } |
| |
| private BundleJobBean constructBundleBean(Object[] barr) throws JPAExecutorException { |
| BundleJobBean bean = new BundleJobBean(); |
| if (barr[0] != null) { |
| bean.setId((String) barr[0]); |
| } |
| else { |
| throw new JPAExecutorException(ErrorCode.E0603, |
| "bundleId returned by query is null - cannot retrieve bulk results"); |
| } |
| if (barr[1] != null) { |
| bean.setAppName((String) barr[1]); |
| } |
| if (barr[2] != null) { |
| bean.setStatus(BundleJob.Status.valueOf((String) barr[2])); |
| } |
| if (barr[3] != null) { |
| bean.setUser((String) barr[3]); |
| } |
| return bean; |
| } |
| |
| private void fillParameters(Query query, String prefix, List<?> values) { |
| for (int i = 0; i < values.size(); i++) { |
| query.setParameter(prefix + i, values.get(i)); |
| } |
| } |
| |
| // null safeguard |
| public static List<String> nullToEmpty(List<String> input) { |
| return input == null ? Collections.<String> emptyList() : input; |
| } |
| |
| } |