blob: 0e9bebbe24d37820737303bff86b90d9960a36cc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.command;
import com.google.common.annotations.VisibleForTesting;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.XException;
import org.apache.oozie.executor.jpa.BundleJobsDeleteJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobsGetForPurgeJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionsDeleteJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionsGetFromCoordJobIdJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobsCountNotForPurgeFromParentIdJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobsDeleteJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobsGetForPurgeJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobsGetFromParentIdJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.executor.jpa.WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobsDeleteJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobsGetForPurgeJPAExecutor;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.eclipse.jgit.util.StringUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
/**
 * This class is used to purge workflows, coordinators, and bundles. It takes into account the relationships between workflows and
 * coordinators, and coordinators and bundles. It also only acts on at most 'limit' items at a time, so as not to overtax the DB
 * and in case something gets rolled back. Also, children are always deleted before their parents in case of a rollback.
 */
public class PurgeXCommand extends XCommand<Void> {
    private JPAService jpaService = null;
    /** Workflows older than this many days are candidates for purging. */
    private final int wfOlderThan;
    /** Coordinators older than this many days are candidates for purging. */
    private final int coordOlderThan;
    /** Bundles older than this many days are candidates for purging. */
    private final int bundleOlderThan;
    /** When true, also purge old completed coordinator workflows together with their parent coordinator actions. */
    private final boolean purgeOldCoordAction;
    /** Page size used when reading from the DB and maximum batch size used when deleting. */
    private final int limit;
    private final List<String> wfList;
    private final List<String> coordActionList;
    private final List<String> coordList;
    private final List<String> bundleList;
    // Counters of purged items, reported at the end of execute()
    private int wfDel;
    private int coordDel;
    private int coordActionDel;
    private int bundleDel;
    private static final long DAY_IN_MS = 24L * 60 * 60 * 1000;

    /**
     * A function whose evaluation may fail with a {@link JPAExecutorException}, so DB lookups can be passed around as values.
     */
    interface JPAFunction<T, R> {
        R apply(T t) throws JPAExecutorException;
    }

    final JPAFunction<String, List<WorkflowJobBean>> getSubWorkflowJobBeansFunction = new JPAFunction<String,
            List<WorkflowJobBean>>() {
        @Override
        public List<WorkflowJobBean> apply(String wfId) throws JPAExecutorException {
            return PurgeXCommand.this.getSubWorkflowJobBeans(wfId);
        }
    };

    final JPAFunction<List<WorkflowJobBean>, List<String>> fetchTerminatedWorflowFunction = new JPAFunction<List<WorkflowJobBean>,
            List<String>>() {
        @Override
        public List<String> apply(List<WorkflowJobBean> wfBeanList) throws JPAExecutorException {
            return PurgeXCommand.this.fetchTerminatedWorkflow(wfBeanList);
        }
    };

    /**
     * Walks a tree breadth-first from {@code rootNode}. {@code childrenFinder} looks up the children of each node, and
     * {@code selector} returns the children that qualify.
     *
     * @param <T> node identifier type
     * @param <U> type of the child records returned by {@code childrenFinder}
     */
    @VisibleForTesting
    static class SelectorTreeTraverser<T, U> {
        final T rootNode;
        final JPAFunction<T, List<U>> childrenFinder;
        final JPAFunction<List<U>, List<T>> selector;

        SelectorTreeTraverser(final T rootNode, final JPAFunction<T, List<U>> childrenFinder,
                              final JPAFunction<List<U>, List<T>> selector) {
            this.rootNode = rootNode;
            this.childrenFinder = childrenFinder;
            this.selector = selector;
        }

