blob: 2d93dbe74ddf04fb08c9472f866743bca41667a2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.executor.jpa;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import javax.persistence.EntityManager;
import javax.persistence.Query;
import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.JPAService.QueryEntry;
import org.apache.oozie.service.Services;
import org.apache.oozie.sla.SLARegistrationBean;
import org.apache.oozie.sla.SLASummaryBean;
/**
* Query Executor that provides API to run multiple update/insert queries in one
* transaction. This guarantees entire change to be rolled back when one of
* queries fails.
*/
public class BatchQueryExecutor {
private static BatchQueryExecutor instance = new BatchQueryExecutor();
public static class UpdateEntry<E extends Enum<E>> {
E namedQuery;
JsonBean bean;
public UpdateEntry(E namedQuery, JsonBean bean) {
this.bean = bean;
this.namedQuery = namedQuery;
}
public JsonBean getBean() {
return this.bean;
}
public E getQueryName() {
return this.namedQuery;
}
}
private BatchQueryExecutor() {
}
public static BatchQueryExecutor getInstance() {
return BatchQueryExecutor.instance;
}
@SuppressWarnings("rawtypes")
public void executeBatchInsertUpdateDelete(Collection<JsonBean> insertList, Collection<UpdateEntry> updateList,
Collection<JsonBean> deleteList) throws JPAExecutorException {
List<QueryEntry> queryList = new ArrayList<QueryEntry>();
JPAService jpaService = Services.get().get(JPAService.class);
EntityManager em = jpaService.getEntityManager();
if (updateList != null) {
for (UpdateEntry entry : updateList) {
Query query = null;
JsonBean bean = entry.getBean();
if (bean instanceof WorkflowJobBean) {
query = WorkflowJobQueryExecutor.getInstance().getUpdateQuery(
(WorkflowJobQuery) entry.getQueryName(), (WorkflowJobBean) entry.getBean(), em);
}
else if (bean instanceof WorkflowActionBean) {
query = WorkflowActionQueryExecutor.getInstance().getUpdateQuery(
(WorkflowActionQuery) entry.getQueryName(), (WorkflowActionBean) entry.getBean(), em);
}
else if (bean instanceof CoordinatorJobBean) {
query = CoordJobQueryExecutor.getInstance().getUpdateQuery((CoordJobQuery) entry.getQueryName(),
(CoordinatorJobBean) entry.getBean(), em);
}
else if (bean instanceof CoordinatorActionBean) {
query = CoordActionQueryExecutor.getInstance().getUpdateQuery(
(CoordActionQuery) entry.getQueryName(), (CoordinatorActionBean) entry.getBean(), em);
}
else if (bean instanceof BundleJobBean) {
query = BundleJobQueryExecutor.getInstance().getUpdateQuery((BundleJobQuery) entry.getQueryName(),
(BundleJobBean) entry.getBean(), em);
}
else if (bean instanceof BundleActionBean) {
query = BundleActionQueryExecutor.getInstance().getUpdateQuery(
(BundleActionQuery) entry.getQueryName(), (BundleActionBean) entry.getBean(), em);
}
else if (bean instanceof SLARegistrationBean) {
query = SLARegistrationQueryExecutor.getInstance().getUpdateQuery(
(SLARegQuery) entry.getQueryName(), (SLARegistrationBean) entry.getBean(), em);
}
else if (bean instanceof SLASummaryBean) {
query = SLASummaryQueryExecutor.getInstance().getUpdateQuery(
(SLASummaryQuery) entry.getQueryName(), (SLASummaryBean) entry.getBean(), em);
}
else {
throw new JPAExecutorException(ErrorCode.E0603, "BatchQueryExecutor failed to construct a query");
}
queryList.add(new QueryEntry(entry.getQueryName(), query));
}
}
jpaService.executeBatchInsertUpdateDelete(insertList, queryList, deleteList, em);
}
}