blob: b6553e9b2bb3c4ce935ee958ff62518735487caf [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.resource;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.commons.io.FileUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.client.FalconCLIException;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.feed.Cluster;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.feed.Locations;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.entity.v0.process.Property;
import org.apache.falcon.entity.v0.process.Validity;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.unit.FalconUnit;
import org.apache.falcon.util.BuildProperties;
import org.apache.falcon.util.DeploymentProperties;
import org.apache.falcon.util.FalconTestUtil;
import org.apache.falcon.util.OozieTestUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.client.BundleJob;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.Job.Status;
import org.apache.oozie.client.OozieClient;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import javax.ws.rs.core.MediaType;
import java.io.File;
import java.io.IOException;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.regex.Pattern;
/**
* Test class for Entity REST APIs.
*
* Tests should be enabled only in local environments as they need running instance of the web server.
*/
@Test(groups = {"exhaustive"})
public class EntityManagerJerseyIT extends AbstractSchedulerManagerJerseyIT {
private static final String START_INSTANCE = "2012-04-20T00:00Z";
private static final String SLEEP_WORKFLOW = "sleepWorkflow.xml";
@BeforeClass
@Override
public void setup() throws Exception {
String version = System.getProperty("project.version");
String buildDir = System.getProperty("project.build.directory");
System.setProperty("falcon.libext", buildDir + "/../../unit/target/falcon-unit-" + version + ".jar");
super.setup();
}
@AfterMethod
@Override
public void cleanUpActionXml() throws IOException, FalconException {
//Needed since oozie writes action xml to current directory.
FileUtils.deleteQuietly(new File("action.xml"));
FileUtils.deleteQuietly(new File(".action.xml.crc"));
contexts.remove();
}
private ThreadLocal<UnitTestContext> contexts = new ThreadLocal<UnitTestContext>();
private UnitTestContext newContext() throws FalconException, IOException {
contexts.set(new UnitTestContext());
return contexts.get();
}
static void assertLibs(FileSystem fs, Path path) throws IOException {
FileStatus[] libs = fs.listStatus(path);
Assert.assertNotNull(libs);
}
private Entity getDefinition(EntityType type, String name) throws Exception {
Entity entity = falconUnitClient.getDefinition(type.name(), name, null);
return entity;
}
private void updateEndtime(Process process) {
Validity processValidity = process.getClusters().getClusters().get(0).getValidity();
processValidity.setEnd(new Date(new Date().getTime() + 2 * 24 * 60 * 60 * 1000));
}
@Test
public void testLibExtensions() throws Exception {
UnitTestContext context = newContext();
submitCluster(context);
FileSystem fs = FalconUnit.getFileSystem();
assertLibs(fs, new Path("/projects/falcon/working/libext/FEED/retention"));
assertLibs(fs, new Path("/projects/falcon/working/libext/PROCESS"));
String tmpFileName = TestContext.overlayParametersOverTemplate(UnitTestContext.FEED_TEMPLATE1, context.overlay);
Feed feed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(new File(tmpFileName));
Location location = new Location();
location.setPath("fsext://global:00/falcon/test/input/${YEAR}/${MONTH}/${DAY}/${HOUR}");
location.setType(LocationType.DATA);
Cluster cluster = feed.getClusters().getClusters().get(0);
cluster.setLocations(new Locations());
feed.getClusters().getClusters().get(0).getLocations().getLocations().add(location);
File tmpFile = UnitTestContext.getTempFile();
EntityType.FEED.getMarshaller().marshal(feed, tmpFile);
APIResult result = falconUnitClient.submitAndSchedule(EntityType.FEED.name(), tmpFile.getAbsolutePath(), true,
null, null);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
}
@Test
public void testUpdateCheckUser() throws Exception {
UnitTestContext context = newContext();
String tmpFileName = TestContext.overlayParametersOverTemplate(UnitTestContext.PROCESS_TEMPLATE,
context.overlay);
Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(new File(tmpFileName));
updateEndtime(process);
File tmpFile = UnitTestContext.getTempFile();
EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
submitCluster(context);
context.prepare();
submitFeeds(context.overlay);
submitProcess(tmpFile.getAbsolutePath(), context.overlay);
scheduleProcess(context.getProcessName(), context.getClusterName(), getAbsolutePath(SLEEP_WORKFLOW));
waitForStatus(EntityType.PROCESS.name(), context.getProcessName(), START_INSTANCE,
InstancesResult.WorkflowStatus.RUNNING);
List<BundleJob> bundles = OozieTestUtils.getBundles(context);
Assert.assertEquals(bundles.size(), 1);
Assert.assertEquals(bundles.get(0).getUser(), TestContext.REMOTE_USER);
Feed feed = (Feed) getDefinition(EntityType.FEED, context.outputFeedName);
//change output feed path and update feed as another user
feed.getLocations().getLocations().get(0).setPath("/falcon/test/output2/${YEAR}/${MONTH}/${DAY}");
tmpFile = TestContext.getTempFile();
feed.getEntityType().getMarshaller().marshal(feed, tmpFile);
APIResult result = falconUnitClient.update(EntityType.FEED.name(), feed.getName(),
tmpFile.getAbsolutePath(), true, null);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
bundles = OozieTestUtils.getBundles(context);
Assert.assertEquals(bundles.size(), 2);
Assert.assertEquals(bundles.get(0).getUser(), TestContext.REMOTE_USER);
Assert.assertEquals(bundles.get(1).getUser(), TestContext.REMOTE_USER);
}
public void testOptionalInput() throws Exception {
UnitTestContext context = newContext();
String tmpFileName = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, context.overlay);
Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(new File(tmpFileName));
Input in1 = process.getInputs().getInputs().get(0);
Input in2 = new Input();
in2.setFeed(in1.getFeed());
in2.setName("input2");
in2.setOptional(true);
in2.setPartition(in1.getPartition());
in2.setStart("now(-1,0)");
in2.setEnd("now(0,0)");
process.getInputs().getInputs().add(in2);
File tmpFile = TestContext.getTempFile();
EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
schedule(context);
waitForStatus(EntityType.PROCESS.name(), context.getProcessName(), START_INSTANCE,
InstancesResult.WorkflowStatus.SUCCEEDED);
}
public void testDryRun() throws Exception {
//Schedule of invalid process should fail because of dryRun, and should pass when dryrun is skipped
UnitTestContext context = newContext();
String tmpFileName = TestContext.overlayParametersOverTemplate(UnitTestContext.PROCESS_TEMPLATE,
context.overlay);
Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(new File(tmpFileName));
Property prop = new Property();
prop.setName("newProp");
prop.setValue("${instanceTim()}"); //invalid property
process.getProperties().getProperties().add(prop);
File tmpFile = TestContext.getTempFile();
EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
try {
falconUnitClient.validate(EntityType.PROCESS.name(), tmpFile.getAbsolutePath(),
true, null);
Assert.fail("Exception should be Thrown");
} catch (Exception e) {
//ignore
}
schedule(context);
//Fix the process and then submitAndSchedule should succeed
Iterator<Property> itr = process.getProperties().getProperties().iterator();
while (itr.hasNext()) {
Property myProp = itr.next();
if (myProp.getName().equals("newProp")) {
itr.remove();
}
}
tmpFile = TestContext.getTempFile();
process.setName("process" + System.currentTimeMillis());
EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
APIResult result = falconUnitClient.submitAndSchedule(EntityType.PROCESS.name(),
tmpFile.getAbsolutePath(), true, null, null);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
// update where dryrun is disabled should succeed.
tmpFile = TestContext.getTempFile();
process.getEntityType().getMarshaller().marshal(process, tmpFile);
result = falconUnitClient.update(EntityType.PROCESS.name(), process.getName(),
tmpFile.getAbsolutePath(), true, null);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
}
@Test
public void testUpdateSuspendedEntity() throws Exception {
UnitTestContext context = newContext();
schedule(context);
waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE,
InstancesResult.WorkflowStatus.RUNNING);
//Suspend entity
Process process = (Process) getDefinition(EntityType.PROCESS, context.processName);
APIResult result = falconUnitClient.suspend(EntityType.PROCESS, process.getName(), context.colo,
null);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
result = falconUnitClient.getStatus(EntityType.PROCESS, context.processName, context.clusterName,
null, false);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
Assert.assertEquals(result.getMessage(), "SUSPENDED");
process.getProperties().getProperties().get(0).setName("newprop");
Date endTime = getEndTime();
process.getClusters().getClusters().get(0).getValidity().setEnd(endTime);
File tmpFile = TestContext.getTempFile();
process.getEntityType().getMarshaller().marshal(process, tmpFile);
result = falconUnitClient.update(EntityType.PROCESS.name(), context.getProcessName(),
tmpFile.getAbsolutePath(), true, null);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
//Since the process endtime = update effective time, it shouldn't create new bundle
List<BundleJob> bundles = OozieTestUtils.getBundles(context);
Assert.assertEquals(bundles.size(), 1);
//Since the entity was suspended before update, it should still be suspended
Assert.assertEquals(bundles.get(0).getStatus(), Status.SUSPENDED);
}
@Test
public void testProcessInputUpdate() throws Exception {
UnitTestContext context = newContext();
schedule(context);
waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE,
InstancesResult.WorkflowStatus.RUNNING);
List<BundleJob> bundles = OozieTestUtils.getBundles(context);
Assert.assertEquals(bundles.size(), 1);
OozieClient ozClient = OozieTestUtils.getOozieClient(context);
String bundle = bundles.get(0).getId();
String coordId = ozClient.getBundleJobInfo(bundle).getCoordinators().get(0).getId();
Process process = (Process) getDefinition(EntityType.PROCESS, context.processName);
String feed3 = "f3" + System.currentTimeMillis();
Map<String, String> overlay = new HashMap<String, String>();
overlay.put("inputFeedName", feed3);
overlay.put("cluster", context.clusterName);
overlay.put("user", System.getProperty("user.name"));
submitFeed(UnitTestContext.FEED_TEMPLATE1, overlay);
Input input = new Input();
input.setFeed(feed3);
input.setName("inputData2");
input.setStart("today(20,0)");
input.setEnd("today(20,20)");
process.getInputs().getInputs().add(input);
updateEndtime(process);
Date endTime = getEndTime();
File tmpFile = TestContext.getTempFile();
process.getEntityType().getMarshaller().marshal(process, tmpFile);
APIResult result = falconUnitClient.update(EntityType.PROCESS.name(), context.getProcessName(),
tmpFile.getAbsolutePath(), true, null);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
//Assert that update creates new bundle and old coord is running
bundles = OozieTestUtils.getBundles(context);
Assert.assertEquals(bundles.size(), 2);
CoordinatorJob coord = ozClient.getCoordJobInfo(coordId);
Assert.assertEquals(coord.getStatus(), Status.RUNNING);
Assert.assertEquals(coord.getEndTime(), endTime);
//Assert on new bundle/coord
String newBundle = null;
for (BundleJob myBundle : bundles) {
if (!myBundle.getId().equals(bundle)) {
newBundle = myBundle.getId();
break;
}
}
assert newBundle != null;
OozieTestUtils.waitForBundleStart(context, newBundle, Job.Status.RUNNING, Status.PREP);
coord = ozClient.getCoordJobInfo(ozClient.getBundleJobInfo(newBundle).getCoordinators().get(0).getId());
Assert.assertTrue(coord.getStatus() == Status.RUNNING || coord.getStatus() == Status.PREP);
Assert.assertEquals(coord.getStartTime(), endTime);
}
@Test
public void testProcessEndtimeUpdate() throws Exception {
UnitTestContext context = newContext();
schedule(context);
OozieTestUtils.waitForBundleStart(context, Job.Status.RUNNING);
Process process = (Process) getDefinition(EntityType.PROCESS, context.processName);
updateEndtime(process);
File tmpFile = TestContext.getTempFile();
process.getEntityType().getMarshaller().marshal(process, tmpFile);
APIResult result = falconUnitClient.update(EntityType.PROCESS.name(), context.getProcessName(),
tmpFile.getAbsolutePath(), true, null);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
//Assert that update does not create new bundle
List<BundleJob> bundles = OozieTestUtils.getBundles(context);
Assert.assertEquals(bundles.size(), 1);
}
@Test
public void testTouchEntity() throws Exception {
UnitTestContext context = newContext();
schedule(context);
OozieTestUtils.waitForBundleStart(context, Job.Status.RUNNING);
List<BundleJob> bundles = OozieTestUtils.getBundles(context);
Assert.assertEquals(bundles.size(), 1);
OozieClient ozClient = OozieTestUtils.getOozieClient(context);
String bundle = bundles.get(0).getId();
String coordId = ozClient.getBundleJobInfo(bundle).getCoordinators().get(0).getId();
//Update end time of process required for touch
Process process = (Process) getDefinition(EntityType.PROCESS, context.processName);
updateEndtime(process);
File tmpFile = TestContext.getTempFile();
process.getEntityType().getMarshaller().marshal(process, tmpFile);
APIResult result = falconUnitClient.update(EntityType.PROCESS.name(), context.getProcessName(),
tmpFile.getAbsolutePath(), true, null);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
bundles = OozieTestUtils.getBundles(context);
Assert.assertEquals(bundles.size(), 1);
result = falconUnitClient.touch(EntityType.PROCESS.name(), context.getProcessName(), context.colo, true, null);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
OozieTestUtils.waitForBundleStart(context, Status.PREP, Status.RUNNING);
//Assert that touch creates new bundle and old coord is running
bundles = OozieTestUtils.getBundles(context);
Assert.assertEquals(bundles.size(), 2);
CoordinatorJob coord = ozClient.getCoordJobInfo(coordId);
Assert.assertTrue(coord.getStatus() == Status.RUNNING || coord.getStatus() == Status.SUCCEEDED);
//Assert on new bundle/coord
String newBundle = null;
for (BundleJob myBundle : bundles) {
if (!myBundle.getId().equals(bundle)) {
newBundle = myBundle.getId();
break;
}
}
assert newBundle != null;
coord = ozClient.getCoordJobInfo(ozClient.getBundleJobInfo(newBundle).getCoordinators().get(0).getId());
Assert.assertTrue(coord.getStatus() == Status.RUNNING || coord.getStatus() == Status.PREP);
}
public void testStatus() throws Exception {
UnitTestContext context = newContext();
submitCluster(context);
context.prepare();
submitFeed(UnitTestContext.FEED_TEMPLATE1, context.overlay);
APIResult result = falconUnitClient.getStatus(EntityType.FEED, context.overlay.get("inputFeedName"),
context.colo, null, false);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
Assert.assertEquals(result.getMessage(), "SUBMITTED");
}
public void testIdempotentSubmit() throws Exception {
UnitTestContext context = newContext();
submitCluster(context);
submitCluster(context);
}
public void testNotFoundStatus() throws FalconException, IOException, FalconCLIException {
String feed1 = "f1" + System.currentTimeMillis();
try {
falconUnitClient.getStatus(EntityType.FEED, feed1, null, null, false);
Assert.fail("Exception should be Thrown");
} catch (Exception e) {
//ignore
}
}
public void testVersion() throws FalconException, IOException, FalconCLIException {
String json = falconUnitClient.getVersion(null);
String buildVersion = BuildProperties.get().getProperty("build.version");
String deployMode = DeploymentProperties.get().getProperty("deploy.mode");
Assert.assertTrue(Pattern.matches(
".*\\{\\s*\"key\"\\s*:\\s*\"Version\"\\s*,\\s*\"value\"\\s*:\\s*\""
+ buildVersion + "\"\\s*}.*", json),
"No build.version found in /api/admin/version");
Assert.assertTrue(Pattern.matches(
".*\\{\\s*\"key\"\\s*:\\s*\"Mode\"\\s*,\\s*\"value\"\\s*:\\s*\""
+ deployMode + "\"\\s*}.*", json),
"No deploy.mode found in /api/admin/version");
}
public void testValidate() throws FalconException, IOException {
UnitTestContext context = newContext();
try {
falconUnitClient.validate(EntityType.PROCESS.name(),
UnitTestContext.class.getResource(UnitTestContext.SAMPLE_PROCESS_XML).getPath(), true, null);
Assert.fail("Exception should be Thrown");
} catch (Exception e) {
//ignore
}
}
public void testClusterValidate() throws Exception {
UnitTestContext context = newContext();
String tmpFileName = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, context.overlay);
File tmpFile = new File(tmpFileName);
fs.mkdirs(new Path(STAGING_PATH), HadoopClientFactory.ALL_PERMISSION);
fs.mkdirs(new Path(WORKING_PATH), HadoopClientFactory.READ_EXECUTE_PERMISSION);
APIResult result = falconUnitClient.validate(EntityType.CLUSTER.name(), tmpFile.getAbsolutePath(),
true, null);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
}
ClientResponse suspend(TestContext context, Entity entity) {
return suspend(context, entity.getEntityType(), entity.getName());
}
private ClientResponse suspend(TestContext context, EntityType entityType, String name) {
return context.service
.path("api/entities/suspend/" + entityType.name().toLowerCase() + "/" + name)
.header("Cookie", context.getAuthenticationToken())
.accept(MediaType.TEXT_XML).type(MediaType.TEXT_XML)
.post(ClientResponse.class);
}
public void testClusterSubmitScheduleSuspendResumeDelete() throws Exception {
UnitTestContext context = newContext();
submitCluster(context);
try {
falconUnitClient.schedule(EntityType.CLUSTER, context.clusterName, null, true, null, null);
Assert.fail("Exception should be Thrown");
} catch (Exception e) {
//ignore
}
try {
falconUnitClient.suspend(EntityType.CLUSTER, context.clusterName, context.colo, null);
Assert.fail("Exception should be Thrown");
} catch (Exception e) {
//ignore
}
try {
falconUnitClient.resume(EntityType.CLUSTER, context.clusterName, context.colo, null);
Assert.fail("Exception should be Thrown");
} catch (Exception e) {
//ignore
}
APIResult result = falconUnitClient.delete(EntityType.CLUSTER, context.getClusterName(), null);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
}
public void testSubmit() throws Exception {
UnitTestContext context = newContext();
ClientResponse response;
submitCluster(context);
submitFeed(UnitTestContext.FEED_TEMPLATE1, context.overlay);
submitFeed(UnitTestContext.FEED_TEMPLATE2, context.overlay);
submitProcess(UnitTestContext.PROCESS_TEMPLATE, context.overlay);
}
@Test
public void testDuplicateSubmitCommands() throws Exception {
UnitTestContext context = newContext();
submitCluster(context);
ExecutorService service = Executors.newSingleThreadExecutor();
ExecutorService duplicateService = Executors.newSingleThreadExecutor();
Future<APIResult> future = service.submit(new SubmitCommand(context, context.overlay));
Future<APIResult> duplicateFuture = duplicateService.submit(new SubmitCommand(context, context.overlay));
// since there are duplicate threads for submits, there is no guarantee which request will succeed.
try {
APIResult response = future.get();
APIResult duplicateSubmitThreadResponse = duplicateFuture.get();
Assert.fail("Exception should be Thrown");
} catch (Exception e) {
//ignore
}
}
@Test
public void testDuplicateDeleteCommands() throws Exception {
UnitTestContext context = newContext();
Map<String, String> overlay = context.overlay;
submitCluster(context);
submitFeed(UnitTestContext.FEED_TEMPLATE1, overlay);
ExecutorService service = Executors.newFixedThreadPool(2);
Future<APIResult> future = service.submit(new DeleteCommand(context, overlay.get("inputFeedName"),
"feed"));
Future<APIResult> duplicateFuture = service.submit(new DeleteCommand(context,
overlay.get("inputFeedName"), "feed"));
// since there are duplicate threads for submits, there is no guarantee which request will succeed.
try {
APIResult response = future.get();
APIResult duplicateSubmitThreadResponse = duplicateFuture.get();
Assert.fail("Exception should be Thrown");
} catch (Exception e) {
//ignore
}
}
public void testProcesssScheduleAndDelete() throws Exception {
scheduleAndDeleteProcess(false);
}
public void testProcesssScheduleAndDeleteWithDoAs() throws Exception {
scheduleAndDeleteProcess(true);
}
private void scheduleAndDeleteProcess(boolean withDoAs) throws Exception {
UnitTestContext context = newContext();
submitCluster(context);
context.prepare();
submitFeeds(context.overlay);
ClientResponse clientResponse;
String tmpFileName = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, context.overlay);
Process process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(new File(tmpFileName));
updateEndtime(process);
File tmpFile = TestContext.getTempFile();
EntityType.PROCESS.getMarshaller().marshal(process, tmpFile);
submitProcess(tmpFile.getAbsolutePath(), context.overlay);
if (withDoAs) {
falconUnitClient.schedule(EntityType.PROCESS, context.getProcessName(), context.getClusterName(), false,
FalconTestUtil.TEST_USER_2, null);
} else {
falconUnitClient.schedule(EntityType.PROCESS, context.getProcessName(), context.getClusterName(), false, "",
"key1:value1");
}
OozieTestUtils.waitForBundleStart(context, Status.RUNNING);
APIResult result;
if (withDoAs) {
result = falconUnitClient.delete(EntityType.PROCESS, context.getProcessName(), FalconTestUtil.TEST_USER_2);
} else {
result = falconUnitClient.delete(EntityType.PROCESS, context.getProcessName(), null);
}
assertStatus(result);
}
public void testGetEntityDefinition() throws Exception {
UnitTestContext context = newContext();
submitCluster(context);
context.prepare();
APIResult result = submitFeed(UnitTestContext.FEED_TEMPLATE1, context.overlay);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
Feed feed = (Feed) falconUnitClient.getDefinition(EntityType.FEED.name(),
context.overlay.get("inputFeedName"), null);
Assert.assertEquals(feed.getName(), context.overlay.get("inputFeedName"));
}
public void testInvalidGetEntityDefinition() throws FalconException, IOException, FalconCLIException {
try {
falconUnitClient.getDefinition(EntityType.PROCESS.name(), "sample1", null);
Assert.fail("Exception should be Thrown");
} catch (Exception e) {
//ignore
}
}
public void testScheduleSuspendResume() throws Exception {
UnitTestContext context = newContext();
schedule(context);
waitForStatus(EntityType.PROCESS.name(), context.processName, START_INSTANCE,
InstancesResult.WorkflowStatus.RUNNING);
APIResult result = falconUnitClient.suspend(EntityType.PROCESS, context.getProcessName(),
context.colo, null);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
result = falconUnitClient.getStatus(EntityType.PROCESS, context.processName, context.clusterName,
null, false);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
Assert.assertEquals(result.getMessage(), "SUSPENDED");
result = falconUnitClient.resume(EntityType.PROCESS, context.getProcessName(),
context.colo, null);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
result = falconUnitClient.getStatus(EntityType.PROCESS, context.processName, context.clusterName,
null, false);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
Assert.assertEquals(result.getMessage(), "RUNNING");
}
public void testFeedSchedule() throws Exception {
UnitTestContext context = newContext();
submitCluster(context);
context.prepare();
APIResult result = submitFeed(UnitTestContext.FEED_TEMPLATE1, context.overlay);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
createTestData();
result = falconUnitClient.schedule(EntityType.FEED, context.overlay.get("inputFeedName"), null, true, null,
null);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
}
public void testDeleteDataSet() throws Exception {
UnitTestContext context = newContext();
submitCluster(context);
context.prepare();
APIResult result = submitFeed(UnitTestContext.FEED_TEMPLATE1, context.overlay);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
result = falconUnitClient.delete(EntityType.FEED, context.overlay.get("inputFeedName"), null);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
}
public void testDelete() throws Exception {
UnitTestContext context = newContext();
submitCluster(context);
context.prepare();
APIResult result = submitFeed(UnitTestContext.FEED_TEMPLATE1, context.overlay);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
try {
falconUnitClient.delete(EntityType.CLUSTER, context.getClusterName(), null);
Assert.fail("Exception should be Thrown");
} catch (Exception e) {
//ignore
}
result = submitFeed(UnitTestContext.FEED_TEMPLATE2, context.overlay);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
submitProcess(UnitTestContext.PROCESS_TEMPLATE, context.overlay);
//Delete a referred feed
try {
falconUnitClient.delete(EntityType.FEED, context.overlay.get("inputFeedName"), null);
Assert.fail("Exception should be Thrown");
} catch (Exception e) {
//ignore
}
//Delete a submitted process
result = falconUnitClient.delete(EntityType.PROCESS, context.getProcessName(), null);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
submitProcess(UnitTestContext.PROCESS_TEMPLATE, context.overlay);
result = falconUnitClient.schedule(EntityType.PROCESS, context.getProcessName(), null, true, null,
null);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
//Delete a scheduled process
result = falconUnitClient.delete(EntityType.PROCESS, context.getProcessName(), null);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
}
@Test
public void testGetEntityList() throws Exception {
EntityList result = falconUnitClient.getEntityList(EntityType.PROCESS.name(), "", "", null, null,
null, null, null, new Integer(0), new Integer(1), null);
Assert.assertNotNull(result);
for (EntityList.EntityElement entityElement : result.getElements()) {
Assert.assertNull(entityElement.status); // status is null
}
result = falconUnitClient.getEntityList(EntityType.CLUSTER.name(), "", "", null, null,
null, null, null, new Integer(0), new Integer(1), null);
Assert.assertNotNull(result);
for (EntityList.EntityElement entityElement : result.getElements()) {
Assert.assertNull(entityElement.status); // status is null
}
result = falconUnitClient.getEntityList(EntityType.FEED.name() + "," + EntityType.PROCESS.name(),
"", "", null, null, null, null, null, new Integer(0), new Integer(1), null);
Assert.assertNotNull(result);
for (EntityList.EntityElement entityElement : result.getElements()) {
Assert.assertNull(entityElement.status); // status is null
}
result = falconUnitClient.getEntityList(null, "", "", null, null, null, null, null, new Integer(0),
new Integer(1), null);
Assert.assertNotNull(result);
for (EntityList.EntityElement entityElement : result.getElements()) {
Assert.assertNull(entityElement.status); // status is null
}
}
@Test
public void testDuplicateUpdateCommands() throws Exception {
UnitTestContext context = newContext();
schedule(context);
OozieTestUtils.waitForBundleStart(context, Job.Status.RUNNING);
List<BundleJob> bundles = OozieTestUtils.getBundles(context);
Assert.assertEquals(bundles.size(), 1);
Process process = (Process) getDefinition(EntityType.PROCESS, context.processName);
String feed3 = "f3" + System.currentTimeMillis();
Map<String, String> overlay = new HashMap<String, String>();
overlay.put("inputFeedName", feed3);
overlay.put("cluster", context.clusterName);
overlay.put("user", System.getProperty("user.name"));
APIResult result = submitFeed(UnitTestContext.FEED_TEMPLATE1, context.overlay);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
Input input = new Input();
input.setFeed(feed3);
input.setName("inputData2");
input.setStart("today(20,0)");
input.setEnd("today(20,20)");
process.getInputs().getInputs().add(input);
updateEndtime(process);
Date endTime = getEndTime();
ExecutorService service = Executors.newSingleThreadExecutor();
ExecutorService duplicateService = Executors.newSingleThreadExecutor();
Future<APIResult> future = service.submit(new UpdateCommand(context, process, endTime));
Future<APIResult> duplicateFuture = duplicateService.submit(new UpdateCommand(context, process, endTime));
// since there are duplicate threads for updates, there is no guarantee which request will succeed
try {
future.get();
duplicateFuture.get();
Assert.fail("Exception should be Thrown");
} catch (Exception e) {
//ignore
}
}
public Date getEndTime() {
Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
cal.add(Calendar.DAY_OF_MONTH, 1);
cal.set(Calendar.HOUR_OF_DAY, 0);
cal.set(Calendar.MINUTE, 0);
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);
return cal.getTime();
}
class UpdateCommand implements Callable<APIResult> {
private UnitTestContext context;
private Process process;
private Date endTime;
public UnitTestContext getContext() {
return context;
}
public Process getProcess() {
return process;
}
public UpdateCommand(UnitTestContext context, Process process, Date endTime) {
this.context = context;
this.process = process;
this.endTime = endTime;
}
@Override
public APIResult call() throws Exception {
File tmpFile = TestContext.getTempFile();
process.getEntityType().getMarshaller().marshal(process, tmpFile);
return falconUnitClient.update(EntityType.PROCESS.name(), context.getProcessName(),
tmpFile.getAbsolutePath(), true, null);
}
}
class SubmitCommand implements Callable<APIResult> {
private Map<String, String> overlay;
private UnitTestContext context;
public UnitTestContext getContext() {
return context;
}
public Map<String, String> getOverlay() {
return overlay;
}
public SubmitCommand(UnitTestContext context, Map<String, String> overlay) {
this.context = context;
this.overlay = overlay;
}
@Override
public APIResult call() throws Exception {
return submitFeed(UnitTestContext.FEED_TEMPLATE1, context.overlay);
}
}
class DeleteCommand implements Callable<APIResult> {
private UnitTestContext context;
private String entityName;
private String entityType;
public UnitTestContext getContext() {
return context;
}
public DeleteCommand(UnitTestContext context, String entityName, String entityType) {
this.context = context;
this.entityName = entityName;
this.entityType = entityType;
}
@Override
public APIResult call() throws Exception {
return falconUnitClient.delete(EntityType.valueOf(entityType), entityName, null);
}
}
}