        /**
         * Collects the root and all of its descendants in breadth-first order, but only if the selector selects every child
         * of every visited node.
         *
         * @return the root followed by its descendants, or an empty list if the selector rejected some child
         * @throws JPAExecutorException if a lookup fails, or with E0613 if a node is reached more than once
         */
        List<T> findAllDescendantNodesIfSelectable() throws JPAExecutorException {
            List<T> allDescendantNodes = new ArrayList<>();
            Set<T> uniqueDescendantNodes = new HashSet<>();
            allDescendantNodes.add(rootNode);
            uniqueDescendantNodes.add(rootNode);
            int nextIndexToCheck = 0;
            while (nextIndexToCheck < allDescendantNodes.size()) {
                T id = allDescendantNodes.get(nextIndexToCheck);
                List<U> childrenNodes = childrenFinder.apply(id);
                List<T> selectedChildren = selector.apply(childrenNodes);
                if (selectedChildren.size() == childrenNodes.size()) {
                    allDescendantNodes.addAll(selectedChildren);
                    uniqueDescendantNodes.addAll(selectedChildren);
                    // A size mismatch means some node was seen twice, so the parent links do not form a tree
                    if (allDescendantNodes.size() != uniqueDescendantNodes.size()) {
                        throw new JPAExecutorException(ErrorCode.E0613, rootNode);
                    }
                }
                else {
                    return new ArrayList<>();
                }
                ++nextIndexToCheck;
            }
            return allDescendantNodes;
        }
    }

    /**
     * Creates a purge command that does not purge old workflows of long running coordinators.
     *
     * @param wfOlderThan workflow age threshold in days
     * @param coordOlderThan coordinator age threshold in days
     * @param bundleOlderThan bundle age threshold in days
     * @param limit page size for DB reads and batch size for DB deletes
     */
    public PurgeXCommand(int wfOlderThan, int coordOlderThan, int bundleOlderThan, int limit) {
        this(wfOlderThan, coordOlderThan, bundleOlderThan, limit, false);
    }

    /**
     * Creates a purge command.
     *
     * @param wfOlderThan workflow age threshold in days
     * @param coordOlderThan coordinator age threshold in days
     * @param bundleOlderThan bundle age threshold in days
     * @param limit page size for DB reads and batch size for DB deletes; values below 1 are treated as 1 when deleting
     * @param purgeOldCoordAction whether to also purge old completed coordinator workflows and their coordinator actions
     */
    public PurgeXCommand(int wfOlderThan, int coordOlderThan, int bundleOlderThan, int limit, boolean purgeOldCoordAction) {
        super("purge", "purge", 0);
        this.wfOlderThan = wfOlderThan;
        this.coordOlderThan = coordOlderThan;
        this.bundleOlderThan = bundleOlderThan;
        this.purgeOldCoordAction = purgeOldCoordAction;
        this.limit = limit;
        wfList = new ArrayList<String>();
        coordActionList = new ArrayList<String>();
        coordList = new ArrayList<String>();
        bundleList = new ArrayList<String>();
        wfDel = 0;
        coordDel = 0;
        coordActionDel = 0;
        bundleDel = 0;
    }

    /**
     * Loads, page by page, the ids of the workflows, coordinators, and bundles that are candidates for purging.
     *
     * @throws CommandException E0610 if the JPA service is unavailable, or wrapping any DB error
     */
    @Override
    protected void loadState() throws CommandException {
        try {
            jpaService = Services.get().get(JPAService.class);
            if (jpaService != null) {
                // Get the lists of workflows, coordinators, and bundles that can be purged (and have no parents).
                // Each loop keeps reading pages (offset = items read so far) until a page adds nothing.
                int size;
                do {
                    size = wfList.size();
                    wfList.addAll(jpaService.execute(new WorkflowJobsGetForPurgeJPAExecutor(wfOlderThan, wfList.size(), limit)));
                } while(size != wfList.size());
                if (purgeOldCoordAction) {
                    LOG.debug("Purging workflows of long running coordinators is turned on");
                    do {
                        size = coordActionList.size();
                        long olderThan = wfOlderThan;
                        List<WorkflowJobBean> jobBeans = WorkflowJobQueryExecutor.getInstance().getList(
                                WorkflowJobQuery.GET_COMPLETED_COORD_WORKFLOWS_OLDER_THAN, olderThan,
                                coordActionList.size(), limit);
                        for (WorkflowJobBean bean : jobBeans) {
                            coordActionList.add(bean.getParentId());
                            wfList.add(bean.getId());
                        }
                    } while(size != coordActionList.size());
                }
                do {
                    size = coordList.size();
                    coordList.addAll(jpaService.execute(
                            new CoordJobsGetForPurgeJPAExecutor(coordOlderThan, coordList.size(), limit)));
                } while(size != coordList.size());
                do {
                    size = bundleList.size();
                    bundleList.addAll(jpaService.execute(
                            new BundleJobsGetForPurgeJPAExecutor(bundleOlderThan, bundleList.size(), limit)));
                } while(size != bundleList.size());
            }
            else {
                throw new CommandException(ErrorCode.E0610);
            }
        }
        catch (XException ex) {
            throw new CommandException(ex);
        }
    }

