blob: a8b58d53bd28ad7095bcdd98eaab11232ee2815a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.dependency.hcat;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheException;
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Ehcache;
import net.sf.ehcache.Element;
import net.sf.ehcache.config.CacheConfiguration;
import net.sf.ehcache.event.CacheEventListener;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.HCatAccessorService;
import org.apache.oozie.service.PartitionDependencyManagerService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.HCatURI;
import org.apache.oozie.util.XLog;
public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEventListener {
/** Logger used by this class. */
private static final XLog LOG = XLog.getLog(EhcacheHCatDependencyCache.class);
/** Separates the db, table, partition-key-pattern and partition-value sections of a cache key. */
private static final String TABLE_DELIMITER = "#";
/** Separates the individual partition keys (and the individual partition values) of a cache key. */
private static final String PARTITION_DELIMITER = ";";
/**
 * Configuration property holding the name of the Ehcache cache whose configuration is used as the
 * template for the per-server caches.
 */
// NOTE(review): kept non-final because it is public and may be assigned from outside this file - confirm.
public static String CONF_CACHE_NAME = PartitionDependencyManagerService.CONF_PREFIX + "cache.ehcache.name";
/** Cache manager the per-server caches are added to. */
private CacheManager cacheManager;
/** Flag handed to the hostname canonicalization helper; loaded from configuration during init. */
private boolean useCanonicalHostName = false;
/**
 * Map of server to EhCache which has key as db#table#pk1;pk2#val;val2 and value as WaitingActions (list of
 * WaitingAction) which is Serializable (for overflowToDisk)
 */
private ConcurrentMap<String, Cache> missingDepsByServer;
/** Configuration of the configured cache; cloned for every per-server cache that gets created. */
private CacheConfiguration cacheConfig;
/**
 * Map of server#db#table - sorted part key pattern - count of different partition values (count
 * of elements in the cache) still missing for a partition key pattern. This count is used to
 * quickly determine if there are any more missing dependencies for a table. When the count
 * becomes 0, we unregister from notifications as there are no more missing dependencies for
 * that table.
 */
private ConcurrentMap<String, ConcurrentMap<String, SettableInteger>> partKeyPatterns;
/**
 * Map of actionIDs and collection of available URIs
 */
private ConcurrentMap<String, Collection<String>> availableDeps;
/**
 * Initializes the cache manager and the in-memory bookkeeping maps.
 * <p>
 * When {@link #CONF_CACHE_NAME} is not set, the configuration is loaded from
 * "ehcache-default.xml" and the cache named "dependency-default" is used; otherwise the
 * configuration is loaded from "ehcache.xml" and the configured cache name is used. The
 * configuration of that cache serves as the template for the per-server caches.
 *
 * @param conf configuration to read the cache name from
 * @throws IllegalStateException if the configuration resource is not on the classpath or the
 *         named cache is not defined in it
 */
@Override
public void init(Configuration conf) {
    String cacheName = conf.get(CONF_CACHE_NAME);
    final String configResource;
    if (cacheName == null) {
        configResource = "ehcache-default.xml";
        cacheName = "dependency-default";
    }
    else {
        configResource = "ehcache.xml";
    }
    URL cacheConfigURL = this.getClass().getClassLoader().getResource(configResource);
    if (cacheConfigURL == null) {
        // Name the resource that was actually looked up, not always ehcache.xml
        throw new IllegalStateException(configResource + " is not found in classpath");
    }
    cacheManager = CacheManager.newInstance(cacheConfigURL);
    final Cache specifiedCache = cacheManager.getCache(cacheName);
    if (specifiedCache == null) {
        throw new IllegalStateException("Cache " + cacheName + " configured in " + CONF_CACHE_NAME
                + " is not found");
    }
    cacheConfig = specifiedCache.getCacheConfiguration();
    missingDepsByServer = new ConcurrentHashMap<String, Cache>();
    partKeyPatterns = new ConcurrentHashMap<String, ConcurrentMap<String, SettableInteger>>();
    availableDeps = new ConcurrentHashMap<String, Collection<String>>();
    useCanonicalHostName = ConfigurationService.getBoolean(SimpleHCatDependencyCache.USE_CANONICAL_HOSTNAME);
}
/**
 * Records that the given action is waiting for the partition identified by the URI.
 * <p>
 * A cache is created per (canonicalized) server on first use. The cache key has the form
 * db#table#sortedPartKeys#sortedPartVals and the value is the WaitingActions for that key. When a
 * new key is inserted, the count for its partition key pattern under server#db#table is
 * incremented.
 *
 * @param hcatURI URI of the missing dependency
 * @param actionID ID of the action waiting for the dependency
 */
@Override
public void addMissingDependency(HCatURI hcatURI, String actionID) {
    String serverName = canonicalizeHostname(hcatURI.getServer());
    // Create cache for the server if we don't have one
    Cache missingCache = missingDepsByServer.get(serverName);
    if (missingCache == null) {
        CacheConfiguration clonedConfig = cacheConfig.clone();
        clonedConfig.setName(serverName);
        missingCache = new Cache(clonedConfig);
        Cache registered = missingDepsByServer.putIfAbsent(serverName, missingCache);
        if (registered == null) {
            // NOTE(review): the cache becomes visible in the map before it is added to the
            // manager; a concurrent caller could presumably observe it in that window - confirm.
            cacheManager.addCache(missingCache);
            missingCache.getCacheEventNotificationService().registerListener(this);
        }
        else {
            // Another thread registered a cache for this server first. Discard ours and continue
            // with the registered one; otherwise the entry below would go into the disposed cache.
            missingCache.dispose();
            missingCache = registered;
        }
    }
    // Add hcat uri into the missingCache
    SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
    String partKeys = sortedPKV.getPartKeys();
    String missingKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER
            + partKeys + TABLE_DELIMITER + sortedPKV.getPartVals();
    boolean newlyAdded = false;
    synchronized (missingCache) {
        Element element = missingCache.get(missingKey);
        if (element == null) {
            Element created = new Element(missingKey, new WaitingActions());
            Element previous = missingCache.putIfAbsent(created);
            newlyAdded = (previous == null);
            element = newlyAdded ? created : previous;
        }
        ((WaitingActions) element.getObjectValue()).add(new WaitingAction(actionID, hcatURI.toURIString()));
    }
    // Increment count for the partition key pattern
    if (newlyAdded) {
        String tableKey = serverName + TABLE_DELIMITER + hcatURI.getDb() + TABLE_DELIMITER
                + hcatURI.getTable();
        synchronized (partKeyPatterns) {
            ConcurrentMap<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
            if (patternCounts == null) {
                patternCounts = new ConcurrentHashMap<String, SettableInteger>();
                partKeyPatterns.put(tableKey, patternCounts);
            }
            SettableInteger count = patternCounts.get(partKeys);
            if (count == null) {
                patternCounts.put(partKeys, new SettableInteger(1));
            }
            else {
                count.increment();
            }
        }
    }
}
/**
 * Removes one waiting action from the missing-dependency entry of the given URI.
 * <p>
 * When the entry has no waiting actions left afterwards, it is removed from the cache and the
 * count for its partition key pattern is decremented.
 *
 * @param hcatURI URI of the dependency
 * @param actionID ID of the action that no longer waits for it
 * @return true if the action was found and removed; false if the server cache, the cache entry
 *         or the action was not found
 */
@Override
public boolean removeMissingDependency(HCatURI hcatURI, String actionID) {
    final Cache serverCache = missingDepsByServer.get(canonicalizeHostname(hcatURI.getServer()));
    if (serverCache == null) {
        LOG.warn("Remove missing dependency - Missing cache entry for server - uri={0}, actionID={1}",
                hcatURI.toURIString(), actionID);
        return false;
    }

    final SortedPKV pkv = new SortedPKV(hcatURI.getPartitionMap());
    final String keyPattern = pkv.getPartKeys();
    final String cacheKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER
            + keyPattern + TABLE_DELIMITER + pkv.getPartVals();

    boolean actionRemoved;
    boolean entryDropped;
    synchronized (serverCache) {
        final Element cached = serverCache.get(cacheKey);
        if (cached == null) {
            LOG.warn("Remove missing dependency - Missing cache entry - uri={0}, actionID={1}",
                    hcatURI.toURIString(), actionID);
            return false;
        }
        final Collection<WaitingAction> waiters = ((WaitingActions) cached.getObjectValue())
                .getWaitingActions();
        actionRemoved = waiters.remove(new WaitingAction(actionID, hcatURI.toURIString()));
        if (!actionRemoved) {
            LOG.warn("Remove missing dependency - Missing action ID - uri={0}, actionID={1}",
                    hcatURI.toURIString(), actionID);
        }
        // The entry is dropped as soon as nobody waits on it, whether or not this call removed anything
        entryDropped = waiters.isEmpty();
        if (entryDropped) {
            serverCache.remove(cacheKey);
        }
    }

    if (entryDropped) {
        // Keep the per-pattern count in step with the cache content
        final String tableKey = canonicalizeHostname(hcatURI.getServer()) + TABLE_DELIMITER + hcatURI.getDb()
                + TABLE_DELIMITER + hcatURI.getTable();
        decrementPartKeyPatternCount(tableKey, keyPattern, hcatURI.toURIString());
    }
    return actionRemoved;
}
/**
 * Returns the IDs of the actions waiting on exactly the given dependency URI.
 * <p>
 * The URI is rebuilt with a canonicalized authority before it is compared with the dependency
 * URI stored for each waiting action; if rebuilding fails, the URI's own string form is used.
 *
 * @param hcatURI URI of the dependency
 * @return the matching action IDs (possibly empty), or null if there is no cache for the server
 *         or no entry for the URI's partition
 */
@Override
public Collection<String> getWaitingActions(HCatURI hcatURI) {
    final Cache serverCache = missingDepsByServer.get(canonicalizeHostname(hcatURI.getServer()));
    if (serverCache == null) {
        return null;
    }

    final SortedPKV pkv = new SortedPKV(hcatURI.getPartitionMap());
    final String cacheKey = hcatURI.getDb() + TABLE_DELIMITER + hcatURI.getTable() + TABLE_DELIMITER
            + pkv.getPartKeys() + TABLE_DELIMITER + pkv.getPartVals();
    final Element cached = serverCache.get(cacheKey);
    if (cached == null) {
        return null;
    }

    final WaitingActions waiting = (WaitingActions) cached.getObjectValue();
    final Collection<String> matchingIDs = new ArrayList<String>();
    final URI rawURI = hcatURI.getURI();
    String comparableURI;
    try {
        comparableURI = new URI(rawURI.getScheme(), canonicalizeHostname(rawURI.getAuthority()),
                rawURI.getPath(), rawURI.getQuery(), rawURI.getFragment()).toString();
    }
    catch (URISyntaxException e) {
        // Fall back to the URI as given
        comparableURI = hcatURI.toURIString();
    }
    for (WaitingAction candidate : waiting.getWaitingActions()) {
        if (candidate.getDependencyURI().equals(comparableURI)) {
            matchingIDs.add(candidate.getActionID());
        }
    }
    return matchingIDs;
}
/**
 * Marks a partition as available and collects the actions that were waiting for it.
 * <p>
 * For every partition key pattern registered for server#db#table, the partition values are
 * assembled from {@code partitions} and the matching entry, if any, is removed from the server
 * cache. The dependency URI of each waiting action of a removed entry is recorded as available
 * for that action.
 *
 * @param server server the notification came from
 * @param db database name
 * @param table table name
 * @param partitions partition key/value pairs of the partition that became available
 * @return IDs of the actions for which a dependency became available (possibly empty), or null
 *         if nothing is tracked for the table or the server
 */
@Override
public Collection<String> markDependencyAvailable(String server, String db, String table,
        Map<String, String> partitions) {
    // Per-server caches and pattern counts are both registered under the canonicalized server
    // name, so the same name must be used for both lookups.
    String canonicalServer = canonicalizeHostname(server);
    String tableKey = canonicalServer + TABLE_DELIMITER + db + TABLE_DELIMITER + table;
    synchronized (partKeyPatterns) {
        Map<String, SettableInteger> patternCounts = partKeyPatterns.get(tableKey);
        if (patternCounts == null) {
            LOG.warn("Got partition available notification for " + tableKey
                    + ". Unexpected as no matching partition keys. Unregistering topic");
            unregisterFromNotifications(server, db, table);
            return null;
        }
        Cache missingCache = missingDepsByServer.get(canonicalServer);
        if (missingCache == null) {
            LOG.warn("Got partition available notification for " + tableKey
                    + ". Unexpected. Missing server entry in cache. Unregistering topic");
            partKeyPatterns.remove(tableKey);
            unregisterFromNotifications(server, db, table);
            return null;
        }
        Collection<String> actionsWithAvailDep = new HashSet<String>();
        StringBuilder partValSB = new StringBuilder();
        // If partition patterns are date, date;country and date;country;state,
        // construct the partition values for each pattern and for the matching value in the
        // missingCache, get the waiting actions and mark it as available.
        for (Entry<String, SettableInteger> entry : patternCounts.entrySet()) {
            String[] partKeys = entry.getKey().split(PARTITION_DELIMITER);
            partValSB.setLength(0);
            for (String key : partKeys) {
                partValSB.append(partitions.get(key)).append(PARTITION_DELIMITER);
            }
            // Drop the trailing delimiter
            partValSB.setLength(partValSB.length() - 1);
            String missingKey = db + TABLE_DELIMITER + table + TABLE_DELIMITER + entry.getKey() + TABLE_DELIMITER
                    + partValSB.toString();
            boolean removed = false;
            Element element = null;
            synchronized (missingCache) {
                element = missingCache.get(missingKey);
                if (element != null) {
                    missingCache.remove(missingKey);
                    removed = true;
                }
            }
            if (removed) {
                decrementPartKeyPatternCount(tableKey, entry.getKey(), server + TABLE_DELIMITER + missingKey);
                // Add the removed entry to available dependencies
                Collection<WaitingAction> wActions = ((WaitingActions) element.getObjectValue())
                        .getWaitingActions();
                for (WaitingAction wAction : wActions) {
                    String actionID = wAction.getActionID();
                    actionsWithAvailDep.add(actionID);
                    Collection<String> depURIs = availableDeps.get(actionID);
                    if (depURIs == null) {
                        depURIs = new ArrayList<String>();
                        Collection<String> existing = availableDeps.putIfAbsent(actionID, depURIs);
                        if (existing != null) {
                            depURIs = existing;
                        }
                    }
                    synchronized (depURIs) {
                        depURIs.add(wAction.getDependencyURI());
                        // Re-insert the list in case it was removed from the map in the meantime
                        availableDeps.put(actionID, depURIs);
                    }
                }
            }
        }
        return actionsWithAvailDep;
    }
}
/**
 * Returns a copy of the dependency URIs recorded as available for the given action.
 * <p>
 * The copy is taken while holding the list's monitor, the same lock under which the list is
 * appended to when a dependency becomes available.
 *
 * @param actionID ID of the action
 * @return a new list with the available dependency URIs, or null if none are recorded
 */
@Override
public Collection<String> getAvailableDependencyURIs(String actionID) {
    Collection<String> available = availableDeps.get(actionID);
    if (available == null) {
        return null;
    }
    synchronized (available) {
        // Return a copy
        return new ArrayList<String>(available);
    }
}
/**
 * Removes the given URIs from the available dependencies recorded for an action.
 * <p>
 * The list is fetched with a single map lookup so that a concurrent removal of the entry cannot
 * cause a null dereference, and it is modified while holding the list's monitor, the same lock
 * under which URIs are appended to it. When the list becomes empty the action's entry is
 * dropped from the map.
 *
 * @param actionID ID of the action
 * @param dependencyURIs URIs to remove
 * @return true if at least one URI was removed; false if nothing is recorded for the action or
 *         none of the URIs was present
 */
@Override
public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) {
    Collection<String> availList = availableDeps.get(actionID);
    if (availList == null) {
        return false;
    }
    synchronized (availList) {
        if (!availList.removeAll(dependencyURIs)) {
            return false;
        }
        if (availList.isEmpty()) {
            availableDeps.remove(actionID);
        }
    }
    return true;
}
/**
 * Clears the map of available dependencies and shuts down the cache manager.
 */
