| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.oozie.service; |
| |
| import java.text.MessageFormat; |
| import java.util.Collection; |
| import java.util.Date; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| |
| import org.apache.commons.cli.ParseException; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.oozie.CoordinatorActionBean; |
| import org.apache.oozie.ErrorCode; |
| import org.apache.oozie.client.CoordinatorAction; |
| import org.apache.oozie.command.coord.CoordActionUpdatePushMissingDependency; |
| import org.apache.oozie.dependency.hcat.HCatDependencyCache; |
| import org.apache.oozie.dependency.hcat.SimpleHCatDependencyCache; |
| import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; |
| import org.apache.oozie.executor.jpa.JPAExecutorException; |
| import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; |
| import org.apache.oozie.util.HCatURI; |
| import org.apache.oozie.util.XLog; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| |
| /** |
| * Module that functions like a caching service to maintain partition dependency mappings |
| */ |
| public class PartitionDependencyManagerService implements Service { |
| |
| public static final String CONF_PREFIX = Service.CONF_PREFIX + "PartitionDependencyManagerService."; |
| public static final String CACHE_MANAGER_IMPL = CONF_PREFIX + "cache.manager.impl"; |
| public static final String CACHE_PURGE_INTERVAL = CONF_PREFIX + "cache.purge.interval"; |
| public static final String CACHE_PURGE_TTL = CONF_PREFIX + "cache.purge.ttl"; |
| |
| private static XLog LOG = XLog.getLog(PartitionDependencyManagerService.class); |
| |
| private HCatDependencyCache dependencyCache; |
| |
| /** |
| * Keep timestamp when missing dependencies of a coord action are registered |
| */ |
| private ConcurrentMap<String, Long> registeredCoordActionMap; |
| |
| private boolean purgeEnabled = false; |
| |
| @Override |
| public void init(Services services) throws ServiceException { |
| init(services.getConf()); |
| } |
| |
| private void init(Configuration conf) throws ServiceException { |
| Class<?> defaultClass = conf.getClass(CACHE_MANAGER_IMPL, null); |
| dependencyCache = (defaultClass == null) ? new SimpleHCatDependencyCache() |
| : (HCatDependencyCache) ReflectionUtils.newInstance(defaultClass, null); |
| dependencyCache.init(conf); |
| LOG.info("PartitionDependencyManagerService initialized. Dependency cache is {0} ", dependencyCache.getClass() |
| .getName()); |
| purgeEnabled = Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode(); |
| if (purgeEnabled) { |
| Runnable purgeThread = new CachePurgeWorker(dependencyCache); |
| // schedule runnable by default every 10 min |
| Services.get() |
| .get(SchedulerService.class) |
| .schedule(purgeThread, 10, Services.get().getConf().getInt(CACHE_PURGE_INTERVAL, 600), |
| SchedulerService.Unit.SEC); |
| registeredCoordActionMap = new ConcurrentHashMap<String, Long>(); |
| } |
| } |
| |
| private class CachePurgeWorker implements Runnable { |
| HCatDependencyCache cache; |
| public CachePurgeWorker(HCatDependencyCache cache) { |
| this.cache = cache; |
| } |
| |
| @Override |
| public void run() { |
| if (Thread.currentThread().isInterrupted()) { |
| return; |
| } |
| try { |
| purgeMissingDependency(Services.get().getConf().getInt(CACHE_PURGE_TTL, 1800)); |
| } |
| catch (Throwable error) { |
| XLog.getLog(PartitionDependencyManagerService.class).debug("Throwable in CachePurgeWorker thread run : ", error); |
| } |
| } |
| |
| private void purgeMissingDependency(int timeToLive) { |
| long currentTime = new Date().getTime(); |
| Set<String> staleActions = new HashSet<String>(); |
| Iterator<String> actionItr = registeredCoordActionMap.keySet().iterator(); |
| while(actionItr.hasNext()){ |
| String actionId = actionItr.next(); |
| Long regTime = registeredCoordActionMap.get(actionId); |
| if(regTime < (currentTime - timeToLive * 1000)){ |
| CoordinatorActionBean caBean = null; |
| try { |
| caBean = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION_STATUS, actionId); |
| } |
| catch (JPAExecutorException e) { |
| if (e.getErrorCode() == ErrorCode.E0605) { |
| LOG.info(MessageFormat.format( |
| "Coord action {0} is not in database, deleting it from cache", actionId)); |
| staleActions.add(actionId); |
| actionItr.remove(); |
| } |
| else { |
| LOG.warn("Error in checking coord action:" + actionId + "to purge, skipping", e); |
| } |
| } |
| if(caBean != null && !caBean.getStatus().equals(CoordinatorAction.Status.WAITING)){ |
| staleActions.add(actionId); |
| actionItr.remove(); |
| } |
| } |
| } |
| dependencyCache.removeNonWaitingCoordActions(staleActions); |
| } |
| } |
| |
| @Override |
| public void destroy() { |
| dependencyCache.destroy(); |
| } |
| |
| @Override |
| public Class<? extends Service> getInterface() { |
| return PartitionDependencyManagerService.class; |
| } |
| |
| /** |
| * Add a missing partition dependency and the actionID waiting on it |
| * |
| * @param hcatURI dependency URI |
| * @param actionID ID of action which is waiting for the dependency |
| */ |
| public void addMissingDependency(HCatURI hcatURI, String actionID) { |
| if (purgeEnabled) { |
| registeredCoordActionMap.put(actionID, new Date().getTime()); |
| } |
| dependencyCache.addMissingDependency(hcatURI, actionID); |
| } |
| |
| /** |
| * Remove a missing partition dependency associated with a actionID |
| * |
| * @param hcatURI dependency URI |
| * @param actionID ID of action which is waiting for the dependency |
| * @return true if successful, else false |
| */ |
| public boolean removeMissingDependency(HCatURI hcatURI, String actionID) { |
| return dependencyCache.removeMissingDependency(hcatURI, actionID); |
| } |
| |
| /** |
| * Get the list of actionIDs waiting for a partition |
| * |
| * @param hcatURI dependency URI |
| * @return list of actionIDs |
| */ |
| public Collection<String> getWaitingActions(HCatURI hcatURI) { |
| return dependencyCache.getWaitingActions(hcatURI); |
| } |
| |
| /** |
| * Mark a partition dependency as available |
| * |
| * @param server host:port of the server |
| * @param db name of the database |
| * @param table name of the table |
| * @param partitions list of available partitions |
| */ |
| public void partitionAvailable(String server, String db, String table, Map<String, String> partitions) { |
| Collection<String> actionsWithAvailableDep = dependencyCache.markDependencyAvailable(server, db, table, |
| partitions); |
| if (actionsWithAvailableDep != null) { |
| for (String actionID : actionsWithAvailableDep) { |
| boolean ret = Services.get().get(CallableQueueService.class) |
| .queue(new CoordActionUpdatePushMissingDependency(actionID), 100); |
| if (ret == false) { |
| XLog.getLog(getClass()).warn( |
| "Unable to queue the callable commands for PartitionDependencyManagerService for actionID " |
| + actionID + ".Most possibly command queue is full. Queue size is :" |
| + Services.get().get(CallableQueueService.class).queueSize()); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Get a list of available dependency URIs for a actionID |
| * |
| * @param actionID action id |
| * @return list of available dependency URIs |
| */ |
| public Collection<String> getAvailableDependencyURIs(String actionID) { |
| return dependencyCache.getAvailableDependencyURIs(actionID); |
| } |
| |
| /** |
| * Remove the list of available dependency URIs for a actionID once the missing dependencies are processed. |
| * |
| * @param actionID action id |
| * @param dependencyURIs set of dependency URIs |
| * @return true if successful, else false |
| */ |
| public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) { |
| return dependencyCache.removeAvailableDependencyURIs(actionID, dependencyURIs); |
| } |
| |
| /** |
| * Remove a coord action from dependency cache when all push missing dependencies available |
| * |
| * @param actionID action id |
| */ |
| public void removeCoordActionWithDependenciesAvailable(String actionID) { |
| if (purgeEnabled) { |
| registeredCoordActionMap.remove(actionID); |
| } |
| dependencyCache.removeCoordActionWithDependenciesAvailable(actionID); |
| } |
| |
| @VisibleForTesting |
| public void runCachePurgeWorker() { |
| new CachePurgeWorker(dependencyCache).run(); |
| } |
| } |