    /**
     * Purges the items loaded by {@link #loadState()}. The order is: workflows, then coordinator actions, then coordinators,
     * then bundles.
     *
     * @return always {@code null}
     * @throws CommandException wrapping any DB error; the first error aborts the remaining steps
     */
    @Override
    protected Void execute() throws CommandException {
        LOG.info("STARTED Purge to purge Workflow Jobs older than [{0}] days, Coordinator Jobs older than [{1}] days, and Bundle"
                + " jobs older than [{2}] days.", wfOlderThan, coordOlderThan, bundleOlderThan);
        try {
            // Process parentless workflows to purge them and their children
            if (!wfList.isEmpty()) {
                processWorkflows(wfList);
            }
            // Process coordinator actions of long running coordinators and purge them
            if (!coordActionList.isEmpty()) {
                purgeCoordActions(coordActionList);
            }
            // Process parentless coordinators to purge them and their children
            if (!coordList.isEmpty()) {
                processCoordinators(coordList);
            }
            // Process bundles to purge them and their children
            if (!bundleList.isEmpty()) {
                processBundles(bundleList);
            }
        }
        catch (JPAExecutorException je) {
            throw new CommandException(je);
        }
        LOG.info("ENDED Purge deleted [{0}] workflows, [{1}] coordinatorActions, [{2}] coordinators, [{3}] bundles",
                wfDel, coordActionDel, coordDel, bundleDel);
        return null;
    }

    /**
     * Processes workflows to purge them and their children if all the descendants are purgeable. Skips the workflows that
     * have non-purgeable descendants.
     *
     * @param wfs List of workflows to process
     * @throws JPAExecutorException If a JPA executor has a problem
     */
    private void processWorkflows(List<String> wfs) throws JPAExecutorException {
        List<String> wfsToPurge = findPurgeableWorkflows(wfs);
        purgeWorkflows(wfsToPurge);
    }

    /**
     * Gets the purgeable workflow list: each given workflow followed by all of its sub-workflows (breadth-first), for every
     * given workflow whose whole sub-workflow tree is purgeable.
     *
     * @param workflows List of workflows to process
     * @return List of workflows to purge
     * @throws JPAExecutorException If a JPA executor has a problem
     */
    private List<String> findPurgeableWorkflows(List<String> workflows) throws JPAExecutorException {
        List<String> purgeableWorkflows = new ArrayList<>();
        for (String workflowId : workflows) {
            SelectorTreeTraverser<String, WorkflowJobBean> selectorTreeTraverser = new SelectorTreeTraverser<>(workflowId,
                    getSubWorkflowJobBeansFunction, fetchTerminatedWorflowFunction);
            purgeableWorkflows.addAll(selectorTreeTraverser.findAllDescendantNodesIfSelectable());
        }
        return purgeableWorkflows;
    }

    /**
     * Reads, page by page, the basic info of all direct sub-workflows of the given workflow.
     *
     * @param wfId parent workflow id
     * @return the direct sub-workflows
     * @throws JPAExecutorException If a JPA executor has a problem
     */
    private List<WorkflowJobBean> getSubWorkflowJobBeans(String wfId) throws JPAExecutorException {
        int size;
        List<WorkflowJobBean> swfBeanList = new ArrayList<>();
        do {
            size = swfBeanList.size();
            swfBeanList.addAll(jpaService.execute(
                    new WorkflowJobsBasicInfoFromWorkflowParentIdJPAExecutor(wfId, swfBeanList.size(), limit)));
        } while (size != swfBeanList.size());
        return swfBeanList;
    }

    /**
     * Returns the ids of the workflows in {@code wfBeanList} that are terminated and old enough to be purged.
     *
     * @param wfBeanList workflows to check
     * @return ids of the workflows to purge, in input order
     */
    private List<String> fetchTerminatedWorkflow(List<WorkflowJobBean> wfBeanList) {
        List<String> children = new ArrayList<String>();
        long wfOlderThanMS = System.currentTimeMillis() - (wfOlderThan * DAY_IN_MS);
        for (WorkflowJobBean wfjBean : wfBeanList) {
            if (isWorkflowPurgeable(wfjBean, wfOlderThanMS)) {
                children.add(wfjBean.getId());
            }
        }
        return children;
    }

