blob: c5425b206863a2c8ace670e44870510a5694a1fc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.resource;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.client.FalconCLIException;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.state.AbstractSchedulerTestBase;
import org.apache.falcon.service.FalconJPAService;
import org.apache.falcon.unit.FalconUnitTestBase;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.util.StateStoreProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import java.io.File;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
/**
* Base class for tests using Native Scheduler.
*/
public class AbstractSchedulerManagerJerseyIT extends FalconUnitTestBase {
private static final String SLEEP_WORKFLOW = "sleepWorkflow.xml";
private static final String LOCAL_MODE = "local";
private static final String IT_RUN_MODE = "it.run.mode";
public static final String PROCESS_TEMPLATE = "/local-process-noinputs-template.xml";
public static final String PROCESS_TEMPLATE_NOLATE_DATA = "/process-nolatedata-template.xml";
public static final String PROCESS_NAME = "processName";
protected static final String START_INSTANCE = "2012-04-20T00:00Z";
private static FalconJPAService falconJPAService = FalconJPAService.get();
private static final String DB_BASE_DIR = "target/test-data/falcondb";
protected static String dbLocation = DB_BASE_DIR + File.separator + "data.db";
protected static String url = "jdbc:derby:"+ dbLocation +";create=true";
protected static final String DB_SQL_FILE = dbLocation + File.separator + "out.sql";
protected LocalFileSystem localFS = new LocalFileSystem();
@BeforeClass
public void setup() throws Exception {
Configuration localConf = new Configuration();
localFS.initialize(LocalFileSystem.getDefaultUri(localConf), localConf);
cleanupDB();
localFS.mkdirs(new Path(DB_BASE_DIR));
falconJPAService.init();
createDB();
super.setup();
}
protected void updateStartUpProps() {
StartupProperties.get().setProperty("workflow.engine.impl",
"org.apache.falcon.workflow.engine.FalconWorkflowEngine");
StartupProperties.get().setProperty("dag.engine.impl",
"org.apache.falcon.workflow.engine.OozieDAGEngine");
String[] listeners = StartupProperties.get().getProperty("configstore.listeners").split(",");
List<String> configListeners = new ArrayList<>(Arrays.asList(listeners));
configListeners.remove("org.apache.falcon.service.SharedLibraryHostingService");
configListeners.add("org.apache.falcon.state.store.jdbc.JDBCStateStore");
StartupProperties.get().setProperty("configstore.listeners", StringUtils.join(configListeners, ","));
StateStoreProperties.get().getProperty("falcon.state.store.impl",
"org.apache.falcon.state.store.jdbc.JDBCStateStore");
}
protected void submitProcess(Map<String, String> overlay) throws IOException, FalconCLIException {
String tmpFile = TestContext.overlayParametersOverTemplate(PROCESS_TEMPLATE, overlay);
APIResult result = submit(EntityType.PROCESS, tmpFile);
assertStatus(result);
}
protected void scheduleProcess(String processName, String cluster,
String startTime, int noOfInstances) throws FalconCLIException {
APIResult result = falconUnitClient.schedule(EntityType.PROCESS, processName, startTime, noOfInstances,
cluster, true, null);
assertStatus(result);
}
protected void setupProcessExecution(UnitTestContext context,
Map<String, String> overlay, int numInstances,
String processTemplate) throws Exception {
String colo = overlay.get(COLO);
String cluster = overlay.get(CLUSTER);
submitCluster(colo, cluster, null);
submitFeeds(overlay);
context.prepare();
submitProcess(processTemplate, overlay);
String processName = overlay.get(PROCESS_NAME);
scheduleProcess(processName, cluster, START_INSTANCE, numInstances);
}
private void createDB() throws Exception {
AbstractSchedulerTestBase abstractSchedulerTestBase = new AbstractSchedulerTestBase();
StateStoreProperties.get().setProperty(FalconJPAService.URL, url);
abstractSchedulerTestBase.createDB(DB_SQL_FILE);
}
@AfterClass
public void cleanup() throws Exception {
super.cleanup();
cleanupDB();
}
private void cleanupDB() throws IOException {
localFS.delete(new Path(DB_BASE_DIR), true);
}
protected void submitCluster(UnitTestContext context) throws IOException, FalconCLIException {
String mode = System.getProperty(IT_RUN_MODE);
if (StringUtils.isNotEmpty(mode) && mode.toLowerCase().equals(LOCAL_MODE)) {
submitCluster(context.colo, context.clusterName, null);
} else {
fs.mkdirs(new Path(STAGING_PATH), HadoopClientFactory.ALL_PERMISSION);
fs.mkdirs(new Path(WORKING_PATH), HadoopClientFactory.READ_EXECUTE_PERMISSION);
String tmpFile = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE,
context.overlay);
submit(EntityType.CLUSTER, tmpFile);
}
}
protected APIResult submitFeed(String template, Map<String, String> overlay) throws IOException,
FalconCLIException {
String tmpFile = TestContext.overlayParametersOverTemplate(template, overlay);
APIResult result = falconUnitClient.submit(EntityType.FEED.name(), tmpFile, null);
return result;
}
protected void submitFeeds(Map<String, String> overlay) throws IOException, FalconCLIException {
String tmpFile = TestContext.overlayParametersOverTemplate(UnitTestContext.FEED_TEMPLATE1, overlay);
APIResult result = falconUnitClient.submit(EntityType.FEED.name(), tmpFile, null);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
tmpFile = TestContext.overlayParametersOverTemplate(UnitTestContext.FEED_TEMPLATE2, overlay);
result = falconUnitClient.submit(EntityType.FEED.name(), tmpFile, null);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
}
protected void submitProcess(String template, Map<String, String> overlay) throws Exception {
String tmpFile = TestContext.overlayParametersOverTemplate(template, overlay);
APIResult result = falconUnitClient.submit(EntityType.PROCESS.name(), tmpFile, null);
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
}
protected void scheduleProcess(UnitTestContext context) throws FalconCLIException, IOException, FalconException {
String scheduleTime = START_INSTANCE;
APIResult result = scheduleProcess(context.getProcessName(), scheduleTime, 1, context.getClusterName(),
getAbsolutePath(SLEEP_WORKFLOW), true, "");
Assert.assertEquals(result.getStatus(), APIResult.Status.SUCCEEDED);
}
protected void schedule(UnitTestContext context) throws Exception {
submitCluster(context);
context.prepare();
submitFeeds(context.overlay);
submitProcess(UnitTestContext.PROCESS_TEMPLATE, context.overlay);
scheduleProcess(context);
}
protected List<Path> createTestData() throws Exception {
List<Path> list = new ArrayList<Path>();
fs.mkdirs(new Path("/user/guest"));
fs.setOwner(new Path("/user/guest"), TestContext.REMOTE_USER, "users");
DateFormat formatter = new SimpleDateFormat("yyyy/MM/dd/HH/mm");
formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
Date date = new Date(System.currentTimeMillis() + 3 * 3600000);
Path path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
fs.create(path).close();
date = new Date(date.getTime() - 3600000);
path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
fs.create(path).close();
date = new Date(date.getTime() - 3600000);
path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
fs.create(path).close();
date = new Date(date.getTime() - 3600000);
path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
list.add(path);
fs.create(path).close();
date = new Date(date.getTime() - 3600000);
path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
list.add(path);
fs.create(path).close();
date = new Date(date.getTime() - 3600000);
path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
list.add(path);
fs.create(path).close();
date = new Date(date.getTime() - 3600000);
path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
list.add(path);
fs.create(path).close();
date = new Date(date.getTime() - 3600000);
path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
list.add(path);
fs.create(path).close();
date = new Date(date.getTime() - 3600000);
path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
list.add(path);
fs.create(path).close();
date = new Date(date.getTime() - 3600000);
path = new Path("/examples/input-data/rawLogs/" + formatter.format(date) + "/file");
list.add(path);
fs.create(path).close();
new FsShell(new Configuration()).run(new String[] {
"-chown", "-R", "guest:users", "/examples/input-data/rawLogs", });
return list;
}
public void assertEntityStatus(APIResult apiResult) {
super.assertStatus(apiResult);
String message = apiResult.getMessage();
if (!message.contains(AbstractEntityManager.EntityStatus.SUBMITTED.name())) {
Assert.assertTrue(message.contains("native"));
}
}
}