blob: a45633bde709f53ec718561afb2dd4c5fdeb12de [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.workflow;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.process.EngineType;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
/**
* A test for WorkflowExecutionContext.
*/
public class WorkflowExecutionContextTest {
private static final String FALCON_USER = "falcon-user";
private static final String LOGS_DIR = "target/log";
private static final String NOMINAL_TIME = "2014-01-01-01-00";
private static final String OPERATION = "GENERATE";
private static final String CLUSTER_NAME = "primary-cluster";
private static final String ENTITY_NAME = "sample-process";
private static final String WORKFLOW_NAME = "imp-click-join-workflow";
private static final String WORKFLOW_VERSION = "1.0.9";
private static final String INPUT_FEED_NAMES = "impression-feed#clicks-feed";
private static final String INPUT_INSTANCE_PATHS =
"jail://global:00/falcon/impression-feed/2014/01/01,jail://global:00/falcon/impression-feed/2014/01/02"
+ "#jail://global:00/falcon/clicks-feed/2014-01-01";
private static final String OUTPUT_FEED_NAMES = "imp-click-join1,imp-click-join2";
private static final String OUTPUT_INSTANCE_PATHS =
"jail://global:00/falcon/imp-click-join1/20140101,jail://global:00/falcon/imp-click-join2/20140101";
private static final String BROKER = "org.apache.activemq.ActiveMQConnectionFactory";
private static final String ISO8601_TIME = SchemaHelper.formatDateUTCToISO8601(
NOMINAL_TIME, WorkflowExecutionContext.INSTANCE_FORMAT);
private WorkflowExecutionContext context;
@BeforeMethod
public void setUp() throws Exception {
context = WorkflowExecutionContext.create(getTestMessageArgs(),
WorkflowExecutionContext.Type.POST_PROCESSING);
}
@Test
public void testGetValue() throws Exception {
Assert.assertEquals(context.getValue(WorkflowExecutionArgs.ENTITY_NAME), ENTITY_NAME);
}
@Test
public void testGetValueWithDefault() throws Exception {
Assert.assertEquals(context.getValue(WorkflowExecutionArgs.TOPIC_NAME, "ABSENT"), "ABSENT");
}
@Test
public void testContainsKey() throws Exception {
Assert.assertTrue(context.containsKey(WorkflowExecutionArgs.ENTITY_NAME));
Assert.assertFalse(context.containsKey(WorkflowExecutionArgs.TOPIC_NAME));
}
@Test
public void testEntrySet() throws Exception {
Assert.assertNotNull(context.entrySet());
}
@Test
public void testHasWorkflowSucceeded() throws Exception {
Assert.assertTrue(context.hasWorkflowSucceeded());
}
@Test
public void testHasWorkflowFailed() throws Exception {
Assert.assertFalse(context.hasWorkflowFailed());
}
@Test
public void testGetContextFile() throws Exception {
Assert.assertEquals(context.getContextFile(),
WorkflowExecutionContext.getFilePath(context.getLogDir(), context.getEntityName()));
}
@Test
public void testGetLogDir() throws Exception {
Assert.assertEquals(context.getLogDir(), LOGS_DIR);
}
@Test
public void testGetLogFile() throws Exception {
Assert.assertEquals(context.getLogFile(), LOGS_DIR + "/log.txt");
}
@Test
public void testGetNominalTime() throws Exception {
Assert.assertEquals(context.getNominalTime(), NOMINAL_TIME);
}
@Test
public void testGetNominalTimeAsISO8601() throws Exception {
Assert.assertEquals(context.getNominalTimeAsISO8601(), ISO8601_TIME);
}
@Test
public void testGetTimestamp() throws Exception {
Assert.assertEquals(context.getTimestamp(), NOMINAL_TIME);
}
@Test
public void testGetTimeStampAsISO8601() throws Exception {
Assert.assertEquals(context.getTimeStampAsISO8601(), ISO8601_TIME);
}
@Test
public void testGetClusterName() throws Exception {
Assert.assertEquals(context.getClusterName(), CLUSTER_NAME);
}
@Test
public void testGetEntityName() throws Exception {
Assert.assertEquals(context.getEntityName(), ENTITY_NAME);
}
@Test
public void testGetEntityType() throws Exception {
Assert.assertEquals(context.getEntityType(), "process");
}
@Test
public void testGetOperation() throws Exception {
Assert.assertEquals(context.getOperation().name(), OPERATION);
}
@Test
public void testGetOutputFeedNames() throws Exception {
Assert.assertEquals(context.getOutputFeedNames(), OUTPUT_FEED_NAMES);
}
@Test
public void testGetOutputFeedNamesList() throws Exception {
Assert.assertEquals(context.getOutputFeedNamesList(),
OUTPUT_FEED_NAMES.split(WorkflowExecutionContext.OUTPUT_FEED_SEPARATOR));
}
@Test
public void testGetOutputFeedInstancePaths() throws Exception {
Assert.assertEquals(context.getOutputFeedInstancePaths(), OUTPUT_INSTANCE_PATHS);
}
@Test
public void testGetOutputFeedInstancePathsList() throws Exception {
Assert.assertEquals(context.getOutputFeedInstancePathsList(),
OUTPUT_INSTANCE_PATHS.split(","));
}
@Test
public void testGetInputFeedNames() throws Exception {
Assert.assertEquals(context.getOutputFeedNames(), OUTPUT_FEED_NAMES);
}
@Test
public void testGetInputFeedNamesList() throws Exception {
Assert.assertEquals(context.getInputFeedNamesList(),
INPUT_FEED_NAMES.split(WorkflowExecutionContext.INPUT_FEED_SEPARATOR));
}
@Test
public void testGetInputFeedInstancePaths() throws Exception {
Assert.assertEquals(context.getInputFeedInstancePaths(), INPUT_INSTANCE_PATHS);
}
@Test
public void testGetInputFeedInstancePathsList() throws Exception {
Assert.assertEquals(context.getInputFeedInstancePathsList(),
INPUT_INSTANCE_PATHS.split("#"));
}
@Test
public void testGetWorkflowEngineUrl() throws Exception {
Assert.assertEquals(context.getWorkflowEngineUrl(), "http://localhost:11000/oozie");
}
@Test
public void testGetUserWorkflowEngine() throws Exception {
Assert.assertEquals(context.getUserWorkflowEngine(), EngineType.PIG.name());
}
@Test
public void testGetUserWorkflowVersion() throws Exception {
Assert.assertEquals(context.getUserWorkflowVersion(), WORKFLOW_VERSION);
}
@Test
public void testGetWorkflowId() throws Exception {
Assert.assertEquals(context.getWorkflowId(), "workflow-01-00");
}
@Test
public void testGetUserSubflowId() throws Exception {
Assert.assertEquals(context.getUserSubflowId(), "userflow@wf-id");
}
@Test
public void testGetWorkflowRunId() throws Exception {
Assert.assertEquals(context.getWorkflowRunId(), 1);
}
@Test
public void testGetWorkflowRunIdString() throws Exception {
Assert.assertEquals(context.getWorkflowRunIdString(), "1");
}
@Test
public void testGetWorkflowUser() throws Exception {
Assert.assertEquals(context.getWorkflowUser(), FALCON_USER);
}
@Test
public void testGetExecutionCompletionTime() throws Exception {
Assert.assertNotNull(context.getExecutionCompletionTime());
}
@Test
public void testSerializeDeserialize() throws Exception {
String contextFile = context.getContextFile();
context.serialize();
WorkflowExecutionContext newContext = WorkflowExecutionContext.deSerialize(contextFile);
Assert.assertNotNull(newContext);
Assert.assertEquals(newContext.entrySet().size(), context.entrySet().size());
}
@Test
public void testSerializeDeserializeWithFile() throws Exception {
String contextFile = "/tmp/blah.json";
context.serialize(contextFile);
WorkflowExecutionContext newContext = WorkflowExecutionContext.deSerialize(contextFile);
Assert.assertNotNull(newContext);
Assert.assertEquals(newContext.entrySet().size(), context.entrySet().size());
}
@Test
public void testGetFilePath() throws Exception {
Assert.assertEquals(WorkflowExecutionContext.getFilePath(LOGS_DIR, ENTITY_NAME),
LOGS_DIR + "/" + ENTITY_NAME + "-wf-post-exec-context.json");
}
private static String[] getTestMessageArgs() {
return new String[]{
"-" + WorkflowExecutionArgs.CLUSTER_NAME.getName(), CLUSTER_NAME,
"-" + WorkflowExecutionArgs.ENTITY_TYPE.getName(), "process",
"-" + WorkflowExecutionArgs.ENTITY_NAME.getName(), ENTITY_NAME,
"-" + WorkflowExecutionArgs.NOMINAL_TIME.getName(), NOMINAL_TIME,
"-" + WorkflowExecutionArgs.OPERATION.getName(), OPERATION,
"-" + WorkflowExecutionArgs.INPUT_FEED_NAMES.getName(), INPUT_FEED_NAMES,
"-" + WorkflowExecutionArgs.INPUT_FEED_PATHS.getName(), INPUT_INSTANCE_PATHS,
"-" + WorkflowExecutionArgs.OUTPUT_FEED_NAMES.getName(), OUTPUT_FEED_NAMES,
"-" + WorkflowExecutionArgs.OUTPUT_FEED_PATHS.getName(), OUTPUT_INSTANCE_PATHS,
"-" + WorkflowExecutionArgs.WORKFLOW_ID.getName(), "workflow-01-00",
"-" + WorkflowExecutionArgs.WORKFLOW_USER.getName(), FALCON_USER,
"-" + WorkflowExecutionArgs.RUN_ID.getName(), "1",
"-" + WorkflowExecutionArgs.STATUS.getName(), "SUCCEEDED",
"-" + WorkflowExecutionArgs.TIMESTAMP.getName(), NOMINAL_TIME,
"-" + WorkflowExecutionArgs.WF_ENGINE_URL.getName(), "http://localhost:11000/oozie",
"-" + WorkflowExecutionArgs.USER_SUBFLOW_ID.getName(), "userflow@wf-id",
"-" + WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), WORKFLOW_NAME,
"-" + WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(), WORKFLOW_VERSION,
"-" + WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), EngineType.PIG.name(),
"-" + WorkflowExecutionArgs.BRKR_IMPL_CLASS.getName(), BROKER,
"-" + WorkflowExecutionArgs.BRKR_URL.getName(), "tcp://localhost:61616?daemon=true",
"-" + WorkflowExecutionArgs.USER_BRKR_IMPL_CLASS.getName(), BROKER,
"-" + WorkflowExecutionArgs.USER_BRKR_URL.getName(), "tcp://localhost:61616?daemon=true",
"-" + WorkflowExecutionArgs.BRKR_TTL.getName(), "1000",
"-" + WorkflowExecutionArgs.LOG_DIR.getName(), LOGS_DIR,
"-" + WorkflowExecutionArgs.LOG_FILE.getName(), LOGS_DIR + "/log.txt",
};
}
}