/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.cluster;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.avro.Schema;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Closer;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import lombok.Getter;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metastore.DatasetStateStore;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.FsDatasetStateStore;
import org.apache.gobblin.runtime.JobContext;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.listeners.AbstractJobListener;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
/**
* Unit tests for {@link GobblinHelixJobLauncher}.
*
* <p>
* This class uses a {@link TestingServer} as an embedded ZooKeeper server for testing. This class
* also uses the {@link HelixManager} to act as a testing Helix controller to be passed into the
* {@link GobblinHelixJobLauncher} instance. A {@link GobblinTaskRunner} is also used to run
* the single task of the test job. A {@link FsDatasetStateStore} is used to check the state store
* after the job is done. The test job writes everything to the local file system as returned by
* {@link FileSystem#getLocal(Configuration)}.
* </p>
*
* @author Yinan Li
*/
@Test(groups = { "gobblin.cluster" })
public class GobblinHelixJobLauncherTest {
public static final Logger LOG = LoggerFactory.getLogger(GobblinHelixJobLauncherTest.class);
private HelixManager helixManager;
private FileSystem localFs;
private Path appWorkDir;
private GobblinTaskRunner gobblinTaskRunner;
private DatasetStateStore datasetStateStore;
private Thread thread;
private final Closer closer = Closer.create();
private Config baseConfig;
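/**
 * Sets up the shared test fixture: an embedded ZooKeeper {@link TestingServer}, a Helix cluster
 * with a controller connection, a {@link GobblinTaskRunner} started on its own thread to run
 * the test tasks, and a {@link DatasetStateStore} for verifying persisted job state.
 */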
@BeforeClass
public void setUp() throws Exception {
TestingServer testingZKServer = this.closer.register(new TestingServer(-1));
LOG.info("Testing ZK Server listening on: " + testingZKServer.getConnectString());
URL url = GobblinHelixJobLauncherTest.class.getClassLoader().getResource(
GobblinHelixJobLauncherTest.class.getSimpleName() + ".conf");
Assert.assertNotNull(url, "Could not find resource " + GobblinHelixJobLauncherTest.class.getSimpleName() + ".conf");
this.appWorkDir = new Path(GobblinHelixJobLauncherTest.class.getSimpleName());
// Prepare the source JSON file
File sourceJsonFile = new File(this.appWorkDir.toString(), TestHelper.TEST_JOB_NAME + ".json");
TestHelper.createSourceJsonFile(sourceJsonFile);
baseConfig = ConfigFactory.parseURL(url)
.withValue("gobblin.cluster.zk.connection.string",
ConfigValueFactory.fromAnyRef(testingZKServer.getConnectString()))
.withValue(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL,
ConfigValueFactory.fromAnyRef(sourceJsonFile.getAbsolutePath()))
.withValue(ConfigurationKeys.JOB_STATE_IN_STATE_STORE, ConfigValueFactory.fromAnyRef("true"))
.resolve();
String zkConnectString = baseConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
String helixClusterName = baseConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
HelixUtils.createGobblinHelixCluster(zkConnectString, helixClusterName);
this.helixManager = HelixManagerFactory
.getZKHelixManager(helixClusterName, TestHelper.TEST_HELIX_INSTANCE_NAME, InstanceType.CONTROLLER,
zkConnectString);
this.closer.register(new Closeable() {
@Override
public void close() throws IOException {
helixManager.disconnect();
}
});
this.helixManager.connect();
this.localFs = FileSystem.getLocal(new Configuration());
this.closer.register(new Closeable() {
@Override
public void close() throws IOException {
if (localFs.exists(appWorkDir)) {
localFs.delete(appWorkDir, true);
}
}
});
this.gobblinTaskRunner =
new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, TestHelper.TEST_HELIX_INSTANCE_NAME,
TestHelper.TEST_APPLICATION_ID, TestHelper.TEST_TASK_RUNNER_ID, baseConfig, Optional.of(appWorkDir));
String stateStoreType = ConfigUtils.getString(baseConfig, ConfigurationKeys.STATE_STORE_TYPE_KEY,
ConfigurationKeys.DEFAULT_STATE_STORE_TYPE);
ClassAliasResolver<DatasetStateStore.Factory> resolver =
new ClassAliasResolver<>(DatasetStateStore.Factory.class);
DatasetStateStore.Factory stateStoreFactory =
resolver.resolveClass(stateStoreType).newInstance();
this.datasetStateStore = stateStoreFactory.createStateStore(baseConfig);
this.thread = new Thread(new Runnable() {
@Override
public void run() {
gobblinTaskRunner.start();
}
});
this.thread.start();
}
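/**
 * Builds job {@link Properties} from the base config, appending the given suffixes to the job
 * name and job id and directing writer output to a per-job path.
 */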
private Properties generateJobProperties(Config baseConfig, String jobNameSuffix, String jobIdSuffix) {
Properties properties = ConfigUtils.configToProperties(baseConfig);
String jobName = properties.getProperty(ConfigurationKeys.JOB_NAME_KEY) + jobNameSuffix;
properties.setProperty(ConfigurationKeys.JOB_NAME_KEY, jobName);
properties.setProperty(ConfigurationKeys.JOB_ID_KEY, "job_" + jobName + jobIdSuffix);
properties.setProperty(ConfigurationKeys.WRITER_FILE_PATH, jobName);
// The workflow expiry time must exceed the time the job needs to complete; otherwise Helix
// expires the workflow and the JobContext becomes null before the test can inspect it.
properties.setProperty(GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS, "5");
return properties;
}
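/**
 * Returns the file the test job publishes, composed of the publisher final dir, the writer
 * file path, and the writer file name from the job properties.
 */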
private File getJobOutputFile(Properties properties) {
return new File(properties.getProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR),
properties.getProperty(ConfigurationKeys.WRITER_FILE_PATH) + File.separator + properties
.getProperty(ConfigurationKeys.WRITER_FILE_NAME));
}
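/**
 * Launches a single job and verifies the published output records against the source schema,
 * plus the committed dataset and task states in the state store.
 */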
public void testLaunchJob() throws Exception {
final ConcurrentHashMap<String, Boolean> runningMap = new ConcurrentHashMap<>();
// Normal job launcher
final Properties properties = generateJobProperties(this.baseConfig, "1", "_1504201348470");
final GobblinHelixJobLauncher gobblinHelixJobLauncher = this.closer.register(
new GobblinHelixJobLauncher(properties, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap,
java.util.Optional.empty()));
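// Launch the job and block until completion; no listener is needed for this case.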
gobblinHelixJobLauncher.launchJob(null);
final File jobOutputFile = getJobOutputFile(properties);
Assert.assertTrue(jobOutputFile.exists());
Schema schema = new Schema.Parser().parse(TestHelper.SOURCE_SCHEMA);
TestHelper.assertGenericRecords(jobOutputFile, schema);
List<JobState.DatasetState> datasetStates = this.datasetStateStore.getAll(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY),
FsDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + FsDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX);
Assert.assertEquals(datasetStates.size(), 1);
JobState.DatasetState datasetState = datasetStates.get(0);
Assert.assertEquals(datasetState.getCompletedTasks(), 1);
Assert.assertEquals(datasetState.getState(), JobState.RunningState.COMMITTED);
Assert.assertEquals(datasetState.getTaskStates().size(), 1);
Assert.assertEquals(datasetState.getTaskStates().get(0).getWorkingState(), WorkUnitState.WorkingState.COMMITTED);
}
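/**
 * A job listener used to coordinate two concurrently launched jobs: onJobStart blocks the
 * first job on a pair of latches so the test can launch a second job while the first is still
 * running, and onJobCompletion counts completed jobs.
 */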
private static class SuspendJobListener extends AbstractJobListener {
@Getter
private final AtomicInteger completes = new AtomicInteger();
private final CountDownLatch stg1;
private final CountDownLatch stg2;
public SuspendJobListener(CountDownLatch stg1, CountDownLatch stg2) {
this.stg1 = stg1;
this.stg2 = stg2;
}
@Override
public void onJobStart(JobContext jobContext) throws Exception {
stg1.countDown();
stg2.await();
}
@Override
public void onJobCompletion(JobContext jobContext) throws Exception {
completes.addAndGet(1);
}
}
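/**
 * Verifies that launching a job is a no-op while another run of the same job name is in
 * progress: only one completion should be observed across the two launches.
 */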
public void testLaunchMultipleJobs() throws Exception {
final ConcurrentHashMap<String, Boolean> runningMap = new ConcurrentHashMap<>();
// First launcher for the test job (job name suffix "2")
final Properties properties1 = generateJobProperties(this.baseConfig, "2", "_1504201348471");
final GobblinHelixJobLauncher gobblinHelixJobLauncher1 = this.closer.register(
new GobblinHelixJobLauncher(properties1, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap,
java.util.Optional.empty()));
// Second launcher for the same job name but a different job id, used to test concurrent launches
final Properties properties2 = generateJobProperties(this.baseConfig, "2", "_1504201348472");
final GobblinHelixJobLauncher gobblinHelixJobLauncher2 = this.closer.register(
new GobblinHelixJobLauncher(properties2, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap,
java.util.Optional.empty()));
CountDownLatch stg1 = new CountDownLatch(1);
CountDownLatch stg2 = new CountDownLatch(1);
CountDownLatch stg3 = new CountDownLatch(1);
SuspendJobListener testListener = new SuspendJobListener(stg1, stg2);
(new Thread(() -> {
try {
gobblinHelixJobLauncher1.launchJob(testListener);
stg3.countDown();
} catch (JobException e) {
// Ignored: a launch failure will surface through the completion-count assertion below.
}
})).start();
// Wait for the first job to start
stg1.await();
// While the first job is still running, launch the second job with the same job name;
// it should be a no-op because the first run is still in progress
gobblinHelixJobLauncher2.launchJob(testListener);
stg2.countDown();
// Wait for the first job to finish
stg3.await();
Assert.assertEquals(testListener.getCompletes().get(), 1);
}
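/**
 * Verifies that closing a launcher cleans up its Helix workflow, job context, work unit and
 * task state directories, and job.state file, and that a second workflow whose name shares a
 * prefix with the first is left intact until its own launcher is closed.
 */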
@Test(enabled = false, dependsOnMethods = {"testLaunchJob", "testLaunchMultipleJobs"})
public void testJobCleanup() throws Exception {
final ConcurrentHashMap<String, Boolean> runningMap = new ConcurrentHashMap<>();
final Properties properties = generateJobProperties(this.baseConfig, "3", "_1504201348473");
final GobblinHelixJobLauncher gobblinHelixJobLauncher =
new GobblinHelixJobLauncher(properties, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap,
java.util.Optional.empty());
final Properties properties2 = generateJobProperties(this.baseConfig, "33", "_1504201348474");
final GobblinHelixJobLauncher gobblinHelixJobLauncher2 =
new GobblinHelixJobLauncher(properties2, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap,
java.util.Optional.empty());
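// Launch both jobs; each launcher creates its own Helix workflow.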
gobblinHelixJobLauncher.launchJob(null);
gobblinHelixJobLauncher2.launchJob(null);
final TaskDriver taskDriver = new TaskDriver(this.helixManager);
final String jobIdKey1 = properties.getProperty(ConfigurationKeys.JOB_ID_KEY);
final String jobIdKey2 = properties2.getProperty(ConfigurationKeys.JOB_ID_KEY);
waitForWorkFlowStartup(taskDriver, jobIdKey1);
waitForWorkFlowStartup(taskDriver, jobIdKey2);
// job contexts should be present until close
org.apache.helix.task.JobContext jobContext1 = taskDriver.getJobContext(jobIdKey1);
org.apache.helix.task.JobContext jobContext2 = taskDriver.getJobContext(jobIdKey2);
Assert.assertNotNull(jobContext1);
Assert.assertNotNull(jobContext2);
gobblinHelixJobLauncher.close();
// workflow deleted asynchronously after close
waitForWorkFlowCleanup(taskDriver, jobIdKey1);
jobContext1 = taskDriver.getJobContext(jobIdKey1);
// job context should have been deleted
Assert.assertNull(jobContext1);
// workflow should have been deleted
WorkflowConfig workflowConfig = taskDriver.getWorkflowConfig(jobIdKey1);
Assert.assertNull(workflowConfig);
WorkflowContext workflowContext = taskDriver.getWorkflowContext(jobIdKey1);
Assert.assertNull(workflowContext);
// second workflow with shared prefix should not be deleted when the first workflow is cleaned up
workflowConfig = taskDriver.getWorkflowConfig(jobIdKey2);
Assert.assertNotNull(workflowConfig);
gobblinHelixJobLauncher2.close();
// workflow deleted asynchronously after close
waitForWorkFlowCleanup(taskDriver, jobIdKey2);
workflowConfig = taskDriver.getWorkflowConfig(jobIdKey2);
Assert.assertNull(workflowConfig);
// check that workunit and taskstate directory for the job are cleaned up
final File workunitsDir =
new File(this.appWorkDir + File.separator + GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME
+ File.separator + jobIdKey1);
final File taskstatesDir =
new File(this.appWorkDir + File.separator + GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME
+ File.separator + jobIdKey1);
Assert.assertFalse(workunitsDir.exists());
Assert.assertFalse(taskstatesDir.exists());
// check that job.state file is cleaned up
final File jobStateFile = new File(GobblinClusterUtils.getJobStateFilePath(true, this.appWorkDir, jobIdKey1).toString());
Assert.assertFalse(jobStateFile.exists());
}
@AfterClass
public void tearDown() throws IOException {
try {
this.gobblinTaskRunner.stop();
this.thread.join();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
} finally {
this.closer.close();
}
}
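/**
 * Polls the {@link TaskDriver} for up to 60 seconds until the workflow config disappears,
 * indicating that the asynchronous cleanup triggered by close() has completed.
 */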
private void waitForWorkFlowCleanup(TaskDriver taskDriver, String workflow) {
for (int i = 0; i < 60; i++) {
WorkflowConfig workflowConfig = taskDriver.getWorkflowConfig(workflow);
if (workflowConfig == null) {
break;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Restore the interrupt status and stop waiting.
Thread.currentThread().interrupt();
return;
}
}
}
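/**
 * Polls the {@link TaskDriver} for up to 5 seconds until the workflow config appears,
 * indicating that the Helix workflow has been created.
 */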
private void waitForWorkFlowStartup(TaskDriver taskDriver, String workflow) {
for (int i = 0; i < 5; i++) {
WorkflowConfig workflowConfig = taskDriver.getWorkflowConfig(workflow);
if (workflowConfig != null) {
break;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Restore the interrupt status and stop waiting.
Thread.currentThread().interrupt();
return;
}
}
}
}