blob: f427ebe0448115b329982329ec0bac9f497a6687 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.executor.jpa;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.persistence.EntityManager;
import javax.persistence.Query;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.CoordinatorJobInfo;
import org.apache.oozie.client.Job.Status;
import org.apache.oozie.client.CoordinatorJob.Timeunit;
import org.apache.oozie.store.StoreStatusFilter;
import org.apache.openjpa.persistence.OpenJPAPersistence;
import org.apache.openjpa.persistence.OpenJPAQuery;
import org.apache.openjpa.persistence.jdbc.FetchDirection;
import org.apache.openjpa.persistence.jdbc.JDBCFetchPlan;
import org.apache.openjpa.persistence.jdbc.LRSSizeAlgorithm;
import org.apache.openjpa.persistence.jdbc.ResultSetType;
/**
* Load the CoordinatorInfo and return it.
*/
public class CoordJobInfoGetJPAExecutor implements JPAExecutor<CoordinatorJobInfo> {
public static final String DEFAULT_ORDER_BY = " order by w.createdTimestamp desc ";
private Map<String, List<String>> filter;
private int start = 1;
private int len = 50;
public CoordJobInfoGetJPAExecutor(Map<String, List<String>> filter, int start, int len) {
Objects.requireNonNull(filter, "filter cannot be null");
this.filter = filter;
this.start = start;
this.len = len;
}
@Override
public String getName() {
return "CoordJobInfoGetJPAExecutor";
}
@Override
@SuppressWarnings("unchecked")
public CoordinatorJobInfo execute(EntityManager em) throws JPAExecutorException {
List<String> orArray = new ArrayList<String>();
List<String> colArray = new ArrayList<String>();
List<Object> valArray = new ArrayList<Object>();
StringBuilder sb = new StringBuilder("");
String orderBy = DEFAULT_ORDER_BY;
StoreStatusFilter.filter(filter, orArray, colArray, valArray, sb, StoreStatusFilter.coordSeletStr,
StoreStatusFilter.coordCountStr);
orderBy = StoreStatusFilter.getSortBy(filter, orderBy);
int realLen = 0;
Query q = null;
Query qTotal = null;
if (orArray.size() == 0 && orderBy.equals(DEFAULT_ORDER_BY)) {
q = em.createNamedQuery("GET_COORD_JOBS_COLUMNS");
q.setFirstResult(start - 1);
q.setMaxResults(len);
qTotal = em.createNamedQuery("GET_COORD_JOBS_COUNT");
}
else {
sb = sb.toString().trim().length() == 0 ? sb.append(StoreStatusFilter.coordSeletStr) : sb;
String sbTotal = sb.toString();
sb.append(orderBy);
q = em.createQuery(sb.toString());
q.setFirstResult(start - 1);
q.setMaxResults(len);
qTotal = em.createQuery(sbTotal.replace(StoreStatusFilter.coordSeletStr,
StoreStatusFilter.coordCountStr));
}
for (int i = 0; i < orArray.size(); i++) {
q.setParameter(colArray.get(i), valArray.get(i));
qTotal.setParameter(colArray.get(i), valArray.get(i));
}
OpenJPAQuery kq = OpenJPAPersistence.cast(q);
JDBCFetchPlan fetch = (JDBCFetchPlan) kq.getFetchPlan();
fetch.setFetchBatchSize(20);
fetch.setResultSetType(ResultSetType.SCROLL_INSENSITIVE);
fetch.setFetchDirection(FetchDirection.FORWARD);
fetch.setLRSSizeAlgorithm(LRSSizeAlgorithm.LAST);
List<?> resultList = q.getResultList();
List<Object[]> objectArrList = (List<Object[]>) resultList;
List<CoordinatorJobBean> coordBeansList = new ArrayList<CoordinatorJobBean>();
for (Object[] arr : objectArrList) {
CoordinatorJobBean ww = getBeanForCoordinatorJobFromArray(arr);
coordBeansList.add(ww);
}
realLen = ((Long) qTotal.getSingleResult()).intValue();
return new CoordinatorJobInfo(coordBeansList, start, len, realLen);
}
private CoordinatorJobBean getBeanForCoordinatorJobFromArray(Object[] arr) {
CoordinatorJobBean bean = new CoordinatorJobBean();
bean.setId((String) arr[0]);
if (arr[1] != null) {
bean.setAppName((String) arr[1]);
}
if (arr[2] != null) {
bean.setStatus(Status.valueOf((String) arr[2]));
}
if (arr[3] != null) {
bean.setUser((String) arr[3]);
}
if (arr[4] != null) {
bean.setGroup((String) arr[4]);
}
if (arr[5] != null) {
bean.setStartTime((Timestamp) arr[5]);
}
if (arr[6] != null) {
bean.setEndTime((Timestamp) arr[6]);
}
if (arr[7] != null) {
bean.setAppPath((String) arr[7]);
}
if (arr[8] != null) {
bean.setConcurrency(((Integer) arr[8]).intValue());
}
if (arr[9] != null) {
bean.setFrequency((String) arr[9]);
}
if (arr[10] != null) {
bean.setLastActionTime((Timestamp) arr[10]);
}
if (arr[11] != null) {
bean.setNextMaterializedTime((Timestamp) arr[11]);
}
if (arr[12] != null) {
bean.setCreatedTime((Timestamp) arr[12]);
}
if (arr[13] != null) {
bean.setTimeUnit(Timeunit.valueOf((String) arr[13]));
}
if (arr[14] != null) {
bean.setTimeZone((String) arr[14]);
}
if (arr[15] != null) {
bean.setTimeout((Integer) arr[15]);
}
if (arr[16] != null) {
bean.setBundleId((String) arr[16]);
}
return bean;
}
}