    /**
     * Tells whether a workflow is purgeable. It must be in a terminal state, and either its end time or its last
     * modification time must be earlier than {@code wfOlderThanMS}.
     */
    private boolean isWorkflowPurgeable(WorkflowJobBean wfjBean, long wfOlderThanMS) {
        final Date wfEndTime = wfjBean.getEndTime();
        final boolean isFinished = wfjBean.inTerminalState();
        if (isFinished && wfEndTime != null && wfEndTime.getTime() < wfOlderThanMS) {
            return true;
        }
        else {
            final Date lastModificationTime = wfjBean.getLastModifiedTime();
            if (isFinished && lastModificationTime != null && lastModificationTime.getTime() < wfOlderThanMS) {
                return true;
            }
        }
        return false;
    }

    /**
     * Processes coordinators to purge them and their children. A coordinator is purged, together with its workflows and
     * actions, only when all of its direct workflow children are purgeable.
     *
     * @param coords List of coordinators to process
     * @throws JPAExecutorException If a JPA executor has a problem
     */
    private void processCoordinators(List<String> coords) throws JPAExecutorException {
        List<String> wfsToPurge = new ArrayList<String>();
        List<String> actionsToPurge = new ArrayList<String>();
        List<String> coordsToPurge = new ArrayList<String>();
        for (String coordId : coords) {
            // Get all of the direct workflowChildren for this coord
            List<WorkflowJobBean> wfjBeanList = new ArrayList<WorkflowJobBean>();
            int size;
            do {
                size = wfjBeanList.size();
                wfjBeanList.addAll(jpaService.execute(
                        new WorkflowJobsBasicInfoFromCoordParentIdJPAExecutor(coordId, wfjBeanList.size(), limit)));
            } while (size != wfjBeanList.size());
            // Checking if workflow is ready to purge
            List<String> workflowChildren = fetchTerminatedWorkflow(wfjBeanList);
            // if all workflow are ready to purge add them and add the coordinator and their actions
            if (workflowChildren.size() == wfjBeanList.size()) {
                LOG.debug("Purging coordinator " + coordId);
                wfsToPurge.addAll(workflowChildren);
                coordsToPurge.add(coordId);
                // Get all of the direct actionChildren for this coord
                List<String> actionChildren = new ArrayList<String>();
                do {
                    size = actionChildren.size();
                    actionChildren.addAll(jpaService.execute(
                            new CoordActionsGetFromCoordJobIdJPAExecutor(coordId, actionChildren.size(), limit)));
                } while (size != actionChildren.size());
                actionsToPurge.addAll(actionChildren);
            }
        }
        // Process the children workflow.
        // NOTE(review): processWorkflows skips any workflow whose sub-workflows are not all purgeable, yet the coordinator
        // and its actions are still purged below, which may leave such workflows behind; confirm this is intended.
        processWorkflows(wfsToPurge);
        // Process the children action
        purgeCoordActions(actionsToPurge);
        // Now that all children have been purged, we can purge the coordinators
        purgeCoordinators(coordsToPurge);
    }

    /**
     * Processes bundles to purge them and their children. A bundle is purged only when none of its coordinators is
     * reported as not ready for purge.
     *
     * @param bundles List of bundles to process
     * @throws JPAExecutorException If a JPA executor has a problem
     */
    private void processBundles(List<String> bundles) throws JPAExecutorException {
        List<String> coordsToPurge = new ArrayList<String>();
        List<String> bundlesToPurge = new ArrayList<String>();
        for (String bundleId : bundles) {
            // We only purge the bundle and its children if they are all ready to be purged
            long numChildrenNotReady = jpaService.execute(
                    new CoordJobsCountNotForPurgeFromParentIdJPAExecutor(coordOlderThan, bundleId));
            if (numChildrenNotReady == 0) {
                bundlesToPurge.add(bundleId);
                LOG.debug("Purging bundle " + bundleId);
                // Get all of the direct children for this bundle
                List<String> children = new ArrayList<String>();
                int size;
                do {
                    size = children.size();
                    children.addAll(jpaService.execute(new CoordJobsGetFromParentIdJPAExecutor(bundleId, children.size(), limit)));
                } while (size != children.size());
                coordsToPurge.addAll(children);
            }
        }
        // Process the children
        processCoordinators(coordsToPurge);
        // Now that all children have been purged, we can purge the bundles
        purgeBundles(bundlesToPurge);
    }