@Override
public void destroy() {
availableDeps.clear();
cacheManager.shutdown();
}
/**
 * Cloning is not supported.
 *
 * @throws CloneNotSupportedException always
 */
@Override
public Object clone() throws CloneNotSupportedException {
throw new CloneNotSupportedException();
}
/**
 * No-op: nothing is done when this listener is disposed.
 */
@Override
public void dispose() {
}
/**
 * Logs the expiry of a cache entry and updates the partition key pattern count for it.
 *
 * @param cache cache the entry expired from
 * @param element the expired entry; its key is the missing-dependency key
 */
@Override
public void notifyElementExpired(Ehcache cache, Element element) {
    // Invoked when timeToIdleSeconds or timeToLiveSeconds is met
    final String expiredKey = (String) element.getObjectKey();
    LOG.info("Cache entry [{0}] of cache [{1}] expired", expiredKey, cache.getName());
    onExpiryOrEviction(cache, element, expiredKey);
}
/**
 * No-op: no action is taken when an element is put into a cache.
 */
@Override
public void notifyElementPut(Ehcache arg0, Element arg1) throws CacheException {
}
/**
 * No-op: no action is taken on this notification; the removal paths in this class update the
 * partition key pattern counts themselves.
 */
@Override
public void notifyElementRemoved(Ehcache arg0, Element arg1) throws CacheException {
}
/**
 * No-op: no action is taken when an element is updated.
 */
