/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.oozie.feed;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.LifeCycle;
import org.apache.falcon.Tag;
import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.feed.ClusterType;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.expression.ExpressionHelper;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.oozie.OozieCoordinatorBuilder;
import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
import org.apache.falcon.oozie.coordinator.ACTION;
import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
import org.apache.falcon.oozie.coordinator.SYNCDATASET;
import org.apache.falcon.oozie.coordinator.WORKFLOW;
import org.apache.falcon.util.RuntimeProperties;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Properties;

/**
* Builds the Oozie coordinator for feed replication, one per source-target cluster combination.
*/
public class FeedReplicationCoordinatorBuilder extends OozieCoordinatorBuilder<Feed> {
private static final String REPLICATION_COORD_TEMPLATE = "/coordinator/replication-coordinator.xml";
private static final String IMPORT_HQL = "/action/feed/falcon-table-import.hql";
private static final String EXPORT_HQL = "/action/feed/falcon-table-export.hql";
private static final int THIRTY_MINUTES = 30 * 60 * 1000;
private static final String PARALLEL = "parallel";
private static final String TIMEOUT = "timeout";
private static final String MR_MAX_MAPS = "maxMaps";
private static final String MR_MAP_BANDWIDTH = "mapBandwidthKB";
public FeedReplicationCoordinatorBuilder(Feed entity) {
super(entity, LifeCycle.REPLICATION);
}
@Override
public List<Properties> buildCoords(Cluster cluster, Path buildPath) throws FalconException {
org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(entity, cluster.getName());
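// Coordinators are built only from the target side: one per source cluster replicating to this target.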
if (feedCluster.getType() == ClusterType.TARGET) {
List<Properties> props = new ArrayList<Properties>();
for (org.apache.falcon.entity.v0.feed.Cluster srcFeedCluster : entity.getClusters().getClusters()) {
if (srcFeedCluster.getType() == ClusterType.SOURCE) {
Cluster srcCluster = ConfigurationStore.get().get(EntityType.CLUSTER, srcFeedCluster.getName());
// workflow is serialized to a specific dir
Path coordPath = new Path(buildPath, Tag.REPLICATION.name() + "/" + srcCluster.getName());
props.add(doBuild(srcCluster, cluster, coordPath));
}
}
return props;
}
return null;
}
@Override
protected WorkflowExecutionContext.EntityOperations getOperation() {
return WorkflowExecutionContext.EntityOperations.REPLICATE;
}
private Properties doBuild(Cluster srcCluster, Cluster trgCluster, Path buildPath) throws FalconException {
// Different workflow for each source since hive credentials vary for each cluster
OozieOrchestrationWorkflowBuilder builder = OozieOrchestrationWorkflowBuilder.get(entity, trgCluster,
Tag.REPLICATION);
Properties wfProps = builder.build(trgCluster, buildPath);
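// An optional replication delay on the source feed cluster shifts both validity windows' start dates forward.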
long replicationDelayInMillis = getReplicationDelayInMillis(srcCluster);
Date sourceStartDate = getStartDate(srcCluster, replicationDelayInMillis);
Date sourceEndDate = getEndDate(srcCluster);
Date targetStartDate = getStartDate(trgCluster, replicationDelayInMillis);
Date targetEndDate = getEndDate(trgCluster);
if (noOverlapExists(sourceStartDate, sourceEndDate,
targetStartDate, targetEndDate)) {
LOG.warn("Not creating replication coordinator, as the source cluster: {} and target cluster: {} do "
+ "not have overlapping dates", srcCluster.getName(), trgCluster.getName());
return null;
}
COORDINATORAPP coord = unmarshal(REPLICATION_COORD_TEMPLATE);
String coordName = EntityUtil.getWorkflowName(Tag.REPLICATION, Arrays.asList(srcCluster.getName()),
entity).toString();
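// The coordinator runs over the intersection of the two validity windows: the later start and the earlier end.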
String start = sourceStartDate.after(targetStartDate)
? SchemaHelper.formatDateUTC(sourceStartDate) : SchemaHelper.formatDateUTC(targetStartDate);
String end = sourceEndDate.before(targetEndDate)
? SchemaHelper.formatDateUTC(sourceEndDate) : SchemaHelper.formatDateUTC(targetEndDate);
initializeCoordAttributes(coord, coordName, start, end, replicationDelayInMillis);
setCoordControls(coord);
final Storage sourceStorage = FeedHelper.createReadOnlyStorage(srcCluster, entity);
initializeInputDataSet(srcCluster, coord, sourceStorage);
final Storage targetStorage = FeedHelper.createStorage(trgCluster, entity);
initializeOutputDataSet(trgCluster, coord, targetStorage);
ACTION replicationWorkflowAction = getReplicationWorkflowAction(
srcCluster, trgCluster, buildPath, coordName, sourceStorage, targetStorage);
coord.setAction(replicationWorkflowAction);
Path marshalPath = marshal(trgCluster, coord, buildPath);
wfProps.putAll(getProperties(marshalPath, coordName));
return wfProps;
}
private ACTION getReplicationWorkflowAction(Cluster srcCluster, Cluster trgCluster, Path buildPath,
String wfName, Storage sourceStorage, Storage targetStorage) throws FalconException {
ACTION action = new ACTION();
WORKFLOW workflow = new WORKFLOW();
workflow.setAppPath(getStoragePath(buildPath));
Properties props = createCoordDefaultConfiguration(trgCluster, wfName);
// Override CLUSTER_NAME property to include both source and target cluster
String clusterProperty = trgCluster.getName()
+ WorkflowExecutionContext.CLUSTER_NAME_SEPARATOR + srcCluster.getName();
props.put(WorkflowExecutionArgs.CLUSTER_NAME.getName(), clusterProperty);
props.put("srcClusterName", srcCluster.getName());
props.put("srcClusterColo", srcCluster.getColo());
if (props.get(MR_MAX_MAPS) == null) { // set default if user has not overridden
props.put(MR_MAX_MAPS, getDefaultMaxMaps());
}
if (props.get(MR_MAP_BANDWIDTH) == null) { // set default if user has not overridden
props.put(MR_MAP_BANDWIDTH, getDefaultMapBandwidth());
}
// the storage type is uniform across source and target feeds for replication
props.put("falconFeedStorageType", sourceStorage.getType().name());
String instancePaths = "";
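// File system replication DistCps the partitioned data paths directly; table replication exports from the
// source Hive table, DistCps the staged data, and re-imports it on the target.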
if (sourceStorage.getType() == Storage.TYPE.FILESYSTEM) {
String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster);
instancePaths = pathsWithPartitions;
propagateFileSystemCopyProperties(pathsWithPartitions, props);
} else if (sourceStorage.getType() == Storage.TYPE.TABLE) {
instancePaths = "${coord:dataIn('input')}";
final CatalogStorage sourceTableStorage = (CatalogStorage) sourceStorage;
propagateTableStorageProperties(srcCluster, sourceTableStorage, props, "falconSource");
final CatalogStorage targetTableStorage = (CatalogStorage) targetStorage;
propagateTableStorageProperties(trgCluster, targetTableStorage, props, "falconTarget");
propagateTableCopyProperties(srcCluster, sourceTableStorage, trgCluster, targetTableStorage, props);
setupHiveConfiguration(srcCluster, trgCluster, buildPath);
}
propagateLateDataProperties(instancePaths, sourceStorage.getType().name(), props);
props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle()));
workflow.setConfiguration(getConfig(props));
action.setWorkflow(workflow);
return action;
}
private String getDefaultMaxMaps() {
return RuntimeProperties.get().getProperty("falcon.replication.workflow.maxmaps", "5");
}
private String getDefaultMapBandwidth() {
return RuntimeProperties.get().getProperty("falcon.replication.workflow.mapbandwidthKB", "102400");
}
private String getPathsWithPartitions(Cluster srcCluster, Cluster trgCluster) throws FalconException {
String srcPart = FeedHelper.normalizePartitionExpression(
FeedHelper.getCluster(entity, srcCluster.getName()).getPartition());
srcPart = FeedHelper.evaluateClusterExp(srcCluster, srcPart);
String targetPart = FeedHelper.normalizePartitionExpression(
FeedHelper.getCluster(entity, trgCluster.getName()).getPartition());
targetPart = FeedHelper.evaluateClusterExp(trgCluster, targetPart);
StringBuilder pathsWithPartitions = new StringBuilder();
pathsWithPartitions.append("${coord:dataIn('input')}/")
.append(FeedHelper.normalizePartitionExpression(srcPart, targetPart));
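// Collapse duplicate slashes left behind by empty partition expressions and strip any trailing slash.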
String parts = pathsWithPartitions.toString().replaceAll("//+", "/");
parts = StringUtils.stripEnd(parts, "/");
return parts;
}
private void propagateFileSystemCopyProperties(String paths, Properties props) throws FalconException {
props.put("sourceRelativePaths", paths);
props.put("distcpSourcePaths", "${coord:dataIn('input')}");
props.put("distcpTargetPaths", "${coord:dataOut('output')}");
}
private void propagateTableStorageProperties(Cluster cluster, CatalogStorage tableStorage,
Properties props, String prefix) {
props.put(prefix + "NameNode", ClusterHelper.getStorageUrl(cluster));
props.put(prefix + "JobTracker", ClusterHelper.getMREndPoint(cluster));
props.put(prefix + "HcatNode", tableStorage.getCatalogUrl());
props.put(prefix + "Database", tableStorage.getDatabase());
props.put(prefix + "Table", tableStorage.getTable());
props.put(prefix + "Partition", "${coord:dataInPartitions('input', 'hive-export')}");
}
private void propagateTableCopyProperties(Cluster srcCluster, CatalogStorage sourceStorage,
Cluster trgCluster, CatalogStorage targetStorage, Properties props) {
// create staging dirs for export at source & set it as distcpSourcePaths
String sourceStagingPath =
FeedHelper.getStagingPath(srcCluster, entity, sourceStorage, Tag.REPLICATION,
NOMINAL_TIME_EL + "/" + trgCluster.getName());
props.put("distcpSourcePaths", sourceStagingPath);
// create staging dirs for import at target & set it as distcpTargetPaths
String targetStagingPath =
FeedHelper.getStagingPath(trgCluster, entity, targetStorage, Tag.REPLICATION,
NOMINAL_TIME_EL + "/" + trgCluster.getName());
props.put("distcpTargetPaths", targetStagingPath);
props.put("sourceRelativePaths", IGNORE); // this will bot be used for Table storage.
}
private void propagateLateDataProperties(String instancePaths,
String falconFeedStorageType, Properties props) {
// TODO: these pairs are identical but are used in different contexts
// late data handler - should-record action
props.put(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), entity.getName());
props.put(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), instancePaths);
props.put(WorkflowExecutionArgs.INPUT_NAMES.getName(), entity.getName());
// storage type for each corresponding feed - in this case only one feed is involved
// needed to compute usage based on storage type in LateDataHandler
props.put(WorkflowExecutionArgs.INPUT_STORAGE_TYPES.getName(), falconFeedStorageType);
// falcon post processing
props.put(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), entity.getName());
props.put(WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), "${coord:dataOut('output')}");
}
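// Stages the Hive import/export scripts and the per-cluster hive-site.xml files alongside the
// coordinator definition on the target cluster's file system.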
private void setupHiveConfiguration(Cluster srcCluster, Cluster trgCluster,
Path buildPath) throws FalconException {
Configuration conf = ClusterHelper.getConfiguration(trgCluster);
FileSystem fs = HadoopClientFactory.get().createFileSystem(conf);
try {
// copy import export scripts to stagingDir
Path scriptPath = new Path(buildPath, "scripts");
copyHiveScript(fs, scriptPath, IMPORT_HQL);
copyHiveScript(fs, scriptPath, EXPORT_HQL);
// create hive conf to stagingDir
Path confPath = new Path(buildPath, "conf");
persistHiveConfiguration(fs, confPath, srcCluster, "falcon-source-");
persistHiveConfiguration(fs, confPath, trgCluster, "falcon-target-");
} catch (IOException e) {
throw new FalconException("Unable to create hive conf files", e);
}
}
private void copyHiveScript(FileSystem fs, Path scriptPath, String resource) throws IOException {
OutputStream out = null;
InputStream in = null;
try {
out = fs.create(new Path(scriptPath, new Path(resource).getName()));
in = FeedReplicationCoordinatorBuilder.class.getResourceAsStream(resource);
IOUtils.copy(in, out);
} finally {
IOUtils.closeQuietly(in);
IOUtils.closeQuietly(out);
}
}
protected void persistHiveConfiguration(FileSystem fs, Path confPath,
Cluster cluster, String prefix) throws IOException {
Configuration hiveConf = getHiveCredentialsAsConf(cluster);
OutputStream out = null;
try {
out = fs.create(new Path(confPath, prefix + "hive-site.xml"));
hiveConf.writeXml(out);
} finally {
IOUtils.closeQuietly(out);
}
}
private void initializeCoordAttributes(COORDINATORAPP coord, String coordName, String start, String end,
long delayInMillis) {
coord.setName(coordName);
coord.setFrequency("${coord:" + entity.getFrequency().toString() + "}");
if (delayInMillis > 0) {
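// Shift the input/output instances back by the delay, e.g. a 30-minute delay yields ${now(0,-30)}.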
long delayInMins = -1 * delayInMillis / (1000 * 60);
String elExp = "${now(0," + delayInMins + ")}";
coord.getInputEvents().getDataIn().get(0).getInstance().set(0, elExp);
coord.getOutputEvents().getDataOut().get(0).setInstance(elExp);
}
coord.setStart(start);
coord.setEnd(end);
coord.setTimezone(entity.getTimezone().getID());
}
private void setCoordControls(COORDINATORAPP coord) throws FalconException {
long frequencyInMillis = ExpressionHelper.get().evaluate(entity.getFrequency().toString(), Long.class);
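// Default the timeout to six times the feed frequency, floored at thirty minutes; an explicit
// 'timeout' entity property overrides it.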
long timeoutInMillis = frequencyInMillis * 6;
if (timeoutInMillis < THIRTY_MINUTES) {
timeoutInMillis = THIRTY_MINUTES;
}
Properties props = getEntityProperties(entity);
String timeout = props.getProperty(TIMEOUT);
if (timeout != null) {
try {
timeoutInMillis = ExpressionHelper.get().evaluate(timeout, Long.class);
} catch (Exception e) {
LOG.error("Unable to evaluate timeout:", e);
}
}
coord.getControls().setTimeout(String.valueOf(timeoutInMillis / (1000 * 60)));
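// Throttle to twice the number of frequency intervals that fit in the timeout window.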
coord.getControls().setThrottle(String.valueOf(timeoutInMillis / frequencyInMillis * 2));
String parallelProp = props.getProperty(PARALLEL);
int parallel = 1;
if (parallelProp != null) {
try {
parallel = Integer.parseInt(parallelProp);
} catch (NumberFormatException e) {
LOG.error("Unable to parse parallel:", e);
}
}
coord.getControls().setConcurrency(String.valueOf(parallel));
}
private void initializeInputDataSet(Cluster cluster, COORDINATORAPP coord,
Storage storage) throws FalconException {
SYNCDATASET inputDataset = (SYNCDATASET) coord.getDatasets().getDatasetOrAsyncDataset().get(0);
String uriTemplate = storage.getUriTemplate(LocationType.DATA);
if (storage.getType() == Storage.TYPE.TABLE) {
uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie expects the hcat scheme for HCatalog URIs
}
inputDataset.setUriTemplate(uriTemplate);
setDatasetValues(inputDataset, cluster);
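// An empty done-flag tells Oozie that the existence of the instance directory alone marks the data as available.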
if (entity.getAvailabilityFlag() == null) {
inputDataset.setDoneFlag("");
} else {
inputDataset.setDoneFlag(entity.getAvailabilityFlag());
}
}
private void initializeOutputDataSet(Cluster cluster, COORDINATORAPP coord,
Storage storage) throws FalconException {
SYNCDATASET outputDataset = (SYNCDATASET) coord.getDatasets().getDatasetOrAsyncDataset().get(1);
String uriTemplate = storage.getUriTemplate(LocationType.DATA);
if (storage.getType() == Storage.TYPE.TABLE) {
uriTemplate = uriTemplate.replace("thrift", "hcat"); // Oozie expects the hcat scheme for HCatalog URIs
}
outputDataset.setUriTemplate(uriTemplate);
setDatasetValues(outputDataset, cluster);
}
private void setDatasetValues(SYNCDATASET dataset, Cluster cluster) {
dataset.setInitialInstance(SchemaHelper.formatDateUTC(
FeedHelper.getCluster(entity, cluster.getName()).getValidity().getStart()));
dataset.setTimezone(entity.getTimezone().getID());
dataset.setFrequency("${coord:" + entity.getFrequency().toString() + "}");
}
private long getReplicationDelayInMillis(Cluster srcCluster) throws FalconException {
Frequency replicationDelay = FeedHelper.getCluster(entity, srcCluster.getName()).getDelay();
long delayInMillis = 0;
if (replicationDelay != null) {
delayInMillis = ExpressionHelper.get().evaluate(
replicationDelay.toString(), Long.class);
}
return delayInMillis;
}
private Date getStartDate(Cluster cluster, long replicationDelayInMillis) {
Date startDate = FeedHelper.getCluster(entity, cluster.getName()).getValidity().getStart();
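// Push the start forward by the replication delay so instances are not materialized before delayed source data can arrive.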
return replicationDelayInMillis == 0 ? startDate : new Date(startDate.getTime() + replicationDelayInMillis);
}
private Date getEndDate(Cluster cluster) {
return FeedHelper.getCluster(entity, cluster.getName()).getValidity().getEnd();
}
private boolean noOverlapExists(Date sourceStartDate, Date sourceEndDate,
Date targetStartDate, Date targetEndDate) {
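// The windows are disjoint when either one starts after the other ends.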
return sourceStartDate.after(targetEndDate) || targetStartDate.after(sourceEndDate);
}
}