    /**
     * Splits {@code ids} into consecutive, non-overlapping sub-list views of at most 'limit' elements each. Every batch
     * holds at least one element, so a non-positive 'limit' cannot cause an endless loop.
     *
     * @param ids ids to split
     * @return the batches, in order; empty if {@code ids} is empty
     */
    private List<List<String>> toBatches(List<String> ids) {
        final int batchSize = Math.max(1, limit);
        final int total = ids.size();
        List<List<String>> batches = new ArrayList<>();
        int startIndex = 0;
        while (startIndex < total) {
            // Computed from the remaining count so that a huge 'limit' cannot overflow the index arithmetic
            int endIndex = startIndex + Math.min(batchSize, total - startIndex);
            batches.add(ids.subList(startIndex, endIndex));
            startIndex = endIndex;
        }
        return batches;
    }

    /**
     * Purges the workflows in REVERSE order, in batches of at most 'limit'. The reverse order is needed so that children
     * are purged before their parents. The given list is not modified.
     *
     * @param wfs List of workflows to purge
     * @throws JPAExecutorException If a JPA executor has a problem
     */
    private void purgeWorkflows(List<String> wfs) throws JPAExecutorException {
        wfDel += wfs.size();
        // To delete sub-workflows before deleting parent workflows: findPurgeableWorkflows lists each root before its
        // descendants (breadth-first), so reversing a copy puts the deepest descendants first.
        List<String> childrenFirst = new ArrayList<>(wfs);
        Collections.reverse(childrenFirst);
        for (List<String> wfsForDelete : toBatches(childrenFirst)) {
            LOG.debug("Deleting workflows: " + StringUtils.join(wfsForDelete, ","));
            jpaService.execute(new WorkflowJobsDeleteJPAExecutor(wfsForDelete));
        }
    }

    /**
     * Purges coordinator actions, in list order, in batches of at most 'limit'.
     *
     * @param coordActions List of coordActions to purge
     * @throws JPAExecutorException If a JPA executor has a problem
     */
    private void purgeCoordActions(List<String> coordActions) throws JPAExecutorException {
        coordActionDel += coordActions.size();
        for (List<String> coordActionsForDelete : toBatches(coordActions)) {
            LOG.debug("Deleting coordinator actions: " + StringUtils.join(coordActionsForDelete, ","));
            jpaService.execute(new CoordActionsDeleteJPAExecutor(coordActionsForDelete));
        }
    }

    /**
     * Purges the coordinators, in list order, in batches of at most 'limit'. The order does not matter, because their
     * children have already been purged.
     *
     * @param coords List of coordinators to purge
     * @throws JPAExecutorException If a JPA executor has a problem
     */
    private void purgeCoordinators(List<String> coords) throws JPAExecutorException {
        coordDel += coords.size();
        for (List<String> coordsForDelete : toBatches(coords)) {
            LOG.debug("Deleting coordinators: " + StringUtils.join(coordsForDelete, ","));
            jpaService.execute(new CoordJobsDeleteJPAExecutor(coordsForDelete));
        }
    }

    /**
     * Purges the bundles, in list order, in batches of at most 'limit'. The order does not matter, because their children
     * have already been purged.
     *
     * @param bundles List of bundles to purge
     * @throws JPAExecutorException If a JPA executor has a problem
     */
    private void purgeBundles(List<String> bundles) throws JPAExecutorException {
        bundleDel += bundles.size();
        for (List<String> bundlesForDelete : toBatches(bundles)) {
            LOG.debug("Deleting bundles: " + StringUtils.join(bundlesForDelete, ","));
            jpaService.execute(new BundleJobsDeleteJPAExecutor(bundlesForDelete));
        }
    }

    @Override
    public String getEntityKey() {
        return "purge_command";
    }

    @Override
    protected boolean isLockRequired() {
        return true;
    }

    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
        // No preconditions to verify
    }
}