@Override
public void notifyElementUpdated(Ehcache arg0, Element arg1) throws CacheException {
}
/**
 * No-op: no action is taken when all elements are removed from a cache.
 */
@Override
public void notifyRemoveAll(Ehcache arg0) {
}
/**
 * Logs the eviction of a cache entry and updates the partition key pattern count for it.
 *
 * @param cache cache the entry was evicted from
 * @param element the evicted entry; its key is the missing-dependency key
 */
@Override
public void notifyElementEvicted(Ehcache cache, Element element) {
    // Invoked when maxElementsInMemory is met
    final String evictedKey = (String) element.getObjectKey();
    LOG.info("Cache entry [{0}] of cache [{1}] evicted", evictedKey, cache.getName());
    onExpiryOrEviction(cache, element, evictedKey);
}
/**
 * Decrements the partition key pattern count for an entry that left a cache through expiry or
 * eviction.
 * <p>
 * The key is split at its last two delimiters: everything before the second-to-last one is
 * db#table, the part between the two is the partition key pattern.
 *
 * @param cache cache the entry belonged to; its name is used as the server part of the table key
 * @param element the entry that expired or was evicted
 * @param missingDepKey key of the entry, in the form db#table#partKeys#partVals
 */
private void onExpiryOrEviction(Ehcache cache, Element element, String missingDepKey) {
    final int valuesStart = missingDepKey.lastIndexOf(TABLE_DELIMITER);
    final int patternStart = missingDepKey.lastIndexOf(TABLE_DELIMITER, valuesStart - 1);
    // server#db#table. Name of the cache is that of the server.
    final String serverName = cache.getName();
    final String dbAndTable = missingDepKey.substring(0, patternStart);
    final String keyPattern = missingDepKey.substring(patternStart + 1, valuesStart);
    decrementPartKeyPatternCount(serverName + TABLE_DELIMITER + dbAndTable, keyPattern, missingDepKey);
}
/**
 * Decrement partition key pattern count, once a hcat URI is removed from the cache.
 * <p>
 * A pattern whose count reaches 0 is dropped; once a table has no patterns left, its entry is
 * dropped as well and the table is unregistered from notifications. A missing table or pattern
 * entry is only logged.
 *
 * @param tableKey key identifying the table - server#db#table
 * @param partKeys partition key pattern
 * @param hcatURI URI with the partition key pattern; used in log messages only
 */
