blob: a692d0cc02824e87bf5ca52b10413146ba06bbb8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.oozie.process;
import org.apache.falcon.FalconException;
import org.apache.falcon.Tag;
import org.apache.falcon.cluster.util.EmbeddedCluster;
import org.apache.falcon.entity.CatalogStorage;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.ProcessHelper;
import org.apache.falcon.entity.Storage;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.Frequency;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.entity.v0.cluster.ClusterLocationType;
import org.apache.falcon.entity.v0.cluster.Interfacetype;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Output;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.entity.v0.process.Validity;
import org.apache.falcon.entity.v0.process.Workflow;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.oozie.OozieEntityBuilder;
import org.apache.falcon.oozie.bundle.BUNDLEAPP;
import org.apache.falcon.oozie.bundle.CONFIGURATION;
import org.apache.falcon.oozie.coordinator.COORDINATORAPP;
import org.apache.falcon.oozie.coordinator.SYNCDATASET;
import org.apache.falcon.oozie.workflow.ACTION;
import org.apache.falcon.oozie.workflow.PIG;
import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.security.SecurityUtil;
import org.apache.falcon.util.FalconTestUtil;
import org.apache.falcon.util.OozieUtils;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.WorkflowExecutionArgs;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import javax.xml.bind.JAXBElement;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
/**
* Test for the Falcon entities mapping into Oozie artifacts.
*/
public class OozieProcessWorkflowBuilderTest extends AbstractTestBase {
// Classpath locations of the entity-definition fixtures used throughout this test class.
private static final String PROCESS_XML = "/config/process/process-0.1.xml";
private static final String FEED_XML = "/config/feed/feed-0.1.xml";
private static final String CLUSTER_XML = "/config/cluster/cluster-0.1.xml";
private static final String PIG_PROCESS_XML = "/config/process/pig-process-0.1.xml";
private static final String SPARK_PROCESS_XML = "/config/process/spark-process-0.1.xml";
private String hdfsUrl; // fs.defaultFS of the embedded cluster, captured once in setUpDFS()
private FileSystem fs; // handle to the embedded DFS, (re)opened in setUp() before each test
private Cluster cluster; // the "corp" cluster entity, reloaded and re-pointed at the DFS in setUp()
/**
 * One-time class setup: authenticates the test user and starts the embedded
 * cluster, remembering its default filesystem URL for later use.
 */
@BeforeClass
public void setUpDFS() throws Exception {
    CurrentUser.authenticate(FalconTestUtil.TEST_USER_1);
    final Configuration clusterConf = EmbeddedCluster.newCluster("testCluster").getConf();
    hdfsUrl = clusterConf.get(HadoopClientFactory.FS_DEFAULT_NAME_KEY);
}
/**
 * Convenience overload that forwards to the four-argument storeEntity
 * (declared in the base class, not visible in this file) with a null
 * final argument.
 */
private void storeEntity(EntityType type, String name, String resource) throws Exception {
    storeEntity(type, name, resource, null);
}
/**
 * Per-test setup: re-registers the fixture entities (so mutations made by one
 * test, e.g. the frequency change in testBundle1, do not leak into the next),
 * points the "corp" cluster at the embedded DFS, and seeds the DFS with the
 * paths the builders expect to exist.
 */
@BeforeMethod
public void setUp() throws Exception {
    storeEntity(EntityType.CLUSTER, "corp", CLUSTER_XML);
    storeEntity(EntityType.FEED, "clicks", FEED_XML);
    storeEntity(EntityType.FEED, "impressions", FEED_XML);
    storeEntity(EntityType.FEED, "clicksummary", FEED_XML);
    storeEntity(EntityType.PROCESS, "clicksummary", PROCESS_XML);
    storeEntity(EntityType.PROCESS, "pig-process", PIG_PROCESS_XML);
    ConfigurationStore store = ConfigurationStore.get();
    cluster = store.get(EntityType.CLUSTER, "corp");
    // Attach a metastore kerberos principal and wire the cluster's write/registry
    // interfaces to the embedded DFS and a (dummy) thrift endpoint.
    org.apache.falcon.entity.v0.cluster.Property property =
            new org.apache.falcon.entity.v0.cluster.Property();
    property.setName(SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL);
    property.setValue("hive/_HOST");
    cluster.getProperties().getProperties().add(property);
    ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);
    ClusterHelper.getInterface(cluster, Interfacetype.REGISTRY).setEndpoint("thrift://localhost:49083");
    fs = new Path(hdfsUrl).getFileSystem(EmbeddedCluster.newConfiguration());
    // Seed a lib-extension jar under the cluster working dir so lib-extension
    // assertions (assertLibExtensions) have something to find.
    fs.create(new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath(),
            "libext/PROCESS/ext.jar")).close();
    Process process = store.get(EntityType.PROCESS, "clicksummary");
    Path wfpath = new Path(process.getWorkflow().getPath());
    // Use a real assertion rather than the JVM `assert` keyword, which is a
    // silent no-op unless the JVM runs with -ea; also reuse the filesystem
    // handle opened above instead of constructing a second one.
    Assert.assertTrue(fs.mkdirs(wfpath));
}
/**
 * Asserts that the generated default coordinator faithfully mirrors the given
 * process definition: name, validity window, frequency/timezone, controls,
 * input/output events, datasets, and the properties propagated to the workflow.
 * Called as a helper from other tests (no @Test annotation).
 */
