blob: 2bfe0cfc4effa11690b82c16a304a4b206373e54 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.dependency.hcat;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.HCatAccessorService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.HCatURI;
import org.apache.oozie.util.XLog;
public class SimpleHCatDependencyCache implements HCatDependencyCache {
private static XLog LOG = XLog.getLog(SimpleHCatDependencyCache.class);
private static String DELIMITER = ";";
public static final String USE_CANONICAL_HOSTNAME ="oozie.service.HCatAccessorService.jms.use.canonical.hostname";
private boolean useCanonicalHostName = false;
/**
* Map of server;db;table - sorter partition key order (country;dt;state) - sorted partition
* value (us;20120101;CA) - Collection of waiting actions (actionID and original hcat uri as
* string).
*/
private ConcurrentMap<String, ConcurrentMap<String, Map<String, Collection<WaitingAction>>>> missingDeps;
/**
* Map of actionIDs and collection of available URIs
*/
private ConcurrentMap<String, Collection<String>> availableDeps;
/**
* Map of actionIDs and partitions for reverse-lookup in purging
*/
private ConcurrentMap<String, ConcurrentMap<String, Collection<String>>> actionPartitionMap;
@Override
public void init(Configuration conf) {
missingDeps = new ConcurrentHashMap<String, ConcurrentMap<String, Map<String, Collection<WaitingAction>>>>();
availableDeps = new ConcurrentHashMap<String, Collection<String>>();
actionPartitionMap = new ConcurrentHashMap<String, ConcurrentMap<String, Collection<String>>>();
useCanonicalHostName = ConfigurationService.getBoolean(USE_CANONICAL_HOSTNAME);
}
@Override
public void addMissingDependency(HCatURI hcatURI, String actionID) {
String tableKey = canonicalizeHostname(hcatURI.getServer()) + DELIMITER + hcatURI.getDb() + DELIMITER
+ hcatURI.getTable();
SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
// Partition keys seperated by ;. For eg: date;country;state
String partKey = sortedPKV.getPartKeys();
// Partition values seperated by ;. For eg: 20120101;US;CA
String partVal = sortedPKV.getPartVals();
ConcurrentMap<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
if (partKeyPatterns == null) {
partKeyPatterns = new ConcurrentHashMap<String, Map<String, Collection<WaitingAction>>>();
ConcurrentMap<String, Map<String, Collection<WaitingAction>>> existingMap = missingDeps.putIfAbsent(
tableKey, partKeyPatterns);
if (existingMap != null) {
partKeyPatterns = existingMap;
}
}
ConcurrentMap<String, Collection<String>> partitionMap = actionPartitionMap.get(actionID);
if (partitionMap == null) {
partitionMap = new ConcurrentHashMap<String, Collection<String>>();
ConcurrentMap<String, Collection<String>> existingPartMap = actionPartitionMap.putIfAbsent(actionID,
partitionMap);
if (existingPartMap != null) {
partitionMap = existingPartMap;
}
}
synchronized (partitionMap) {
Collection<String> partKeys = partitionMap.get(tableKey);
if (partKeys == null) {
partKeys = new ArrayList<String>();
}
partKeys.add(partKey);
partitionMap.put(tableKey, partKeys);
}
synchronized (partKeyPatterns) {
missingDeps.put(tableKey, partKeyPatterns); // To handle race condition with removal of partKeyPatterns
Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
if (partValues == null) {
partValues = new HashMap<String, Collection<WaitingAction>>();
partKeyPatterns.put(partKey, partValues);
}
Collection<WaitingAction> waitingActions = partValues.get(partVal);
if (waitingActions == null) {
waitingActions = new HashSet<WaitingAction>();
partValues.put(partVal, waitingActions);
}
waitingActions.add(new WaitingAction(actionID, hcatURI.toURIString()));
}
}
@Override
public boolean removeMissingDependency(HCatURI hcatURI, String actionID) {
String tableKey = canonicalizeHostname(hcatURI.getServer()) + DELIMITER + hcatURI.getDb() + DELIMITER
+ hcatURI.getTable();
SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
String partKey = sortedPKV.getPartKeys();
String partVal = sortedPKV.getPartVals();
Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
if (partKeyPatterns == null) {
LOG.warn("Remove missing dependency - Missing table entry - uri={0}, actionID={1}",
hcatURI.toURIString(), actionID);
return false;
}
ConcurrentMap<String, Collection<String>> partitionMap = actionPartitionMap.get(actionID);
if (partitionMap != null) {
synchronized (partitionMap) {
Collection<String> partKeys = partitionMap.get(tableKey);
if (partKeys != null) {
partKeys.remove(partKey);
}
if (partKeys != null && partKeys.size() == 0) {
partitionMap.remove(tableKey);
}
if (partitionMap.size() == 0) {
actionPartitionMap.remove(actionID);
}
}
}
synchronized(partKeyPatterns) {
Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
if (partValues == null) {
LOG.warn("Remove missing dependency - Missing partition pattern - uri={0}, actionID={1}",
hcatURI.toURIString(), actionID);
return false;
}
Collection<WaitingAction> waitingActions = partValues.get(partVal);
if (waitingActions == null) {
LOG.warn("Remove missing dependency - Missing partition value - uri={0}, actionID={1}",
hcatURI.toURIString(), actionID);
return false;
}
boolean removed = waitingActions.remove(new WaitingAction(actionID, hcatURI.toURIString()));
if (!removed) {
LOG.warn("Remove missing dependency - Missing action ID - uri={0}, actionID={1}",
hcatURI.toURIString(), actionID);
}
if (waitingActions.isEmpty()) {
partValues.remove(partVal);
if (partValues.isEmpty()) {
partKeyPatterns.remove(partKey);
}
if (partKeyPatterns.isEmpty()) {
missingDeps.remove(tableKey);
// Close JMS session. Stop listening on topic
HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
hcatService.unregisterFromNotification(hcatURI);
}
}
return removed;
}
}
@Override
public Collection<String> getWaitingActions(HCatURI hcatURI) {
String tableKey = canonicalizeHostname(hcatURI.getServer()) + DELIMITER + hcatURI.getDb() + DELIMITER
+ hcatURI.getTable();
SortedPKV sortedPKV = new SortedPKV(hcatURI.getPartitionMap());
String partKey = sortedPKV.getPartKeys();
String partVal = sortedPKV.getPartVals();
Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
if (partKeyPatterns == null) {
return null;
}
Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
if (partValues == null) {
return null;
}
Collection<WaitingAction> waitingActions = partValues.get(partVal);
if (waitingActions == null) {
return null;
}
Collection<String> actionIDs = new ArrayList<String>();
URI uri = hcatURI.getURI();
String uriString = null;
try {
uriString = new URI(uri.getScheme(), canonicalizeHostname(uri.getAuthority()), uri.getPath(),
uri.getQuery(), uri.getFragment()).toString();
}
catch (URISyntaxException e) {
uriString = hcatURI.toURIString();
}
for (WaitingAction action : waitingActions) {
if (action.getDependencyURI().equals(uriString)) {
actionIDs.add(action.getActionID());
}
}
return actionIDs;
}
@Override
public Collection<String> markDependencyAvailable(String server, String db, String table,
Map<String, String> partitions) {
String tableKey = canonicalizeHostname(server) + DELIMITER + db + DELIMITER + table;
Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
if (partKeyPatterns == null) {
LOG.warn("Got partition available notification for " + tableKey
+ ". Unexpected and should not be listening to topic. Unregistering topic");
HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
hcatService.unregisterFromNotification(server, db, table);
return null;
}
Collection<String> actionsWithAvailDep = new HashSet<String>();
List<String> partKeysToRemove = new ArrayList<String>();
StringBuilder partValSB = new StringBuilder();
synchronized (partKeyPatterns) {
// If partition patterns are date, date;country and date;country;state,
// construct the partition values for each pattern from the available partitions map and
// for the matching value in the dependency map, get the waiting actions.
for (Entry<String, Map<String, Collection<WaitingAction>>> entry : partKeyPatterns.entrySet()) {
String[] partKeys = entry.getKey().split(DELIMITER);
partValSB.setLength(0);
for (String key : partKeys) {
partValSB.append(partitions.get(key)).append(DELIMITER);
}
partValSB.setLength(partValSB.length() - 1);
Map<String, Collection<WaitingAction>> partValues = entry.getValue();
String partVal = partValSB.toString();
Collection<WaitingAction> wActions = entry.getValue().get(partVal);
if (wActions == null)
continue;
for (WaitingAction wAction : wActions) {
String actionID = wAction.getActionID();
actionsWithAvailDep.add(actionID);
Collection<String> depURIs = availableDeps.get(actionID);
if (depURIs == null) {
depURIs = new ArrayList<String>();
Collection<String> existing = availableDeps.putIfAbsent(actionID, depURIs);
if (existing != null) {
depURIs = existing;
}
}
synchronized (depURIs) {
depURIs.add(wAction.getDependencyURI());
availableDeps.put(actionID, depURIs);
}
}
partValues.remove(partVal);
if (partValues.isEmpty()) {
partKeysToRemove.add(entry.getKey());
}
}
for (String partKey : partKeysToRemove) {
partKeyPatterns.remove(partKey);
}
if (partKeyPatterns.isEmpty()) {
missingDeps.remove(tableKey);
// Close JMS session. Stop listening on topic
HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
hcatService.unregisterFromNotification(server, db, table);
}
}
return actionsWithAvailDep;
}
@Override
public Collection<String> getAvailableDependencyURIs(String actionID) {
Collection<String> available = availableDeps.get(actionID);
if (available != null) {
// Return a copy
available = new ArrayList<String>(available);
}
return available;
}
@Override
public boolean removeAvailableDependencyURIs(String actionID, Collection<String> dependencyURIs) {
if (!availableDeps.containsKey(actionID)) {
return false;
}
else {
Collection<String> availList = availableDeps.get(actionID);
if (!availList.removeAll(dependencyURIs)) {
return false;
}
synchronized (availList) {
if (availList.isEmpty()) {
availableDeps.remove(actionID);
}
}
}
return true;
}
@Override
public void destroy() {
missingDeps.clear();
availableDeps.clear();
}
private static class SortedPKV {
private StringBuilder partKeys;
private StringBuilder partVals;
public SortedPKV(Map<String, String> partitions) {
this.partKeys = new StringBuilder();
this.partVals = new StringBuilder();
ArrayList<String> keys = new ArrayList<String>(partitions.keySet());
Collections.sort(keys);
for (String key : keys) {
this.partKeys.append(key).append(DELIMITER);
this.partVals.append(partitions.get(key)).append(DELIMITER);
}
this.partKeys.setLength(partKeys.length() - 1);
this.partVals.setLength(partVals.length() - 1);
}
public String getPartKeys() {
return partKeys.toString();
}
public String getPartVals() {
return partVals.toString();
}
}
private HCatURI removePartitions(String coordActionId, Collection<String> partKeys,
Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns) {
HCatURI hcatUri = null;
for (String partKey : partKeys) {
Map<String, Collection<WaitingAction>> partValues = partKeyPatterns.get(partKey);
Iterator<String> partValItr = partValues.keySet().iterator();
while (partValItr.hasNext()) {
String partVal = partValItr.next();
Collection<WaitingAction> waitingActions = partValues.get(partVal);
if (waitingActions != null) {
Iterator<WaitingAction> waitItr = waitingActions.iterator();
while (waitItr.hasNext()) {
WaitingAction waction = waitItr.next();
if (coordActionId.contains(waction.getActionID())) {
waitItr.remove();
if (hcatUri == null) {
try {
hcatUri = new HCatURI(waction.getDependencyURI());
}
catch (URISyntaxException e) {
continue;
}
}
}
}
}
// delete partition value with no waiting actions
if (waitingActions.size() == 0) {
partValItr.remove();
}
}
if (partValues.size() == 0) {
partKeyPatterns.remove(partKey);
}
}
return hcatUri;
}
@Override
public void removeNonWaitingCoordActions(Set<String> coordActions) {
HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
for (String coordActionId : coordActions) {
LOG.info("Removing non waiting coord action {0} from partition dependency map", coordActionId);
synchronized (actionPartitionMap) {
Map<String, Collection<String>> partitionMap = actionPartitionMap.get(coordActionId);
if (partitionMap != null) {
Iterator<String> tableItr = partitionMap.keySet().iterator();
while (tableItr.hasNext()) {
String tableKey = tableItr.next();
HCatURI hcatUri = null;
Map<String, Map<String, Collection<WaitingAction>>> partKeyPatterns = missingDeps.get(tableKey);
if (partKeyPatterns != null) {
synchronized (partKeyPatterns) {
Collection<String> partKeys = partitionMap.get(tableKey);
if (partKeys != null) {
hcatUri = removePartitions(coordActionId, partKeys, partKeyPatterns);
}
}
if (partKeyPatterns.size() == 0) {
tableItr.remove();
if (hcatUri != null) {
// Close JMS session. Stop listening on topic
hcatService.unregisterFromNotification(hcatUri);
}
}
}
}
}
actionPartitionMap.remove(coordActionId);
}
}
}
@Override
public void removeCoordActionWithDependenciesAvailable(String coordAction) {
actionPartitionMap.remove(coordAction);
}
public String canonicalizeHostname(String name) {
return canonicalizeHostname(name, useCanonicalHostName);
}
public static String canonicalizeHostname(String name, boolean useCanonicalHostName) {
if (useCanonicalHostName) {
String hostname = name;
String port = null;
if (name.contains(":")) {
hostname = name.split(":")[0];
port = name.split(":")[1];
}
try {
InetAddress address = InetAddress.getByName(hostname);
String canonicalHostName = address.getCanonicalHostName();
if (null != port) {
return canonicalHostName + ":" + port;
}
return canonicalHostName;
}
catch (IOException ex) {
LOG.error(ex);
return name;
}
}
else {
return name;
}
}
}