private void decrementPartKeyPatternCount(String tableKey, String partKeys, String hcatURI) {
    synchronized (partKeyPatterns) {
        final Map<String, SettableInteger> countsByPattern = partKeyPatterns.get(tableKey);
        if (countsByPattern == null) {
            LOG.warn("Removed dependency - Missing cache entry - uri={0}. "
                    + "But no corresponding pattern key table entry", hcatURI);
            return;
        }

        final SettableInteger remaining = countsByPattern.get(partKeys);
        if (remaining == null) {
            LOG.warn("Removed dependency - Missing cache entry - uri={0}. "
                    + "But no corresponding pattern key entry", hcatURI);
            return;
        }

        remaining.decrement();
        if (remaining.getValue() == 0) {
            countsByPattern.remove(partKeys);
        }
        if (countsByPattern.isEmpty()) {
            // Nothing is missing for this table any more
            partKeyPatterns.remove(tableKey);
            final String[] serverDbTable = tableKey.split(TABLE_DELIMITER);
            unregisterFromNotifications(serverDbTable[0], serverDbTable[1], serverDbTable[2]);
        }
    }
}
/**
 * Asks the HCatAccessorService to unregister from notifications for the given table.
 *
 * @param server server name
 * @param db database name
 * @param table table name
 */
private void unregisterFromNotifications(String server, String db, String table) {
    // Close JMS session. Stop listening on topic
    Services.get().get(HCatAccessorService.class).unregisterFromNotification(server, db, table);
}
/**
 * Partition keys and their values, each joined with the partition delimiter, in the natural
 * order of the keys. An empty partition map yields two empty strings.
 */