public void testDefCoordMap(Process process, COORDINATORAPP coord) throws Exception {
    assertEquals("FALCON_PROCESS_DEFAULT_" + process.getName(), coord.getName());
    Validity processValidity = process.getClusters().getClusters().get(0).getValidity();
    assertEquals(SchemaHelper.formatDateUTC(processValidity.getStart()), coord.getStart());
    assertEquals(SchemaHelper.formatDateUTC(processValidity.getEnd()), coord.getEnd());
    assertEquals("${coord:" + process.getFrequency().toString() + "}", coord.getFrequency());
    assertEquals(process.getTimezone().getID(), coord.getTimezone());
    assertEquals(process.getParallel() + "", coord.getControls().getConcurrency());
    assertEquals(process.getOrder().name(), coord.getControls().getExecution());
    // Input events: one data-in per process input, with EL-wrapped start/end instances.
    assertEquals(process.getInputs().getInputs().get(0).getName(),
            coord.getInputEvents().getDataIn().get(0).getName());
    assertEquals(process.getInputs().getInputs().get(0).getName(),
            coord.getInputEvents().getDataIn().get(0).getDataset());
    assertEquals("${" + process.getInputs().getInputs().get(0).getStart() + "}",
            coord.getInputEvents().getDataIn().get(0).getStartInstance());
    assertEquals("${" + process.getInputs().getInputs().get(0).getEnd() + "}",
            coord.getInputEvents().getDataIn().get(0).getEndInstance());
    assertEquals(process.getInputs().getInputs().get(1).getName(),
            coord.getInputEvents().getDataIn().get(1).getName());
    assertEquals(process.getInputs().getInputs().get(1).getName(),
            coord.getInputEvents().getDataIn().get(1).getDataset());
    assertEquals("${" + process.getInputs().getInputs().get(1).getStart() + "}",
            coord.getInputEvents().getDataIn().get(1).getStartInstance());
    assertEquals("${" + process.getInputs().getInputs().get(1).getEnd() + "}",
            coord.getInputEvents().getDataIn().get(1).getEndInstance());
    // Output events: the primary output plus derived "stats" and "meta" data-outs.
    assertEquals(process.getOutputs().getOutputs().get(0).getName() + "stats",
            coord.getOutputEvents().getDataOut().get(1).getName());
    assertEquals(process.getOutputs().getOutputs().get(0).getName() + "meta",
            coord.getOutputEvents().getDataOut().get(2).getName());
    assertEquals(process.getOutputs().getOutputs().get(0).getName(),
            coord.getOutputEvents().getDataOut().get(0).getName());
    assertEquals("${" + process.getOutputs().getOutputs().get(0).getInstance() + "}",
            coord.getOutputEvents().getDataOut().get(0).getInstance());
    assertEquals(process.getOutputs().getOutputs().get(0).getName(),
            coord.getOutputEvents().getDataOut().get(0).getDataset());
    // 2 inputs + 1 output + the 2 derived outputs => 5 datasets.
    assertEquals(5, coord.getDatasets().getDatasetOrAsyncDataset().size());
    // Spot-check the first dataset against the first input's feed definition.
    ConfigurationStore store = ConfigurationStore.get();
    Feed feed = store.get(EntityType.FEED, process.getInputs().getInputs().get(0).getFeed());
    SYNCDATASET ds = (SYNCDATASET) coord.getDatasets().getDatasetOrAsyncDataset().get(0);
    final org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0);
    assertEquals(SchemaHelper.formatDateUTC(feedCluster.getValidity().getStart()), ds.getInitialInstance());
    assertEquals(feed.getTimezone().getID(), ds.getTimezone());
    assertEquals("${coord:" + feed.getFrequency().toString() + "}", ds.getFrequency());
    assertEquals("", ds.getDoneFlag());
    assertEquals(ds.getUriTemplate(),
            FeedHelper.createStorage(feedCluster, feed).getUriTemplate(LocationType.DATA));
    // Properties propagated into the coordinator / workflow configuration.
    HashMap<String, String> props = getCoordProperties(coord);
    HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
    assertEquals(wfProps.get("mapred.job.priority"), "LOW");
    List<Input> inputs = process.getInputs().getInputs();
    assertEquals(props.get(WorkflowExecutionArgs.INPUT_NAMES.getName()), inputs.get(0).getName() + "#" + inputs
            .get(1).getName());
    verifyEntityProperties(process, cluster,
            WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
    verifyBrokerProperties(cluster, wfProps);
    assertLibExtensions(fs, coord, EntityType.PROCESS, null);
}
/**
 * An oozie-engined process must map its user action to a sub-workflow action.
 */
@Test
public void testBundle() throws Exception {
    // Make sure the configured system lib location exists before mapping.
    final String libLocation = StartupProperties.get().getProperty("system.lib.location");
    final File libDir = new File(libLocation);
    if (!libDir.exists()) {
        Assert.assertTrue(libDir.mkdirs());
    }
    final Process clickSummary = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
    final WORKFLOWAPP parent = initializeProcessMapper(clickSummary, "12", "360");
    testParentWorkflow(clickSummary, parent);
    final ACTION userAction = getAction(parent, "user-action");
    Assert.assertNotNull(userAction.getSubWorkflow());
}
/**
 * Same parent-workflow checks as testBundle, after shrinking the process
 * frequency to 1 minute and its timeout to 15 minutes.
 */
@Test
public void testBundle1() throws Exception {
    final Process clickSummary = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
    clickSummary.setFrequency(Frequency.fromString("minutes(1)"));
    clickSummary.setTimeout(Frequency.fromString("minutes(15)"));
    final WORKFLOWAPP parent = initializeProcessMapper(clickSummary, "30", "15");
    testParentWorkflow(clickSummary, parent);
}
/**
 * A pig-engined process must map its user action to a native &lt;pig&gt; action
 * with the script, prepare/delete, params, files and (empty) archives expected
 * from the fixture.
 */
