blob: b59361b75ca626cfe1e330c75818c445923e2611 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.service;
import java.text.MessageFormat;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.command.coord.CoordActionUpdatePushMissingDependency;
import org.apache.oozie.dependency.hcat.HCatDependencyCache;
import org.apache.oozie.dependency.hcat.SimpleHCatDependencyCache;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
import org.apache.oozie.util.HCatURI;
import org.apache.oozie.util.XLog;
import com.google.common.annotations.VisibleForTesting;
/**
* Module that functions like a caching service to maintain partition dependency mappings
*/
public class PartitionDependencyManagerService implements Service {
public static final String CONF_PREFIX = Service.CONF_PREFIX + "PartitionDependencyManagerService.";
public static final String CACHE_MANAGER_IMPL = CONF_PREFIX + "cache.manager.impl";
public static final String CACHE_PURGE_INTERVAL = CONF_PREFIX + "cache.purge.interval";
public static final String CACHE_PURGE_TTL = CONF_PREFIX + "cache.purge.ttl";
private static XLog LOG = XLog.getLog(PartitionDependencyManagerService.class);
private HCatDependencyCache dependencyCache;
/**
* Keep timestamp when missing dependencies of a coord action are registered
*/
private ConcurrentMap<String, Long> registeredCoordActionMap;
private boolean purgeEnabled = false;
@Override
public void init(Services services) throws ServiceException {
init(services.getConf());
}
private void init(Configuration conf) throws ServiceException {
Class<?> defaultClass = conf.getClass(CACHE_MANAGER_IMPL, null);
dependencyCache = (defaultClass == null) ? new SimpleHCatDependencyCache()
: (HCatDependencyCache) ReflectionUtils.newInstance(defaultClass, null);
dependencyCache.init(conf);
LOG.info("PartitionDependencyManagerService initialized. Dependency cache is {0} ", dependencyCache.getClass()
.getName());
purgeEnabled = Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode();
if (purgeEnabled) {
Runnable purgeThread = new CachePurgeWorker(dependencyCache);
// schedule runnable by default every 10 min
Services.get()
.get(SchedulerService.class)
.schedule(purgeThread, 10, Services.get().getConf().getInt(CACHE_PURGE_INTERVAL, 600),
SchedulerService.Unit.SEC);
registeredCoordActionMap = new ConcurrentHashMap<String, Long>();
}
}
private class CachePurgeWorker implements Runnable {
HCatDependencyCache cache;
public CachePurgeWorker(HCatDependencyCache cache) {
this.cache = cache;
}
@Override
public void run() {
if (Thread.currentThread().isInterrupted()) {
return;
}
try {
purgeMissingDependency(Services.get().getConf().getInt(CACHE_PURGE_TTL, 1800));
}
catch (Throwable error) {
XLog.getLog(PartitionDependencyManagerService.class).debug("Throwable in CachePurgeWorker thread run : ", error);
}
}
private void purgeMissingDependency(int timeToLive) {
long currentTime = new Date().getTime();
Set<String> staleActions = new HashSet<String>();
Iterator<String> actionItr = registeredCoordActionMap.keySet().iterator();
while(actionItr.hasNext()){
String actionId = actionItr.next();
Long regTime = registeredCoordActionMap.get(actionId);
if(regTime < (currentTime - timeToLive * 1000)){
CoordinatorActionBean caBean = null;
try {
caBean = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION_STATUS, actionId);
}
catch (JPAExecutorException e) {
if (e.getErrorCode() == ErrorCode.E0605) {
LOG.info(MessageFormat.format(
"Coord action {0} is not in database, deleting it from cache", actionId));
staleActions.add(actionId);
actionItr.remove();
}
else {
LOG.warn("Error in checking coord action:" + actionId + "to purge, skipping", e);
}
}
if(caBean != null && !caBean.getStatus().equals(CoordinatorAction.Status.WAITING)){
staleActions.add(actionId);
actionItr.remove();
}
}
}
dependencyCache.removeNonWaitingCoordActions(staleActions);
}
}
@Override
public void destroy() {
dependencyCache.destroy();
}
@Override
public Class<? extends Service> getInterface() {
return PartitionDependencyManagerService.class;
}
/**
* Add a missing partition dependency and the actionID waiting on it
*
* @param hcatURI dependency URI
* @param actionID ID of action which is waiting for the dependency
*/
public void addMissingDependency(HCatURI hcatURI, String actionID) {
if (purgeEnabled) {
registeredCoordActionMap.put(actionID, new Date().getTime());
}
dependencyCache.addMissingDependency(hcatURI, actionID);
}
/**
* Remove a missing partition dependency associated with a actionID
*
* @param hcatURI dependency URI
* @param actionID ID of action which is waiting for the dependency
* @return true if successful, else false
*/
public boolean removeMissingDependency(HCatURI hcatURI, String actionID) {
return dependencyCache.removeMissingDependency(hcatURI, actionID);
}
/**
* Get the list of actionIDs waiting for a partition
*
* @param hcatURI dependency URI
* @return list of actionIDs
*/
public Collection<String> getWaitingActions(HCatURI hcatURI) {
return dependencyCache.getWaitingActions(hcatURI);
}
/**
* Mark a partition dependency as available
*
* @param server host:port of the server
* @param db name of the database
* @param table name of the table
* @param partitions list of available partitions
*/
public void partitionAvailable(String server, String db, String table, Map<String, String> partitions) {
Collection<String> actionsWithAvailableDep = dependencyCache.markDependencyAvailable(server, db, table,
partitions);
if (actionsWithAvailableDep != null) {
for (String actionID : actionsWithAvailableDep) {
boolean ret = Services.get().get(CallableQueueService.class)
.queue(new CoordActionUpdatePushMissingDependency(actionID), 100);
if (ret == false) {
XLog.getLog(getClass()).warn(
"Unable to queue the callable commands for PartitionDependencyManagerService for actionID "
+ actionID + ".Most possibly command queue is full. Queue size is :"
+ Services.get().get(CallableQueueService.class).queueSize());
}
}
}
}
/**
* Get a list of available dependency URIs for a actionID
*
* @param actionID action id
* @return list of available dependency URIs
*/
public Collection<String> getAvailableDependencyURIs(String actionID) {
return dependencyCache.getAvailableDependencyURIs(actionID);
}
/**
* Remove the list of available dependency URIs for a actionID once the missing dependencies are processed.
*
* @param actionID action id
* @param dependencyURIs set of dependency URIs
* @return true if successful, else false
*/
public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) {
return dependencyCache.removeAvailableDependencyURIs(actionID, dependencyURIs);
}
/**
* Remove a coord action from dependency cache when all push missing dependencies available
*
* @param actionID action id
*/
public void removeCoordActionWithDependenciesAvailable(String actionID) {
if (purgeEnabled) {
registeredCoordActionMap.remove(actionID);
}
dependencyCache.removeCoordActionWithDependenciesAvailable(actionID);
}
@VisibleForTesting
public void runCachePurgeWorker() {
new CachePurgeWorker(dependencyCache).run();
}
}