private static class SortedPKV {
    private final StringBuilder partKeys;
    private final StringBuilder partVals;

    /**
     * @param partitions partition key/value pairs; may be empty
     */
    public SortedPKV(Map<String, String> partitions) {
        this.partKeys = new StringBuilder();
        this.partVals = new StringBuilder();
        ArrayList<String> keys = new ArrayList<String>(partitions.keySet());
        Collections.sort(keys);
        for (String key : keys) {
            this.partKeys.append(key).append(PARTITION_DELIMITER);
            this.partVals.append(partitions.get(key)).append(PARTITION_DELIMITER);
        }
        // Drop the trailing delimiter. With no keys nothing was appended, and
        // setLength(-1) would throw StringIndexOutOfBoundsException.
        if (!keys.isEmpty()) {
            this.partKeys.setLength(partKeys.length() - 1);
            this.partVals.setLength(partVals.length() - 1);
        }
    }

    /**
     * @return the sorted partition keys joined with the partition delimiter
     */
    public String getPartKeys() {
        return partKeys.toString();
    }

    /**
     * @return the partition values, ordered like their keys, joined with the partition delimiter
     */
    public String getPartVals() {
        return partVals.toString();
    }
}
/**
 * Mutable int holder used as a counter. It does no synchronization of its own.
 */