@Test
public void testPigProcessMapper() throws Exception {
    final Process pigProcess = ConfigurationStore.get().get(EntityType.PROCESS, "pig-process");
    Assert.assertEquals("pig", pigProcess.getWorkflow().getEngine().value());
    prepare(pigProcess);
    final WORKFLOWAPP parent = initializeProcessMapper(pigProcess, "12", "360");
    testParentWorkflow(pigProcess, parent);
    final ACTION userAction = getAction(parent, "user-action");
    final PIG pigAction = userAction.getPig();
    Assert.assertEquals(pigAction.getScript(), "${nameNode}/apps/pig/id.pig");
    Assert.assertNotNull(pigAction.getPrepare());
    Assert.assertEquals(1, pigAction.getPrepare().getDelete().size());
    Assert.assertFalse(pigAction.getParam().isEmpty());
    Assert.assertEquals(5, pigAction.getParam().size());
    Assert.assertEquals(Collections.EMPTY_LIST, pigAction.getArchive());
    Assert.assertTrue(pigAction.getFile().size() > 0);
}
/**
 * Data provider: each consuming test runs once per authentication type,
 * "simple" and "kerberos".
 */
@DataProvider(name = "secureOptions")
private Object[][] createOptions() {
    return new Object[][]{
        new Object[]{"simple"},
        new Object[]{"kerberos"},
    };
}
/**
 * End-to-end mapping of a hive-engined process with table-storage input and
 * output feeds: builds the bundle, checks coordinator naming/properties, the
 * generated hive action, and HCat credentials (per the active auth type).
 */
@Test (dataProvider = "secureOptions")
public void testHiveProcessMapper(String secureOption) throws Exception {
    StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
    // Publish the in/out table feeds and the hive process fixtures.
    URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
    Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
    ConfigurationStore.get().publish(EntityType.FEED, inFeed);
    resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
    Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
    ConfigurationStore.get().publish(EntityType.FEED, outFeed);
    resource = this.getClass().getResource("/config/process/hive-process.xml");
    Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
    ConfigurationStore.get().publish(EntityType.PROCESS, process);
    prepare(process);
    // Build the oozie artifacts and sanity-check the bundle/coordinator naming.
    OozieEntityBuilder builder = OozieEntityBuilder.get(process);
    Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
    builder.build(cluster, bundlePath);
    assertTrue(fs.exists(bundlePath));
    BUNDLEAPP bundle = getBundle(fs, bundlePath);
    assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
    assertEquals(1, bundle.getCoordinator().size());
    assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
            bundle.getCoordinator().get(0).getName());
    String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
    COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
    HashMap<String, String> props = getCoordProperties(coord);
    HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
    verifyEntityProperties(process, cluster,
            WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
    verifyBrokerProperties(cluster, wfProps);
    // verify table and hive props
    Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process);
    expected.putAll(ClusterHelper.getHiveProperties(cluster));
    for (Map.Entry<String, String> entry : props.entrySet()) {
        if (expected.containsKey(entry.getKey())) {
            Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
        }
    }
    // The user action must be a hive action with the fixture's script and job-xml.
    String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
    WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
    testParentWorkflow(process, parentWorkflow);
    ACTION hiveNode = getAction(parentWorkflow, "user-action");
    JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(hiveNode);
    org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
    Assert.assertEquals(hiveAction.getScript(), "${nameNode}/apps/hive/script.hql");
    Assert.assertEquals(hiveAction.getJobXml(), "${wf:appPath()}/conf/hive-site.xml");
    Assert.assertNull(hiveAction.getPrepare());
    Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive());
    Assert.assertFalse(hiveAction.getParam().isEmpty());
    Assert.assertEquals(14, hiveAction.getParam().size());
    Assert.assertTrue(Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process));
    assertHCatCredentials(parentWorkflow, wfPath);
    // Remove the locally-published process so other tests are unaffected.
    ConfigurationStore.get().remove(EntityType.PROCESS, process.getName());
}
/**
 * End-to-end mapping of a spark-sql process over table feeds: builds the
 * bundle, verifies coordinator/workflow properties, the spark action's
 * master/jar/lib, and the hive-partition EL expressions passed as arguments.
 */
