/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.oozie.feed;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.Tag;
import org.apache.falcon.cluster.util.EmbeddedCluster;
import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.oozie.OozieCoordinatorBuilder;
import org.apache.falcon.oozie.OozieEntityBuilder;
import org.apache.falcon.oozie.OozieOrchestrationWorkflowBuilder;
import org.apache.falcon.oozie.bundle.BUNDLEAPP;
import org.apache.falcon.oozie.bundle.COORDINATOR;
import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
import org.apache.falcon.oozie.coordinator.SYNCDATASET;
import org.apache.falcon.oozie.process.AbstractTestBase;
import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.CONFIGURATION;
import org.apache.falcon.oozie.workflow.JAVA;
import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.security.SecurityUtil;
import org.apache.falcon.service.LifecyclePolicyMap;
import org.apache.falcon.util.DateUtil;
import org.apache.falcon.util.RuntimeProperties;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import javax.xml.bind.JAXBException;
import java.io.IOException;
import java.util.Calendar;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Tests for the Oozie workflow definitions generated for feed replication and retention.
*/
public class OozieFeedWorkflowBuilderTest extends AbstractTestBase {
private EmbeddedCluster srcMiniDFS;
private EmbeddedCluster trgMiniDFS;
private final ConfigurationStore store = ConfigurationStore.get();
private Cluster srcCluster;
private Cluster trgCluster;
private Cluster alphaTrgCluster;
private Cluster betaTrgCluster;
private Feed feed;
private Feed tableFeed;
private Feed fsReplFeed;
private Feed lifecycleRetentionFeed;
private Feed lifecycleLocalRetentionFeed;
private Feed fsReplFeedCounter;
private static final String SRC_CLUSTER_PATH = "/feed/src-cluster.xml";
private static final String TRG_CLUSTER_PATH = "/feed/trg-cluster.xml";
private static final String FEED = "/feed/feed.xml";
private static final String TABLE_FEED = "/feed/table-replication-feed.xml";
private static final String FS_REPLICATION_FEED = "/feed/fs-replication-feed.xml";
private static final String FS_RETENTION_LIFECYCLE_FEED = "/feed/fs-retention-lifecycle-feed.xml";
private static final String FS_LOCAL_RETENTION_LIFECYCLE_FEED = "/feed/fs-local-retention-lifecycle-feed.xml";
private static final String FS_REPLICATION_FEED_COUNTER = "/feed/fs-replication-feed-counters.xml";
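
// Spins up two embedded mini-DFS clusters (source and target), authenticates the
// current user, resets the config store and registers the clusters and feeds under test.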
@BeforeClass
public void setUpDFS() throws Exception {
CurrentUser.authenticate(System.getProperty("user.name"));
srcMiniDFS = EmbeddedCluster.newCluster("cluster1");
String srcHdfsUrl = srcMiniDFS.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY);
trgMiniDFS = EmbeddedCluster.newCluster("cluster2");
String trgHdfsUrl = trgMiniDFS.getConf().get(HadoopClientFactory.FS_DEFAULT_NAME_KEY);
LifecyclePolicyMap.get().init();
cleanupStore();
org.apache.falcon.entity.v0.cluster.Property property =
new org.apache.falcon.entity.v0.cluster.Property();
property.setName(SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL);
property.setValue("hive/_HOST");
srcCluster = (Cluster) storeEntity(EntityType.CLUSTER, SRC_CLUSTER_PATH, srcHdfsUrl);
srcCluster.getProperties().getProperties().add(property);
trgCluster = (Cluster) storeEntity(EntityType.CLUSTER, TRG_CLUSTER_PATH, trgHdfsUrl);
trgCluster.getProperties().getProperties().add(property);
alphaTrgCluster = (Cluster) storeEntity(EntityType.CLUSTER, "/feed/trg-cluster-alpha.xml", trgHdfsUrl);
betaTrgCluster = (Cluster) storeEntity(EntityType.CLUSTER, "/feed/trg-cluster-beta.xml", trgHdfsUrl);
feed = (Feed) storeEntity(EntityType.FEED, FEED);
fsReplFeed = (Feed) storeEntity(EntityType.FEED, FS_REPLICATION_FEED);
fsReplFeedCounter = (Feed) storeEntity(EntityType.FEED, FS_REPLICATION_FEED_COUNTER);
tableFeed = (Feed) storeEntity(EntityType.FEED, TABLE_FEED);
lifecycleRetentionFeed = (Feed) storeEntity(EntityType.FEED, FS_RETENTION_LIFECYCLE_FEED);
lifecycleLocalRetentionFeed = (Feed) storeEntity(EntityType.FEED, FS_LOCAL_RETENTION_LIFECYCLE_FEED);
}
private Entity storeEntity(EntityType type, String resource) throws Exception {
return storeEntity(type, null, resource, null);
}
private Entity storeEntity(EntityType type, String resource, String writeUrl) throws Exception {
return storeEntity(type, null, resource, writeUrl);
}
protected void cleanupStore() throws FalconException {
for (EntityType type : EntityType.values()) {
Collection<String> entities = store.getEntities(type);
for (String entity : entities) {
store.remove(type, entity);
}
}
}
@AfterClass
public void stopDFS() {
srcMiniDFS.shutdown();
trgMiniDFS.shutdown();
}
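
// Pairs the falcon.retention.keep.instances.beyond.validity flag with the expected
// coordinator end time: "false" extends the end past the feed validity end by the
// retention limit (hours(2)), "true" stops at the validity end itself.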
@DataProvider(name = "keepInstancesPostValidity")
private Object[][] keepInstancesPostValidity() {
return new Object[][] {
{"false", "2099-01-01T02:00Z"},
{"true", "2099-01-01T00:00Z"},
};
}
@Test(dataProvider = "keepInstancesPostValidity")
public void testRetentionWithLifecycle(String keepInstancesPostValidity, String endTime) throws Exception {
RuntimeProperties.get().setProperty("falcon.retention.keep.instances.beyond.validity",
keepInstancesPostValidity);
OozieEntityBuilder builder = OozieEntityBuilder.get(lifecycleRetentionFeed);
Path bundlePath = new Path("/projects/falcon/");
builder.build(trgCluster, bundlePath);
BUNDLEAPP bundle = getBundle(trgMiniDFS.getFileSystem(), bundlePath);
List<COORDINATOR> coords = bundle.getCoordinator();
COORDINATORAPP coord = getCoordinator(trgMiniDFS, coords.get(0).getAppPath());
assertLibExtensions(coord, "retention");
HashMap<String, String> props = getCoordProperties(coord);
Assert.assertEquals(props.get("ENTITY_PATH"), bundlePath.toString() + "/RETENTION");
Assert.assertEquals(coord.getFrequency(), "${coord:hours(17)}");
Assert.assertEquals(coord.getEnd(), endTime);
Assert.assertEquals(coord.getTimezone(), "UTC");
HashMap<String, String> wfProps = getWorkflowProperties(trgMiniDFS.getFileSystem(), coord);
Assert.assertEquals(wfProps.get("feedNames"), lifecycleRetentionFeed.getName());
Assert.assertTrue(StringUtils.equals(wfProps.get("entityType"), EntityType.FEED.name()));
Assert.assertEquals(wfProps.get("userWorkflowEngine"), "falcon");
Assert.assertEquals(wfProps.get("queueName"), "retention");
Assert.assertEquals(wfProps.get("limit"), "hours(2)");
Assert.assertEquals(wfProps.get("jobPriority"), "LOW");
}
@Test
public void testLocalOnlyRetentionLifecycle() throws Exception {
OozieEntityBuilder builder = OozieEntityBuilder.get(lifecycleLocalRetentionFeed);
Path bundlePath = new Path("/projects/falcon/");
builder.build(trgCluster, bundlePath);
BUNDLEAPP bundle = getBundle(trgMiniDFS.getFileSystem(), bundlePath);
List<COORDINATOR> coords = bundle.getCoordinator();
COORDINATORAPP coord = getCoordinator(trgMiniDFS, coords.get(0).getAppPath());
assertLibExtensions(coord, "retention");
HashMap<String, String> props = getCoordProperties(coord);
Assert.assertEquals(props.get("ENTITY_PATH"), bundlePath.toString() + "/RETENTION");
Assert.assertEquals(coord.getFrequency(), "${coord:hours(12)}");
Assert.assertEquals(coord.getTimezone(), "UTC");
HashMap<String, String> wfProps = getWorkflowProperties(trgMiniDFS.getFileSystem(), coord);
Assert.assertEquals(wfProps.get("feedNames"), lifecycleLocalRetentionFeed.getName());
Assert.assertTrue(StringUtils.equals(wfProps.get("entityType"), EntityType.FEED.name()));
Assert.assertEquals(wfProps.get("userWorkflowEngine"), "falcon");
Assert.assertEquals(wfProps.get("queueName"), "local");
Assert.assertEquals(wfProps.get("limit"), "hours(4)");
Assert.assertEquals(wfProps.get("jobPriority"), "HIGH");
}
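
// The retention coordinator runs every six hours by default; feeds with a frequency
// of a day or longer are evicted once a day instead.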
@Test
public void testRetentionFrequency() throws Exception {
feed.setFrequency(new Frequency("minutes(36000)"));
buildCoordAndValidateFrequency("${coord:days(1)}");
feed.setFrequency(new Frequency("hours(2)"));
buildCoordAndValidateFrequency("${coord:hours(6)}");
feed.setFrequency(new Frequency("minutes(50)"));
buildCoordAndValidateFrequency("${coord:hours(6)}");
feed.setFrequency(new Frequency("days(1)"));
buildCoordAndValidateFrequency("${coord:days(1)}");
}
private void buildCoordAndValidateFrequency(final String expectedFrequency) throws Exception {
// retention on src cluster
OozieCoordinatorBuilder builder = OozieCoordinatorBuilder.get(feed, Tag.RETENTION);
List<Properties> srcCoords = builder.buildCoords(
srcCluster, new Path("/projects/falcon/"));
COORDINATORAPP srcCoord = getCoordinator(srcMiniDFS, srcCoords.get(0).getProperty(OozieEntityBuilder
.ENTITY_PATH));
// Assert src coord frequency
Assert.assertEquals(srcCoord.getFrequency(), expectedFrequency);
// retention on target cluster
OozieEntityBuilder entityBuilder = OozieEntityBuilder.get(feed);
Path bundlePath = new Path("/projects/falcon/");
entityBuilder.build(trgCluster, bundlePath);
BUNDLEAPP bundle = getBundle(trgMiniDFS.getFileSystem(), bundlePath);
List<COORDINATOR> coords = bundle.getCoordinator();
COORDINATORAPP tgtCoord = getCoordinator(trgMiniDFS, coords.get(0).getAppPath());
// Assert target coord frequency
Assert.assertEquals(tgtCoord.getFrequency(), expectedFrequency);
}
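
// End-to-end check of the bundle built for a filesystem-storage feed on the target
// cluster: retention coordinator lib extensions, then the replication coordinator's
// datasets, input/output events, replication, late-data and post-processing
// properties, workflow defaults and retries.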
@Test
public void testReplicationCoordsForFSStorage() throws Exception {
OozieEntityBuilder builder = OozieEntityBuilder.get(feed);
Path bundlePath = new Path("/projects/falcon/");
builder.build(trgCluster, bundlePath);
BUNDLEAPP bundle = getBundle(trgMiniDFS.getFileSystem(), bundlePath);
List<COORDINATOR> coords = bundle.getCoordinator();
//Assert retention coord
COORDINATORAPP coord = getCoordinator(trgMiniDFS, coords.get(0).getAppPath());
assertLibExtensions(coord, "retention");
//Assert replication coord
coord = getCoordinator(trgMiniDFS, coords.get(1).getAppPath());
Assert.assertEquals("2010-01-01T00:40Z", coord.getStart());
Assert.assertEquals(getWorkflowAppPath(), coord.getAction().getWorkflow().getAppPath());
Assert.assertEquals("FALCON_FEED_REPLICATION_" + feed.getName() + "_"
+ srcCluster.getName(), coord.getName());
Assert.assertEquals("${coord:minutes(20)}", coord.getFrequency());
Assert.assertEquals("2", coord.getControls().getConcurrency());
Assert.assertEquals("120", coord.getControls().getTimeout());
Assert.assertEquals("FIFO", coord.getControls().getExecution());
SYNCDATASET inputDataset = (SYNCDATASET) coord.getDatasets()
.getDatasetOrAsyncDataset().get(0);
SYNCDATASET outputDataset = (SYNCDATASET) coord.getDatasets()
.getDatasetOrAsyncDataset().get(1);
Assert.assertEquals("${coord:minutes(20)}", inputDataset.getFrequency());
Assert.assertEquals("input-dataset", inputDataset.getName());
Assert.assertEquals(
ClusterHelper.getReadOnlyStorageUrl(srcCluster)
+ "/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}",
inputDataset.getUriTemplate());
Assert.assertEquals("${coord:minutes(20)}",
outputDataset.getFrequency());
Assert.assertEquals("output-dataset", outputDataset.getName());
Assert.assertEquals(ClusterHelper.getStorageUrl(trgCluster)
+ "/examples/input-data/rawLogs/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}",
outputDataset.getUriTemplate());
String inEventName = coord.getInputEvents().getDataIn().get(0).getName();
String inEventDataset = coord.getInputEvents().getDataIn().get(0).getDataset();
String inEventInstance = coord.getInputEvents().getDataIn().get(0).getInstance().get(0);
Assert.assertEquals("input", inEventName);
Assert.assertEquals("input-dataset", inEventDataset);
Assert.assertEquals("${now(0,-40)}", inEventInstance);
String outEventInstance = coord.getOutputEvents().getDataOut().get(0).getInstance();
Assert.assertEquals("${now(0,-40)}", outEventInstance);
HashMap<String, String> props = getCoordProperties(coord);
verifyEntityProperties(trgCluster, srcCluster,
WorkflowExecutionContext.EntityOperations.REPLICATE, props);
HashMap<String, String> wfProps = getWorkflowProperties(trgMiniDFS.getFileSystem(), coord);
verifyEntityProperties(feed, trgCluster, WorkflowExecutionContext.EntityOperations.REPLICATE, wfProps);
verifyBrokerProperties(trgCluster, wfProps);
// verify the replication param that feed replicator depends on
String pathsWithPartitions = getPathsWithPartitions(srcCluster, trgCluster, feed);
Assert.assertEquals(props.get("sourceRelativePaths"), pathsWithPartitions);
Assert.assertEquals(props.get("sourceRelativePaths"), "${coord:dataIn('input')}");
Assert.assertEquals(props.get("distcpSourcePaths"), "${coord:dataIn('input')}");
Assert.assertEquals(props.get("distcpTargetPaths"), "${coord:dataOut('output')}");
Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
// verify the late data params
Assert.assertEquals(props.get("falconInputFeeds"), feed.getName());
Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_NAMES.getName()), feed.getName());
Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
Assert.assertEquals(props.get("falconInPaths"), pathsWithPartitions);
Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.FILESYSTEM.name());
// verify the post processing params
Assert.assertEquals(props.get("feedNames"), feed.getName());
Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
// verify workflow params
Assert.assertEquals(wfProps.get("userWorkflowName"), "replication-policy");
Assert.assertEquals(wfProps.get("userWorkflowVersion"), "0.6");
Assert.assertEquals(wfProps.get("userWorkflowEngine"), "falcon");
// verify default params
Assert.assertEquals(wfProps.get("queueName"), "default");
Assert.assertEquals(wfProps.get("jobPriority"), "NORMAL");
Assert.assertEquals(wfProps.get("maxMaps"), "5");
Assert.assertEquals(wfProps.get("mapBandwidth"), "100");
assertLibExtensions(coord, "replication");
WORKFLOWAPP wf = getWorkflowapp(trgMiniDFS.getFileSystem(), coord);
assertWorkflowRetries(wf);
Assert.assertFalse(Storage.TYPE.TABLE == FeedHelper.getStorageType(feed, trgCluster));
}
private void assertLibExtensions(COORDINATORAPP coord, String lifecycle) throws Exception {
assertLibExtensions(trgMiniDFS.getFileSystem(), coord, EntityType.FEED, lifecycle);
}
private COORDINATORAPP getCoordinator(EmbeddedCluster cluster, String appPath) throws Exception {
return getCoordinator(cluster.getFileSystem(),
new Path(StringUtils.removeStart(appPath, "${nameNode}")));
}
private String getWorkflowAppPath() {
return "${nameNode}/projects/falcon/REPLICATION/" + srcCluster.getName();
}
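
// Every Falcon-owned action in the workflow must carry retry-max=3 and retry-interval=1.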
private void assertWorkflowRetries(COORDINATORAPP coord) throws JAXBException, IOException {
assertWorkflowRetries(getWorkflowapp(trgMiniDFS.getFileSystem(), coord));
}
private void assertWorkflowRetries(WORKFLOWAPP wf) throws JAXBException, IOException {
List<Object> actions = wf.getDecisionOrForkOrJoin();
for (Object obj : actions) {
if (!(obj instanceof ACTION)) {
continue;
}
ACTION action = (ACTION) obj;
String actionName = action.getName();
if (OozieOrchestrationWorkflowBuilder.FALCON_ACTIONS.contains(actionName)) {
Assert.assertEquals(action.getRetryMax(), "3");
Assert.assertEquals(action.getRetryInterval(), "1");
}
}
}
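
// A feed replicated to two target clusters (alpha and beta) gets one replication
// coordinator per target, each with its own validity window and partition paths.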
@Test
public void testReplicationCoordsForFSStorageWithMultipleTargets() throws Exception {
OozieCoordinatorBuilder builder = OozieCoordinatorBuilder.get(fsReplFeed, Tag.REPLICATION);
List<Properties> alphaCoords = builder.buildCoords(alphaTrgCluster,
new Path("/alpha/falcon/"));
final COORDINATORAPP alphaCoord = getCoordinator(trgMiniDFS,
alphaCoords.get(0).getProperty(OozieEntityBuilder.ENTITY_PATH));
Assert.assertEquals(alphaCoord.getStart(), "2012-10-01T12:05Z");
Assert.assertEquals(alphaCoord.getEnd(), "2012-10-01T12:11Z");
String pathsWithPartitions = getPathsWithPartitions(srcCluster, alphaTrgCluster, fsReplFeed);
assertReplCoord(alphaCoord, fsReplFeed, alphaTrgCluster, pathsWithPartitions);
List<Properties> betaCoords = builder.buildCoords(betaTrgCluster, new Path("/beta/falcon/"));
final COORDINATORAPP betaCoord = getCoordinator(trgMiniDFS,
betaCoords.get(0).getProperty(OozieEntityBuilder.ENTITY_PATH));
Assert.assertEquals(betaCoord.getStart(), "2012-10-01T12:10Z");
Assert.assertEquals(betaCoord.getEnd(), "2012-10-01T12:26Z");
pathsWithPartitions = getPathsWithPartitions(srcCluster, betaTrgCluster, fsReplFeed);
assertReplCoord(betaCoord, fsReplFeed, betaTrgCluster, pathsWithPartitions);
}
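
// Replication for a feed with job counters enabled; assertReplCoord below accounts
// for the extra -counterLogDir arguments on the replication action.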
@Test
public void testReplicationWithCounters() throws Exception {
OozieCoordinatorBuilder builder = OozieCoordinatorBuilder.get(fsReplFeedCounter, Tag.REPLICATION);
List<Properties> alphaCoords = builder.buildCoords(alphaTrgCluster, new Path("/alpha/falcon"));
final COORDINATORAPP alphaCoord = getCoordinator(trgMiniDFS,
alphaCoords.get(0).getProperty(OozieEntityBuilder.ENTITY_PATH));
Assert.assertEquals(alphaCoord.getStart(), "2012-10-01T12:05Z");
Assert.assertEquals(alphaCoord.getEnd(), "2012-10-01T12:11Z");
String pathsWithPartitions = getPathsWithPartitions(srcCluster, alphaTrgCluster, fsReplFeedCounter);
assertReplCoord(alphaCoord, fsReplFeedCounter, alphaTrgCluster, pathsWithPartitions);
}
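
// Builds the expected source-relative path expression: ${coord:dataIn('input')}
// followed by the normalized source/target partition expressions, with duplicate
// and trailing slashes stripped.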
private String getPathsWithPartitions(Cluster sourceCluster, Cluster targetCluster,
Feed aFeed) throws FalconException {
String srcPart = FeedHelper.normalizePartitionExpression(
FeedHelper.getCluster(aFeed, sourceCluster.getName()).getPartition());
srcPart = FeedHelper.evaluateClusterExp(sourceCluster, srcPart);
String targetPart = FeedHelper.normalizePartitionExpression(
FeedHelper.getCluster(aFeed, targetCluster.getName()).getPartition());
targetPart = FeedHelper.evaluateClusterExp(targetCluster, targetPart);
String pathsWithPartitions = "${coord:dataIn('input')}/"
+ FeedHelper.normalizePartitionExpression(srcPart, targetPart);
String parts = pathsWithPartitions.replaceAll("//+", "/");
parts = StringUtils.stripEnd(parts, "/");
return parts;
}
private void assertReplCoord(COORDINATORAPP coord, Feed aFeed, Cluster aCluster,
String pathsWithPartitions) throws Exception {
org.apache.falcon.entity.v0.feed.Cluster feedCluster =
FeedHelper.getCluster(aFeed, aCluster.getName());
Date startDate = feedCluster.getValidity().getStart();
Assert.assertEquals(coord.getStart(), SchemaHelper.formatDateUTC(startDate));
Date endDate = feedCluster.getValidity().getEnd();
Assert.assertEquals(coord.getEnd(), SchemaHelper.formatDateUTC(endDate));
WORKFLOWAPP workflow = getWorkflowapp(trgMiniDFS.getFileSystem(), coord);
assertWorkflowDefinition(aFeed, workflow, false);
ACTION replicationActionNode = getAction(workflow, "replication");
JAVA replication = replicationActionNode.getJava();
List<String> args = replication.getArg();
if (args.contains("-counterLogDir")) {
Assert.assertEquals(args.size(), 17);
} else {
Assert.assertEquals(args.size(), 15);
}
HashMap<String, String> props = getCoordProperties(coord);
Assert.assertEquals(props.get("sourceRelativePaths"), pathsWithPartitions);
Assert.assertEquals(props.get("sourceRelativePaths"), "${coord:dataIn('input')}/" + srcCluster.getColo());
Assert.assertEquals(props.get("distcpSourcePaths"), "${coord:dataIn('input')}");
Assert.assertEquals(props.get("distcpTargetPaths"), "${coord:dataOut('output')}");
Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.FILESYSTEM.name());
verifyEntityProperties(aCluster, srcCluster,
WorkflowExecutionContext.EntityOperations.REPLICATE, props);
HashMap<String, String> wfProps = getWorkflowProperties(trgMiniDFS.getFileSystem(), coord);
verifyEntityProperties(aFeed, aCluster, WorkflowExecutionContext.EntityOperations.REPLICATE, wfProps);
verifyBrokerProperties(aCluster, wfProps);
Assert.assertEquals(wfProps.get("maxMaps"), "33");
Assert.assertEquals(wfProps.get("mapBandwidth"), "2");
}
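
// The replication workflow must contain pre-processing (only when late data is
// allowed), replication, both post-processing actions and, for table storage,
// the table-import/table-export actions.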
public void assertWorkflowDefinition(Feed aFeed, WORKFLOWAPP workflow, boolean isTable) {
Assert.assertEquals(EntityUtil.getWorkflowName(Tag.REPLICATION, aFeed).toString(),
workflow.getName());
boolean preProcess = RuntimeProperties.get().getProperty("feed.late.allowed", "true").equalsIgnoreCase(
"true");
if (preProcess) {
assertAction(workflow, "pre-processing", true);
}
assertAction(workflow, "replication", false);
assertAction(workflow, "succeeded-post-processing", true);
assertAction(workflow, "failed-post-processing", true);
if (isTable) {
assertAction(workflow, "table-import", false);
assertAction(workflow, "table-export", false);
}
}
@DataProvider(name = "secureOptions")
private Object[][] createOptions() {
return new Object[][] {
{"simple"},
{"kerberos"},
};
}
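
// Replication of an HCatalog-backed feed, run under both simple and kerberos
// authentication: verifies hcat dataset URIs, staged import/export scripts,
// distcp staging paths and the HCat credentials wired into the workflow.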
@Test (dataProvider = "secureOptions")
public void testReplicationCoordsForTableStorage(String secureOption) throws Exception {
StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
OozieCoordinatorBuilder builder = OozieCoordinatorBuilder.get(tableFeed, Tag.REPLICATION);
List<Properties> coords = builder.buildCoords(trgCluster, new Path("/projects/falcon/"));
COORDINATORAPP coord = getCoordinator(trgMiniDFS, coords.get(0).getProperty(OozieEntityBuilder.ENTITY_PATH));
Assert.assertEquals("2010-01-01T00:40Z", coord.getStart());
Assert.assertEquals(getWorkflowAppPath(),
coord.getAction().getWorkflow().getAppPath());
Assert.assertEquals("FALCON_FEED_REPLICATION_" + tableFeed.getName() + "_"
+ srcCluster.getName(), coord.getName());
Assert.assertEquals("${coord:minutes(20)}", coord.getFrequency());
SYNCDATASET inputDataset = (SYNCDATASET) coord.getDatasets()
.getDatasetOrAsyncDataset().get(0);
Assert.assertEquals("${coord:minutes(20)}", inputDataset.getFrequency());
Assert.assertEquals("input-dataset", inputDataset.getName());
String sourceRegistry = ClusterHelper.getInterface(srcCluster, Interfacetype.REGISTRY).getEndpoint();
sourceRegistry = sourceRegistry.replace("thrift", "hcat");
Assert.assertEquals(inputDataset.getUriTemplate(),
sourceRegistry + "/source_db/source_clicks_table/ds=${YEAR}${MONTH}${DAY};region=${region}");
SYNCDATASET outputDataset = (SYNCDATASET) coord.getDatasets()
.getDatasetOrAsyncDataset().get(1);
Assert.assertEquals(outputDataset.getFrequency(), "${coord:minutes(20)}");
Assert.assertEquals("output-dataset", outputDataset.getName());
String targetRegistry = ClusterHelper.getInterface(trgCluster, Interfacetype.REGISTRY).getEndpoint();
targetRegistry = targetRegistry.replace("thrift", "hcat");
Assert.assertEquals(outputDataset.getUriTemplate(),
targetRegistry + "/target_db/target_clicks_table/ds=${YEAR}${MONTH}${DAY};region=${region}");
String inEventName = coord.getInputEvents().getDataIn().get(0).getName();
String inEventDataset = coord.getInputEvents().getDataIn().get(0).getDataset();
String inEventInstance = coord.getInputEvents().getDataIn().get(0).getInstance().get(0);
Assert.assertEquals("input", inEventName);
Assert.assertEquals("input-dataset", inEventDataset);
Assert.assertEquals("${now(0,-40)}", inEventInstance);
String outEventInstance = coord.getOutputEvents().getDataOut().get(0).getInstance();
Assert.assertEquals("${now(0,-40)}", outEventInstance);
// assert FS staging area
Path wfPath = new Path(coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""));
final FileSystem fs = trgMiniDFS.getFileSystem();
Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts")));
Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts/falcon-table-export.hql")));
Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts/falcon-table-import.hql")));
HashMap<String, String> props = getCoordProperties(coord);
final CatalogStorage srcStorage = (CatalogStorage) FeedHelper.createStorage(srcCluster,
tableFeed);
final CatalogStorage trgStorage = (CatalogStorage) FeedHelper.createStorage(trgCluster,
tableFeed);
// verify the replication param that feed replicator depends on
Assert.assertEquals(props.get("sourceRelativePaths"), "IGNORE");
Assert.assertTrue(props.containsKey("distcpSourcePaths"));
final String distcpSourcePaths = props.get("distcpSourcePaths");
Assert.assertEquals(distcpSourcePaths,
FeedHelper.getStagingPath(true, srcCluster, tableFeed, srcStorage, Tag.REPLICATION,
"${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}" + "/" + trgCluster.getName()));
Assert.assertTrue(props.containsKey("falconSourceStagingDir"));
final String falconSourceStagingDir = props.get("falconSourceStagingDir");
Assert.assertEquals(falconSourceStagingDir,
FeedHelper.getStagingPath(false, srcCluster, tableFeed, srcStorage, Tag.REPLICATION,
"${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}" + "/" + trgCluster.getName()));
String exportPath = falconSourceStagingDir.substring(ClusterHelper.getStorageUrl(srcCluster).length());
String distCPPath = distcpSourcePaths.substring(ClusterHelper.getReadOnlyStorageUrl(srcCluster).length());
Assert.assertEquals(exportPath, distCPPath);
Assert.assertTrue(props.containsKey("distcpTargetPaths"));
Assert.assertEquals(props.get("distcpTargetPaths"),
FeedHelper.getStagingPath(false, trgCluster, tableFeed, trgStorage, Tag.REPLICATION,
"${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd-HH-mm')}" + "/" + trgCluster.getName()));
Assert.assertEquals(props.get("falconFeedStorageType"), Storage.TYPE.TABLE.name());
// verify table props
assertTableStorageProperties(srcCluster, srcStorage, props, "falconSource");
assertTableStorageProperties(trgCluster, trgStorage, props, "falconTarget");
// verify the late data params
Assert.assertEquals(props.get("falconInputFeeds"), tableFeed.getName());
Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_NAMES.getName()), tableFeed.getName());
Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
// verify the post processing params
Assert.assertEquals(props.get("feedNames"), tableFeed.getName());
Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
Assert.assertTrue(Storage.TYPE.TABLE == FeedHelper.getStorageType(tableFeed, trgCluster));
assertReplicationHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord),
wfPath.toString());
HashMap<String, String> wfProps = getWorkflowProperties(trgMiniDFS.getFileSystem(), coord);
verifyEntityProperties(tableFeed, trgCluster, WorkflowExecutionContext.EntityOperations.REPLICATE, wfProps);
verifyBrokerProperties(trgCluster, wfProps);
}
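
// With security enabled the workflow must declare two HCat credentials: the source
// credential on the recordsize and table-export actions, the target credential on
// table-import; the replication action must list both clusters in
// mapreduce.job.hdfs-servers either way.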
private void assertReplicationHCatCredentials(WORKFLOWAPP wf, String wfPath) throws IOException {
FileSystem fs = trgMiniDFS.getFileSystem();
boolean isSecurityEnabled = SecurityUtil.isSecurityEnabled();
if (isSecurityEnabled) {
Assert.assertNotNull(wf.getCredentials());
Assert.assertEquals(2, wf.getCredentials().getCredential().size());
}
List<Object> actions = wf.getDecisionOrForkOrJoin();
for (Object obj : actions) {
if (!(obj instanceof ACTION)) {
continue;
}
ACTION action = (ACTION) obj;
String actionName = action.getName();
if (!isSecurityEnabled) {
Assert.assertNull(action.getCred());
}
if ("recordsize".equals(actionName)) {
if (isSecurityEnabled) {
Assert.assertNotNull(action.getCred());
Assert.assertEquals(action.getCred(), "falconSourceHiveAuth");
}
} else if ("table-export".equals(actionName) && isSecurityEnabled) {
Assert.assertNotNull(action.getCred());
Assert.assertEquals(action.getCred(), "falconSourceHiveAuth");
} else if ("table-import".equals(actionName) && isSecurityEnabled) {
Assert.assertNotNull(action.getCred());
Assert.assertEquals(action.getCred(), "falconTargetHiveAuth");
} else if ("replication".equals(actionName)) {
List<CONFIGURATION.Property> properties =
action.getJava().getConfiguration().getProperty();
for (CONFIGURATION.Property property : properties) {
if (property.getName().equals("mapreduce.job.hdfs-servers")) {
Assert.assertEquals(property.getValue(),
ClusterHelper.getReadOnlyStorageUrl(srcCluster)
+ "," + ClusterHelper.getStorageUrl(trgCluster));
}
}
}
}
}
private void assertTableStorageProperties(Cluster cluster, CatalogStorage tableStorage,
Map<String, String> props, String prefix) {
Assert.assertEquals(props.get(prefix + "NameNode"), ClusterHelper.getStorageUrl(cluster));
Assert.assertEquals(props.get(prefix + "JobTracker"), ClusterHelper.getMREndPoint(cluster));
Assert.assertEquals(props.get(prefix + "HcatNode"), tableStorage.getCatalogUrl());
Assert.assertEquals(props.get(prefix + "Database"), tableStorage.getDatabase());
Assert.assertEquals(props.get(prefix + "Table"), tableStorage.getTable());
Assert.assertEquals(props.get(prefix + "Partition"), "${coord:dataInPartitions('input', 'hive-export')}");
}
@DataProvider(name = "uMaskOptions")
private Object[][] createUMaskOptions() {
return new Object[][] {
{"000"}, // {FsAction.ALL, FsAction.ALL, FsAction.ALL},
{"077"}, // {FsAction.ALL, FsAction.NONE, FsAction.NONE}
{"027"}, // {FsAction.ALL, FsAction.READ_EXECUTE, FsAction.NONE}
{"017"}, // {FsAction.ALL, FsAction.READ_WRITE, FsAction.NONE}
{"012"}, // {FsAction.ALL, FsAction.READ_WRITE, FsAction.READ_EXECUTE}
{"022"}, // {FsAction.ALL, FsAction.READ_EXECUTE, FsAction.READ_EXECUTE}
};
}
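
// Retention coordinator for a filesystem feed, parameterized on umask: checks the
// coordinator app path, name, frequency and window (validity end plus retention
// limit), the evictor and post-processing properties, and that the staged workflow
// files honor the configured umask.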
@Test(dataProvider = "uMaskOptions")
public void testRetentionCoords(String umask) throws Exception {
FileSystem fs = srcMiniDFS.getFileSystem();
Configuration conf = fs.getConf();
conf.set("fs.permissions.umask-mode", umask);
// ClusterHelper constructs a new FS Configuration; add the umask to the cluster properties so it gets into that conf too.
setUmaskInFsConf(srcCluster, umask);
org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(feed, srcCluster.getName());
Calendar startCal = Calendar.getInstance();
Calendar endCal = Calendar.getInstance();
endCal.add(Calendar.DATE, 1);
cluster.getValidity().setEnd(endCal.getTime());
RuntimeProperties.get().setProperty("falcon.retention.keep.instances.beyond.validity", "false");
OozieCoordinatorBuilder builder = OozieCoordinatorBuilder.get(feed, Tag.RETENTION);
List<Properties> coords = builder.buildCoords(srcCluster, new Path("/projects/falcon/" + umask));
COORDINATORAPP coord = getCoordinator(srcMiniDFS, coords.get(0).getProperty(OozieEntityBuilder.ENTITY_PATH));
Assert.assertEquals(coord.getAction().getWorkflow().getAppPath(),
"${nameNode}/projects/falcon/" + umask + "/RETENTION");
Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + feed.getName());
Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}");
Assert.assertEquals(coord.getStart(), DateUtil.getDateFormatFromTime(startCal.getTimeInMillis()));
Date endDate = DateUtils.addSeconds(endCal.getTime(),
FeedHelper.getRetentionLimitInSeconds(feed, srcCluster.getName()));
Assert.assertEquals(coord.getEnd(), DateUtil.getDateFormatFromTime(endDate.getTime()));
HashMap<String, String> props = getCoordProperties(coord);
HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
String feedDataPath = wfProps.get("feedDataPath");
String storageType = wfProps.get("falconFeedStorageType");
// verify the param that feed evictor depends on
Assert.assertEquals(storageType, Storage.TYPE.FILESYSTEM.name());
final Storage storage = FeedHelper.createStorage(cluster, feed);
if (feedDataPath != null) {
Assert.assertEquals(feedDataPath, storage.getUriTemplate()
.replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
}
if (storageType != null) {
Assert.assertEquals(storageType, storage.getType().name());
}
// verify the post processing params
Assert.assertEquals(wfProps.get("feedNames"), feed.getName());
Assert.assertEquals(wfProps.get("feedInstancePaths"), "IGNORE");
assertWorkflowRetries(getWorkflowapp(srcMiniDFS.getFileSystem(), coord));
try {
verifyClusterLocationsUMask(srcCluster, fs);
verifyWorkflowUMask(fs, coord, umask);
} finally {
cleanupWorkflowState(fs, coord);
FileSystem.closeAll();
}
}
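
// Retention for a table feed on the target cluster, under both simple and kerberos
// authentication; also verifies the staged hive-site.xml and HCat credentials.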
@Test (dataProvider = "secureOptions")
public void testRetentionCoordsForTable(String secureOption) throws Exception {
StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
final String umask = "000";
FileSystem fs = trgMiniDFS.getFileSystem();
Configuration conf = fs.getConf();
conf.set("fs.permissions.umask-mode", umask);
// ClusterHelper constructs a new FS Configuration; add the umask to the cluster properties so it gets into that conf too.
setUmaskInFsConf(trgCluster, umask);
org.apache.falcon.entity.v0.feed.Cluster cluster = FeedHelper.getCluster(tableFeed, trgCluster.getName());
final Calendar instance = Calendar.getInstance();
instance.add(Calendar.YEAR, 1);
cluster.getValidity().setEnd(instance.getTime());
OozieCoordinatorBuilder builder = OozieCoordinatorBuilder.get(tableFeed, Tag.RETENTION);
List<Properties> coords = builder.buildCoords(trgCluster, new Path("/projects/falcon/"));
COORDINATORAPP coord = getCoordinator(trgMiniDFS, coords.get(0).getProperty(OozieEntityBuilder.ENTITY_PATH));
Assert.assertEquals(coord.getAction().getWorkflow().getAppPath(), "${nameNode}/projects/falcon/RETENTION");
Assert.assertEquals(coord.getName(), "FALCON_FEED_RETENTION_" + tableFeed.getName());
Assert.assertEquals(coord.getFrequency(), "${coord:hours(6)}");
HashMap<String, String> props = getCoordProperties(coord);
HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
String feedDataPath = wfProps.get("feedDataPath");
String storageType = wfProps.get("falconFeedStorageType");
// verify the param that feed evictor depends on
Assert.assertEquals(storageType, Storage.TYPE.TABLE.name());
final Storage storage = FeedHelper.createStorage(cluster, tableFeed);
if (feedDataPath != null) {
Assert.assertEquals(feedDataPath, storage.getUriTemplate()
.replaceAll(Storage.DOLLAR_EXPR_START_REGEX, Storage.QUESTION_EXPR_START_REGEX));
}
if (storageType != null) {
Assert.assertEquals(storageType, storage.getType().name());
}
// verify the post processing params
Assert.assertEquals(wfProps.get("feedNames"), tableFeed.getName());
Assert.assertEquals(wfProps.get("feedInstancePaths"), "IGNORE");
assertWorkflowRetries(coord);
verifyBrokerProperties(srcCluster, wfProps);
verifyEntityProperties(tableFeed, trgCluster,
WorkflowExecutionContext.EntityOperations.DELETE, wfProps);
Assert.assertTrue(Storage.TYPE.TABLE == FeedHelper.getStorageType(tableFeed, trgCluster));
assertHCatCredentials(getWorkflowapp(trgMiniDFS.getFileSystem(), coord),
coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", ""));
try {
verifyClusterLocationsUMask(trgCluster, fs);
verifyWorkflowUMask(fs, coord, umask);
} finally {
cleanupWorkflowState(fs, coord);
FileSystem.closeAll();
}
}
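
// The table retention workflow must stage conf/hive-site.xml and, when security is
// enabled, bind a single falconHiveAuth credential to the eviction action.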
private void assertHCatCredentials(WORKFLOWAPP wf, String wfPath) throws IOException {
Path hiveConfPath = new Path(wfPath, "conf/hive-site.xml");
FileSystem fs = trgMiniDFS.getFileSystem();
Assert.assertTrue(fs.exists(hiveConfPath));
if (SecurityUtil.isSecurityEnabled()) {
Assert.assertNotNull(wf.getCredentials());
Assert.assertEquals(1, wf.getCredentials().getCredential().size());
}
List<Object> actions = wf.getDecisionOrForkOrJoin();
for (Object obj : actions) {
if (!(obj instanceof ACTION)) {
continue;
}
ACTION action = (ACTION) obj;
String actionName = action.getName();
if ("eviction".equals(actionName)) {
Assert.assertEquals(action.getJava().getJobXml(), "${wf:appPath()}/conf/hive-site.xml");
if (SecurityUtil.isSecurityEnabled()) {
Assert.assertNotNull(action.getCred());
Assert.assertEquals(action.getCred(), "falconHiveAuth");
}
}
}
}
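
// Cluster staging and working locations must keep their fixed permissions
// regardless of the umask in effect.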
private void verifyClusterLocationsUMask(Cluster aCluster, FileSystem fs) throws IOException {
String stagingLocation = ClusterHelper.getLocation(aCluster, ClusterLocationType.STAGING).getPath();
Path stagingPath = new Path(stagingLocation);
if (fs.exists(stagingPath)) {
FileStatus fileStatus = fs.getFileStatus(stagingPath);
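// staging location is fixed at 0777 (decimal 511)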
Assert.assertEquals(fileStatus.getPermission().toShort(), 511);
}
String workingLocation = ClusterHelper.getLocation(aCluster, ClusterLocationType.WORKING).getPath();
Path workingPath = new Path(workingLocation);
if (fs.exists(workingPath)) {
FileStatus fileStatus = fs.getFileStatus(workingPath);
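// working location is fixed at 0755 (decimal 493)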
Assert.assertEquals(fileStatus.getPermission().toShort(), 493);
}
}
private void verifyWorkflowUMask(FileSystem fs, COORDINATORAPP coord,
String defaultUMask) throws IOException {
Assert.assertEquals(fs.getConf().get("fs.permissions.umask-mode"), defaultUMask);
String appPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
Path wfPath = new Path(appPath);
FileStatus[] fileStatuses = fs.listStatus(wfPath);
for (FileStatus fileStatus : fileStatuses) {
Assert.assertEquals(fileStatus.getOwner(), CurrentUser.getProxyUGI().getShortUserName());
final FsPermission permission = fileStatus.getPermission();
if (!fileStatus.isDirectory()) {
Assert.assertEquals(permission.toString(),
HadoopClientFactory.getFileDefaultPermission(fs.getConf()).toString());
}
}
}
private void cleanupWorkflowState(FileSystem fs, COORDINATORAPP coord) throws Exception {
String appPath = coord.getAction().getWorkflow().getAppPath();
Path wfPath = new Path(appPath.replace("${nameNode}", ""));
fs.delete(wfPath, true);
}
private static void setUmaskInFsConf(Cluster cluster, String umask) {
// ClusterHelper constructs a new FS Configuration; add the umask to the cluster properties so it gets into that conf too.
org.apache.falcon.entity.v0.cluster.Property property =
new org.apache.falcon.entity.v0.cluster.Property();
property.setName("fs.permissions.umask-mode");
property.setValue(umask);
cluster.getProperties().getProperties().add(property);
}
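
// User-supplied properties flow through to the bundle, but reserved ones such as
// ENTITY_NAME cannot be overridden and are replaced with the entity's own values.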
@Test
public void testUserDefinedProperties() throws Exception {
Map<String, String> suppliedProps = new HashMap<>();
suppliedProps.put("custom.property", "custom value");
suppliedProps.put("ENTITY_NAME", "MyEntity");
OozieEntityBuilder builder = OozieEntityBuilder.get(lifecycleRetentionFeed);
Path bundlePath = new Path("/projects/falcon/");
Properties props = builder.build(trgCluster, bundlePath, suppliedProps);
Assert.assertNotNull(props);
Assert.assertEquals(props.get("ENTITY_NAME"), lifecycleRetentionFeed.getName());
Assert.assertEquals(props.get("custom.property"), "custom value");
}
}