private static class SettableInteger {
    private int current;

    /**
     * @param value initial value of the counter
     */
    public SettableInteger(int value) {
        current = value;
    }

    /**
     * @return the current value
     */
    public int getValue() {
        return current;
    }

    /** Adds one to the value. */
    public void increment() {
        current += 1;
    }

    /** Subtracts one from the value. */
    public void decrement() {
        current -= 1;
    }
}
/**
 * Removes the given stale actions from every missing-dependency entry of every server cache.
 * <p>
 * An entry left without waiting actions is removed from its cache and the count for its
 * partition key pattern is decremented. A stale action whose dependency URI cannot be parsed is
 * left in place and reported in the log.
 *
 * @param staleActions IDs of the actions that are no longer waiting
 */
@Override
public void removeNonWaitingCoordActions(Set<String> staleActions) {
    for (Entry<String, Cache> entry : missingDepsByServer.entrySet()) {
        Cache missingCache = entry.getValue();
        if (missingCache == null) {
            continue;
        }
        synchronized (missingCache) {
            for (Object key : missingCache.getKeys()) {
                Element element = missingCache.get(key);
                if (element == null) {
                    continue;
                }
                Collection<WaitingAction> waitingActions = ((WaitingActions) element.getObjectValue())
                        .getWaitingActions();
                Iterator<WaitingAction> wactionItr = waitingActions.iterator();
                // URI of the last stale action removed from this entry; needed below to
                // work out the table key and the partition key pattern.
                HCatURI hcatURI = null;
                while (wactionItr.hasNext()) {
                    WaitingAction waction = wactionItr.next();
                    if (staleActions.contains(waction.getActionID())) {
                        try {
                            hcatURI = new HCatURI(waction.getDependencyURI());
                            wactionItr.remove();
                        }
                        catch (URISyntaxException e) {
                            // Report it rather than skipping the action silently
                            LOG.warn("Remove non-waiting coord actions - Invalid dependency URI - uri={0}, "
                                    + "actionID={1}", waction.getDependencyURI(), waction.getActionID());
                        }
                    }
                }
                if (waitingActions.isEmpty() && hcatURI != null) {
                    missingCache.remove(key);
                    // Decrement partition key pattern count if the cache entry is removed
                    SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
                    String partKeys = sortedPKV.getPartKeys();
                    String tableKey = canonicalizeHostname(hcatURI.getServer()) + TABLE_DELIMITER + hcatURI.getDb()
                            + TABLE_DELIMITER + hcatURI.getTable();
                    String hcatURIStr = hcatURI.toURIString();
                    decrementPartKeyPatternCount(tableKey, partKeys, hcatURIStr);
                }
            }
        }
    }
}
/**
 * Currently a no-op; the body is empty pending the data structure mentioned in the comment below.
 *
 * @param coordAction ID of the coordinator action (unused)
 */
@Override
public void removeCoordActionWithDependenciesAvailable(String coordAction) {
// to be implemented when reverse-lookup data structure for purging is added
}
/**
 * Delegates to {@code SimpleHCatDependencyCache.canonicalizeHostname}, passing this cache's
 * canonical-hostname flag.
 *
 * @param name host name to canonicalize
 * @return the value returned by the delegate
 */
public String canonicalizeHostname(String name) {
    final boolean canonical = useCanonicalHostName;
    return SimpleHCatDependencyCache.canonicalizeHostname(name, canonical);
}
}