@Test
public void testSparkSQLProcess() throws Exception {
    // Publish the in/out table feeds and the spark-sql process fixtures.
    URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
    Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
    ConfigurationStore.get().publish(EntityType.FEED, inFeed);
    resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
    Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
    ConfigurationStore.get().publish(EntityType.FEED, outFeed);
    resource = this.getClass().getResource("/config/process/spark-sql-process.xml");
    Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
    ConfigurationStore.get().publish(EntityType.PROCESS, process);
    prepare(process);
    // Build the oozie artifacts and sanity-check the bundle/coordinator naming.
    OozieEntityBuilder builder = OozieEntityBuilder.get(process);
    Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
    builder.build(cluster, bundlePath);
    assertTrue(fs.exists(bundlePath));
    BUNDLEAPP bundle = getBundle(fs, bundlePath);
    assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
    assertEquals(1, bundle.getCoordinator().size());
    assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
            bundle.getCoordinator().get(0).getName());
    String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
    COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
    HashMap<String, String> props = getCoordProperties(coord);
    HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
    verifyEntityProperties(process, cluster,
            WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
    verifyBrokerProperties(cluster, wfProps);
    // verify table and hive props
    Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process);
    expected.putAll(ClusterHelper.getHiveProperties(cluster));
    for (Map.Entry<String, String> entry : props.entrySet()) {
        if (expected.containsKey(entry.getKey())) {
            Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
        }
    }
    String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
    WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
    testParentWorkflow(process, parentWorkflow);
    assertEquals(process.getWorkflow().getLib(), "/resources/action/lib/falcon-examples.jar");
    // The user action must be a spark action pointing at the example jar.
    ACTION sparkNode = getAction(parentWorkflow, "user-action");
    JAXBElement<org.apache.falcon.oozie.spark.ACTION> actionJaxbElement =
            OozieUtils.unMarshalSparkAction(sparkNode);
    org.apache.falcon.oozie.spark.ACTION sparkAction = actionJaxbElement.getValue();
    assertEquals(sparkAction.getMaster(), "local");
    assertEquals(sparkAction.getJar(), "falcon-examples.jar");
    Assert.assertTrue(Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process));
    // Args 0-2 describe the input table, args 3-5 the output table (EL-wrapped).
    List<String> argsList = sparkAction.getArg();
    Input input = process.getInputs().getInputs().get(0);
    Output output = process.getOutputs().getOutputs().get(0);
    assertEquals(argsList.get(0), "${falcon_"+input.getName()+"_partition_filter_hive}");
    assertEquals(argsList.get(1), "${falcon_"+input.getName()+"_table}");
    assertEquals(argsList.get(2), "${falcon_"+input.getName()+"_database}");
    assertEquals(argsList.get(3), "${falcon_"+output.getName()+"_partitions_hive}");
    assertEquals(argsList.get(4), "${falcon_"+output.getName()+"_table}");
    assertEquals(argsList.get(5), "${falcon_"+output.getName()+"_database}");
    // Remove the locally-published process so other tests are unaffected.
    ConfigurationStore.get().remove(EntityType.PROCESS, process.getName());
}
/**
 * End-to-end mapping of a plain spark-engined process: builds the bundle,
 * verifies coordinator/workflow properties and that the user action is a
 * spark action with the configured master, jar, lib and input/output args.
 */
@Test
public void testSparkProcess() throws Exception {
    URL resource = this.getClass().getResource(SPARK_PROCESS_XML);
    Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
    ConfigurationStore.get().publish(EntityType.PROCESS, process);
    Assert.assertEquals("spark", process.getWorkflow().getEngine().value());
    prepare(process);
    // Build the oozie artifacts and sanity-check the bundle/coordinator naming.
    OozieEntityBuilder builder = OozieEntityBuilder.get(process);
    Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
    builder.build(cluster, bundlePath);
    assertTrue(fs.exists(bundlePath));
    BUNDLEAPP bundle = getBundle(fs, bundlePath);
    assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
    assertEquals(1, bundle.getCoordinator().size());
    assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
            bundle.getCoordinator().get(0).getName());
    String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
    COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
    // NOTE: the previously-fetched coordinator properties were never asserted
    // against in this test; the unused local has been dropped.
    HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
    verifyEntityProperties(process, cluster,
            WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
    verifyBrokerProperties(cluster, wfProps);
    String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
    WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
    testParentWorkflow(process, parentWorkflow);
    assertEquals(process.getWorkflow().getLib(), "/resources/action/lib/spark-wordcount.jar");
    // The user action must be a spark action pointing at the word-count jar.
    ACTION sparkNode = getAction(parentWorkflow, "user-action");
    JAXBElement<org.apache.falcon.oozie.spark.ACTION> actionJaxbElement =
            OozieUtils.unMarshalSparkAction(sparkNode);
    org.apache.falcon.oozie.spark.ACTION sparkAction = actionJaxbElement.getValue();
    assertEquals(sparkAction.getMaster(), "local");
    assertEquals(sparkAction.getJar(), "spark-wordcount.jar");
    // First and last args are the EL-wrapped input and output names.
    // (getName() already returns a String; redundant toString() calls removed.)
    List<String> argsList = sparkAction.getArg();
    Input input = process.getInputs().getInputs().get(0);
    Output output = process.getOutputs().getOutputs().get(0);
    assertEquals(argsList.get(0), "${" + input.getName() + "}");
    assertEquals(argsList.get(argsList.size() - 1), "${" + output.getName() + "}");
}
/**
 * Hive process with a filesystem input feed and a table output feed: the
 * generated hive action should carry fewer params (10) than the all-table
 * case and still receive HCat credentials when security is enabled.
 */
@Test (dataProvider = "secureOptions")
public void testHiveProcessMapperWithFSInputFeedAndTableOutputFeed(String secureOption) throws Exception {
    StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
    // Only the output feed is a table feed; the input comes from the FS-based fixture.
    URL resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
    Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
    ConfigurationStore.get().publish(EntityType.FEED, outFeed);
    resource = this.getClass().getResource("/config/process/hive-process-FSInputFeed.xml");
    Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
    ConfigurationStore.get().publish(EntityType.PROCESS, process);
    prepare(process);
    // Build the oozie artifacts and sanity-check the bundle/coordinator naming.
    OozieEntityBuilder builder = OozieEntityBuilder.get(process);
    Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
    builder.build(cluster, bundlePath);
    assertTrue(fs.exists(bundlePath));
    BUNDLEAPP bundle = getBundle(fs, bundlePath);
    assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
    assertEquals(1, bundle.getCoordinator().size());
    assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
            bundle.getCoordinator().get(0).getName());
    String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
    COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
    HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
    verifyEntityProperties(process, cluster,
            WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
    verifyBrokerProperties(cluster, wfProps);
    // The user action must be a hive action with the fixture's script and job-xml.
    String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
    WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
    testParentWorkflow(process, parentWorkflow);
    ACTION hiveNode = getAction(parentWorkflow, "user-action");
    JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(hiveNode);
    org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
    Assert.assertEquals(hiveAction.getScript(), "${nameNode}/apps/hive/script.hql");
    Assert.assertEquals(hiveAction.getJobXml(), "${wf:appPath()}/conf/hive-site.xml");
    Assert.assertNull(hiveAction.getPrepare());
    Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive());
    Assert.assertFalse(hiveAction.getParam().isEmpty());
    Assert.assertEquals(10, hiveAction.getParam().size());
    Assert.assertTrue(Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process));
    assertHCatCredentials(parentWorkflow, wfPath);
    // Remove the locally-published process so other tests are unaffected.
    ConfigurationStore.get().remove(EntityType.PROCESS, process.getName());
}
/**
 * Hive process with a table input feed and a filesystem output feed: unlike
 * the table-output cases, the hive action here carries a prepare block (for
 * the FS output) and only 6 params.
 */
@Test (dataProvider = "secureOptions")
public void testHiveProcessMapperWithTableInputFeedAndFSOutputFeed(String secureOption) throws Exception {
    StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
    // Only the input feed is a table feed; the output is FS-based per the fixture.
    URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
    Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
    ConfigurationStore.get().publish(EntityType.FEED, inFeed);
    resource = this.getClass().getResource("/config/process/hive-process-FSOutputFeed.xml");
    Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
    ConfigurationStore.get().publish(EntityType.PROCESS, process);
    prepare(process);
    // Build the oozie artifacts and sanity-check the bundle/coordinator naming.
    OozieEntityBuilder builder = OozieEntityBuilder.get(process);
    Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
    builder.build(cluster, bundlePath);
    assertTrue(fs.exists(bundlePath));
    BUNDLEAPP bundle = getBundle(fs, bundlePath);
    assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
    assertEquals(1, bundle.getCoordinator().size());
    assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
            bundle.getCoordinator().get(0).getName());
    String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
    COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
    HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
    verifyEntityProperties(process, cluster,
            WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
    verifyBrokerProperties(cluster, wfProps);
    // The user action must be a hive action with the fixture's script and job-xml.
    String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
    WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
    testParentWorkflow(process, parentWorkflow);
    ACTION hiveNode = getAction(parentWorkflow, "user-action");
    JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(hiveNode);
    org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
    Assert.assertEquals(hiveAction.getScript(), "${nameNode}/apps/hive/script.hql");
    Assert.assertEquals(hiveAction.getJobXml(), "${wf:appPath()}/conf/hive-site.xml");
    Assert.assertNotNull(hiveAction.getPrepare());
    Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive());
    Assert.assertFalse(hiveAction.getParam().isEmpty());
    Assert.assertEquals(6, hiveAction.getParam().size());
    Assert.assertTrue(Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process));
    assertHCatCredentials(parentWorkflow, wfPath);
    // Remove the locally-published process so other tests are unaffected.
    ConfigurationStore.get().remove(EntityType.PROCESS, process.getName());
}
/**
 * Hive process declaring no inputs and no outputs ("dumb" process): the hive
 * action should still be generated with the script/job-xml but with no
 * prepare block and an empty param list.
 */
@Test (dataProvider = "secureOptions")
public void testHiveProcessWithNoInputsAndOutputs(String secureOption) throws Exception {
    StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
    URL resource = this.getClass().getResource("/config/process/dumb-hive-process.xml");
    Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
    ConfigurationStore.get().publish(EntityType.PROCESS, process);
    prepare(process);
    // Build the oozie artifacts and sanity-check the bundle/coordinator naming.
    OozieEntityBuilder builder = OozieEntityBuilder.get(process);
    Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
    builder.build(cluster, bundlePath);
    assertTrue(fs.exists(bundlePath));
    BUNDLEAPP bundle = getBundle(fs, bundlePath);
    assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
    assertEquals(1, bundle.getCoordinator().size());
    assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
            bundle.getCoordinator().get(0).getName());
    String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
    COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
    HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
    verifyEntityProperties(process, cluster,
            WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
    verifyBrokerProperties(cluster, wfProps);
    // The user action must be a hive action; with no inputs/outputs there are
    // no params and no prepare block.
    String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
    WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
    testParentWorkflow(process, parentWorkflow);
    ACTION hiveNode = getAction(parentWorkflow, "user-action");
    JAXBElement<org.apache.falcon.oozie.hive.ACTION> actionJaxbElement = OozieUtils.unMarshalHiveAction(
            hiveNode);
    org.apache.falcon.oozie.hive.ACTION hiveAction = actionJaxbElement.getValue();
    Assert.assertEquals(hiveAction.getScript(), "${nameNode}/apps/hive/script.hql");
    Assert.assertEquals(hiveAction.getJobXml(), "${wf:appPath()}/conf/hive-site.xml");
    Assert.assertNull(hiveAction.getPrepare());
    Assert.assertEquals(Collections.EMPTY_LIST, hiveAction.getArchive());
    Assert.assertTrue(hiveAction.getParam().isEmpty());
    // Remove the locally-published process so other tests are unaffected.
    ConfigurationStore.get().remove(EntityType.PROCESS, process.getName());
}
/**
 * Asserts HCat-credential wiring in the parent workflow: the staged
 * hive-site.xml must exist, and when security is enabled the workflow must
 * declare exactly one credential that is referenced by the known hive/pig/
 * oozie/recordsize actions. When security is disabled, actions must carry no
 * credential reference.
 */
private void assertHCatCredentials(WORKFLOWAPP wf, String wfPath) throws IOException {
    Path hiveConfPath = new Path(new Path(wfPath), "conf/hive-site.xml");
    Assert.assertTrue(fs.exists(hiveConfPath));
    if (SecurityUtil.isSecurityEnabled()) {
        Assert.assertNotNull(wf.getCredentials());
        Assert.assertEquals(1, wf.getCredentials().getCredential().size());
    }
    List<Object> actions = wf.getDecisionOrForkOrJoin();
    for (Object obj : actions) {
        if (!(obj instanceof ACTION)) {
            continue;
        }
        ACTION action = (ACTION) obj;
        if (!SecurityUtil.isSecurityEnabled()) {
            // NOTE(review): this `return` means only the FIRST action found is
            // checked for a null credential in the insecure case — confirm
            // whether checking all actions was intended.
            Assert.assertNull(action.getCred());
            return;
        }
        String actionName = action.getName();
        // Only these action names are expected to reference the HCat credential.
        if ("user-hive-job".equals(actionName) || "user-pig-job".equals(actionName)
                || "user-oozie-workflow".equals(actionName) || "recordsize".equals(actionName)) {
            Assert.assertNotNull(action.getCred());
            Assert.assertEquals(action.getCred(), "falconHiveAuth");
        }
    }
}
/**
 * Creates an empty file on the test DFS at the process' workflow path,
 * creating parent directories first.
 */
private void prepare(Process process) throws IOException {
    final Path workflowFile = new Path(process.getWorkflow().getPath());
    fs.mkdirs(workflowFile.getParent());
    fs.create(workflowFile).close();
}
/**
 * Pig process over table-storage feeds: verifies the propagated table
 * properties, the late-data and post-processing coordinator params, and the
 * HCat credential wiring of the parent workflow.
 */
@Test (dataProvider = "secureOptions")
public void testProcessMapperForTableStorage(String secureOption) throws Exception {
    StartupProperties.get().setProperty(SecurityUtil.AUTHENTICATION_TYPE, secureOption);
    // Publish the in/out table feeds and the table-backed pig process fixtures.
    URL resource = this.getClass().getResource("/config/feed/hive-table-feed.xml");
    Feed inFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
    ConfigurationStore.get().publish(EntityType.FEED, inFeed);
    resource = this.getClass().getResource("/config/feed/hive-table-feed-out.xml");
    Feed outFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(resource);
    ConfigurationStore.get().publish(EntityType.FEED, outFeed);
    resource = this.getClass().getResource("/config/process/pig-process-table.xml");
    Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(resource);
    ConfigurationStore.get().publish(EntityType.PROCESS, process);
    // Build the oozie artifacts and sanity-check the bundle/coordinator naming.
    OozieEntityBuilder builder = OozieEntityBuilder.get(process);
    Path bundlePath = new Path("/falcon/staging/workflows", process.getName());
    builder.build(cluster, bundlePath);
    assertTrue(fs.exists(bundlePath));
    BUNDLEAPP bundle = getBundle(fs, bundlePath);
    assertEquals(EntityUtil.getWorkflowName(process).toString(), bundle.getName());
    assertEquals(1, bundle.getCoordinator().size());
    assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
            bundle.getCoordinator().get(0).getName());
    String coordPath = bundle.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
    COORDINATORAPP coord = getCoordinator(fs, new Path(coordPath));
    HashMap<String, String> props = getCoordProperties(coord);
    // verify table props
    Map<String, String> expected = getExpectedProperties(inFeed, outFeed, process);
    for (Map.Entry<String, String> entry : props.entrySet()) {
        if (expected.containsKey(entry.getKey())) {
            Assert.assertEquals(entry.getValue(), expected.get(entry.getKey()));
        }
    }
    HashMap<String, String> wfProps = getWorkflowProperties(fs, coord);
    verifyEntityProperties(process, cluster,
            WorkflowExecutionContext.EntityOperations.GENERATE, wfProps);
    verifyBrokerProperties(cluster, wfProps);
    // verify the late data params
    Assert.assertEquals(props.get("falconInputFeeds"), process.getInputs().getInputs().get(0).getFeed());
    Assert.assertEquals(props.get("falconInPaths"), "${coord:dataIn('input')}");
    Assert.assertEquals(props.get("falconInputFeedStorageTypes"), Storage.TYPE.TABLE.name());
    Assert.assertEquals(props.get(WorkflowExecutionArgs.INPUT_NAMES.getName()),
            process.getInputs().getInputs().get(0).getName());
    // verify the post processing params
    Assert.assertEquals(props.get("feedNames"), process.getOutputs().getOutputs().get(0).getFeed());
    Assert.assertEquals(props.get("feedInstancePaths"), "${coord:dataOut('output')}");
    // Table storage implies HCat credentials in the parent workflow.
    String wfPath = coord.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
    WORKFLOWAPP parentWorkflow = getWorkflowapp(fs, new Path(wfPath, "workflow.xml"));
    Assert.assertTrue(Storage.TYPE.TABLE == ProcessHelper.getStorageType(cluster, process));
    assertHCatCredentials(parentWorkflow, wfPath);
}
/**
 * Builds the map of table-storage properties the coordinator is expected to
 * propagate for the given process: one property set per input (storage derived
 * from {@code inFeed}) and one per output (storage derived from {@code outFeed}).
 */
private Map<String, String> getExpectedProperties(Feed inFeed, Feed outFeed,
Process process) throws FalconException {
    Map<String, String> expectedProps = new HashMap<String, String>();
    if (process.getInputs() != null) {
        for (Input in : process.getInputs().getInputs()) {
            propagateStorageProperties(in.getName(),
                    (CatalogStorage) FeedHelper.createStorage(cluster, inFeed), expectedProps);
        }
    }
    if (process.getOutputs() != null) {
        for (Output out : process.getOutputs().getOutputs()) {
            propagateStorageProperties(out.getName(),
                    (CatalogStorage) FeedHelper.createStorage(cluster, outFeed), expectedProps);
        }
    }
    return expectedProps;
}
/**
 * Adds the expected table-storage properties for one input/output connection.
 * NOTE(review): despite its name, {@code feedName} is the input/output name
 * from the process definition (see the callers passing {@code getName()}),
 * not the feed entity name.
 */
private void propagateStorageProperties(String feedName, CatalogStorage tableStorage,
Map<String, String> props) {
    final String keyPrefix = "falcon_" + feedName;
    props.put(keyPrefix + "_storage_type", tableStorage.getType().name());
    props.put(keyPrefix + "_catalog_url", tableStorage.getCatalogUrl());
    props.put(keyPrefix + "_database", tableStorage.getDatabase());
    props.put(keyPrefix + "_table", tableStorage.getTable());

    // Partition filters only exist for the conventional 'input'/'output' names.
    if ("falcon_input".equals(keyPrefix)) {
        for (String lang : new String[]{"pig", "hive", "java"}) {
            props.put(keyPrefix + "_partition_filter_" + lang,
                    "${coord:dataInPartitionFilter('input', '" + lang + "')}");
        }
    } else if ("falcon_output".equals(keyPrefix)) {
        props.put(keyPrefix + "_dataout_partitions", "${coord:dataOutPartitions('output')}");
    }
}
@Test
public void testProcessWorkflowMapper() throws Exception {
    // The stored "clicksummary" process must retain its configured workflow
    // name and version unchanged.
    Process clickSummary = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
    Workflow wf = clickSummary.getWorkflow();
    Assert.assertEquals("test", wf.getName());
    Assert.assertEquals("1.0.0", wf.getVersion());
}
/**
 * Builds the oozie artifacts for {@code process} on the test cluster,
 * verifies the bundle/coordinator wiring (names, oozie.libpath, throttle and
 * timeout controls), and returns the parent workflow app for further checks.
 */
private WORKFLOWAPP initializeProcessMapper(Process process, String throttle, String timeout)
throws Exception {
    OozieEntityBuilder entityBuilder = OozieEntityBuilder.get(process);
    Path stagingPath = new Path("/falcon/staging/workflows", process.getName());
    entityBuilder.build(cluster, stagingPath);
    assertTrue(fs.exists(stagingPath));

    BUNDLEAPP bundleApp = getBundle(fs, stagingPath);
    assertEquals(EntityUtil.getWorkflowName(process).toString(), bundleApp.getName());
    assertEquals(1, bundleApp.getCoordinator().size());
    assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(),
            bundleApp.getCoordinator().get(0).getName());
    String coordAppPath = bundleApp.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");

    // The coordinator config must point oozie.libpath at the process workflow lib.
    List<CONFIGURATION.Property> coordConfig =
            bundleApp.getCoordinator().get(0).getConfiguration().getProperty();
    for (CONFIGURATION.Property property : coordConfig) {
        if ("oozie.libpath".equals(property.getName())) {
            Assert.assertEquals(property.getValue().replace("${nameNode}", ""),
                    process.getWorkflow().getLib());
        }
    }

    COORDINATORAPP coordApp = getCoordinator(fs, new Path(coordAppPath));
    testDefCoordMap(process, coordApp);
    assertEquals(coordApp.getControls().getThrottle(), throttle);
    assertEquals(coordApp.getControls().getTimeout(), timeout);

    String workflowAppPath = coordApp.getAction().getWorkflow().getAppPath().replace("${nameNode}", "");
    return getWorkflowapp(fs, new Path(workflowAppPath, "workflow.xml"));
}
/**
 * Asserts the parent workflow carries the standard falcon actions: both
 * post-processing hooks, the user action, and — only when late data handling
 * is configured on the process — the pre-processing action.
 */
public void testParentWorkflow(Process process, WORKFLOWAPP parentWorkflow) {
    Assert.assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, process).toString(), parentWorkflow.getName());

    boolean lateDataConfigured = process.getLateProcess() != null;
    if (lateDataConfigured) {
        assertAction(parentWorkflow, "pre-processing", true);
    }
    for (String actionName : new String[]{"succeeded-post-processing", "failed-post-processing"}) {
        assertAction(parentWorkflow, actionName, true);
    }
    assertAction(parentWorkflow, "user-action", false);
}
@AfterMethod
public void cleanup() throws Exception {
    // Purge entities registered during each test so tests stay independent.
    cleanupStore();
}
@Test
public void testProcessWithNoInputsAndOutputs() throws Exception {
    ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);

    URL processUrl = this.getClass().getResource("/config/process/dumb-process.xml");
    Process dumbProcess = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(processUrl);
    ConfigurationStore.get().publish(EntityType.PROCESS, dumbProcess);

    Path stagingPath = new Path("/falcon/staging/workflows", dumbProcess.getName());
    OozieEntityBuilder.get(dumbProcess).build(cluster, stagingPath);
    assertTrue(fs.exists(stagingPath));

    BUNDLEAPP bundleApp = getBundle(fs, stagingPath);
    assertEquals(EntityUtil.getWorkflowName(dumbProcess).toString(), bundleApp.getName());
    assertEquals(1, bundleApp.getCoordinator().size());
    assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, dumbProcess).toString(),
            bundleApp.getCoordinator().get(0).getName());

    String coordAppPath = bundleApp.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
    COORDINATORAPP coordApp = getCoordinator(fs, new Path(coordAppPath));
    HashMap<String, String> coordProps = getCoordProperties(coordApp);
    HashMap<String, String> workflowProps = getWorkflowProperties(fs, coordApp);
    verifyEntityProperties(dumbProcess, cluster,
            WorkflowExecutionContext.EntityOperations.GENERATE, workflowProps);
    verifyBrokerProperties(cluster, workflowProps);

    // Even for a process with no inputs/outputs, the feed-related workflow
    // args must still be defined on the coordinator.
    String[] requiredArgs = {
        WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(),
        WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(),
        WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(),
        WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(),
        WorkflowExecutionArgs.INPUT_NAMES.getName(),
    };
    for (String requiredArg : requiredArgs) {
        Assert.assertTrue(coordProps.containsKey(requiredArg), "expected property missing: " + requiredArg);
    }
}
@Test
public void testProcessWithInputsNoOutputs() throws Exception {
    ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);

    URL processUrl = this.getClass().getResource("/config/process/process-no-outputs.xml");
    Process noOutputsProcess = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(processUrl);
    ConfigurationStore.get().publish(EntityType.PROCESS, noOutputsProcess);

    Path stagingPath = new Path("/falcon/staging/workflows", noOutputsProcess.getName());
    OozieEntityBuilder.get(noOutputsProcess).build(cluster, stagingPath);
    assertTrue(fs.exists(stagingPath));

    BUNDLEAPP bundleApp = getBundle(fs, stagingPath);
    assertEquals(EntityUtil.getWorkflowName(noOutputsProcess).toString(), bundleApp.getName());
    assertEquals(1, bundleApp.getCoordinator().size());
    assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, noOutputsProcess).toString(),
            bundleApp.getCoordinator().get(0).getName());

    String coordAppPath = bundleApp.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
    COORDINATORAPP coordApp = getCoordinator(fs, new Path(coordAppPath));
    HashMap<String, String> coordProps = getCoordProperties(coordApp);
    HashMap<String, String> workflowProps = getWorkflowProperties(fs, coordApp);
    verifyEntityProperties(noOutputsProcess, cluster,
            WorkflowExecutionContext.EntityOperations.GENERATE, workflowProps);
    verifyBrokerProperties(cluster, workflowProps);

    // Inputs resolve to the feed name; missing outputs are marked NONE.
    Assert.assertEquals(coordProps.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()), "clicks");
    Assert.assertEquals(coordProps.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), "NONE");
}
@Test
public void testProcessNoInputsWithOutputs() throws Exception {
    ClusterHelper.getInterface(cluster, Interfacetype.WRITE).setEndpoint(hdfsUrl);

    URL processUrl = this.getClass().getResource("/config/process/process-no-inputs.xml");
    Process noInputsProcess = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(processUrl);
    ConfigurationStore.get().publish(EntityType.PROCESS, noInputsProcess);

    Path stagingPath = new Path("/falcon/staging/workflows", noInputsProcess.getName());
    OozieEntityBuilder.get(noInputsProcess).build(cluster, stagingPath);
    assertTrue(fs.exists(stagingPath));

    BUNDLEAPP bundleApp = getBundle(fs, stagingPath);
    assertEquals(EntityUtil.getWorkflowName(noInputsProcess).toString(), bundleApp.getName());
    assertEquals(1, bundleApp.getCoordinator().size());
    assertEquals(EntityUtil.getWorkflowName(Tag.DEFAULT, noInputsProcess).toString(),
            bundleApp.getCoordinator().get(0).getName());

    String coordAppPath = bundleApp.getCoordinator().get(0).getAppPath().replace("${nameNode}", "");
    COORDINATORAPP coordApp = getCoordinator(fs, new Path(coordAppPath));
    HashMap<String, String> coordProps = getCoordProperties(coordApp);
    HashMap<String, String> workflowProps = getWorkflowProperties(fs, coordApp);
    verifyEntityProperties(noInputsProcess, cluster,
            WorkflowExecutionContext.EntityOperations.GENERATE, workflowProps);
    verifyBrokerProperties(cluster, workflowProps);

    // Outputs resolve to the feed name; missing inputs are marked NONE.
    Assert.assertEquals(coordProps.get(WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName()), "impressions");
    Assert.assertEquals(coordProps.get(WorkflowExecutionArgs.INPUT_FEED_NAMES.getName()), "NONE");
}
@Test
public void testUserDefinedProperties() throws Exception {
    Map<String, String> userProps = new HashMap<>();
    userProps.put("custom.property", "custom value");
    // Deliberately collides with a falcon-managed property name.
    userProps.put("ENTITY_NAME", "MyEntity");

    Process process = ConfigurationStore.get().get(EntityType.PROCESS, "clicksummary");
    OozieEntityBuilder entityBuilder = OozieEntityBuilder.get(process);
    Properties builtProps = entityBuilder.build(cluster, new Path("/projects/falcon/"), userProps);

    Assert.assertNotNull(builtProps);
    // The builder's own ENTITY_NAME wins over the user-supplied value...
    Assert.assertEquals(builtProps.get("ENTITY_NAME"), process.getName());
    // ...while genuinely custom properties pass through untouched.
    Assert.assertEquals(builtProps.get("custom.property"), "custom value");
}
}