blob: 14c5a91663dd6864d39edbfcf460b37f66a2bfe5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.runtime;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.common.base.Predicates;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metastore.DatasetStateStore;
import org.apache.gobblin.metastore.MysqlStateStore;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.util.ClassAliasResolver;
/**
* Unit tests for {@link MysqlDatasetStateStore}.
**/
@Test(groups = { "gobblin.runtime" })
public class MysqlDatasetStateStoreTest {
private static final String TEST_STATE_STORE = "TestStateStore";
private static final String TEST_JOB_NAME = "TestJob";
private static final String TEST_JOB_NAME_LOWER = "testjob";
private static final String TEST_JOB_NAME2 = "TestJob2";
private static final String TEST_JOB_ID = "TestJob1";
private static final String TEST_TASK_ID_PREFIX = "TestTask-";
private static final String TEST_DATASET_URN = "TestDataset";
private static final String TEST_DATASET_URN_LOWER = "testdataset";
private static final String TEST_DATASET_URN2 = "TestDataset2";
private StateStore<JobState> dbJobStateStore;
private DatasetStateStore<JobState.DatasetState> dbDatasetStateStore;
private long startTime = System.currentTimeMillis();
private ITestMetastoreDatabase testMetastoreDatabase;
private static final String TEST_USER = "testUser";
private static final String TEST_PASSWORD = "testPassword";
@BeforeClass
public void setUp() throws Exception {
testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
String jdbcUrl = testMetastoreDatabase.getJdbcUrl();
ConfigBuilder configBuilder = ConfigBuilder.create();
HikariDataSource dataSource = new HikariDataSource();
dataSource.setDriverClassName(ConfigurationKeys.DEFAULT_STATE_STORE_DB_JDBC_DRIVER);
dataSource.setAutoCommit(false);
dataSource.setJdbcUrl(jdbcUrl);
dataSource.setUsername(TEST_USER);
dataSource.setPassword(TEST_PASSWORD);
dbJobStateStore = new MysqlStateStore<>(dataSource, TEST_STATE_STORE, false, JobState.class);
configBuilder.addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY, jdbcUrl);
configBuilder.addPrimitive(ConfigurationKeys.STATE_STORE_DB_USER_KEY, TEST_USER);
configBuilder.addPrimitive(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, TEST_PASSWORD);
ClassAliasResolver<DatasetStateStore.Factory> resolver =
new ClassAliasResolver<>(DatasetStateStore.Factory.class);
DatasetStateStore.Factory stateStoreFactory =
resolver.resolveClass("mysql").newInstance();
dbDatasetStateStore = stateStoreFactory.createStateStore(configBuilder.build());
// clear data that may have been left behind by a prior test run
dbJobStateStore.delete(TEST_JOB_NAME);
dbDatasetStateStore.delete(TEST_JOB_NAME);
dbJobStateStore.delete(TEST_JOB_NAME2);
dbDatasetStateStore.delete(TEST_JOB_NAME2);
}
@Test
public void testPersistJobState() throws IOException {
JobState jobState = new JobState(TEST_JOB_NAME, TEST_JOB_ID);
jobState.setId(TEST_JOB_ID);
jobState.setProp("foo", "bar");
jobState.setState(JobState.RunningState.COMMITTED);
jobState.setStartTime(this.startTime);
jobState.setEndTime(this.startTime + 1000);
jobState.setDuration(1000);
for (int i = 0; i < 3; i++) {
TaskState taskState = new TaskState();
taskState.setJobId(TEST_JOB_ID);
taskState.setTaskId(TEST_TASK_ID_PREFIX + i);
taskState.setId(TEST_TASK_ID_PREFIX + i);
taskState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
jobState.addTaskState(taskState);
}
dbJobStateStore.put(TEST_JOB_NAME,
MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
jobState);
// for testing a colliding lowercase job name
jobState.setJobName(TEST_JOB_NAME_LOWER);
jobState.setProp("lower", "case");
dbJobStateStore.put(TEST_JOB_NAME_LOWER,
MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
jobState);
// second job name for testing getting store names in a later test case
jobState.setJobName(TEST_JOB_NAME2);
dbJobStateStore.put(TEST_JOB_NAME2,
MysqlDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + MysqlDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
jobState);
}
@Test(dependsOnMethods = "testPersistJobState")
public void testGetJobState() throws IOException {
JobState jobState = dbJobStateStore.get(TEST_JOB_NAME,
dbDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + dbDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
TEST_JOB_ID);
Assert.assertEquals(jobState.getId(), TEST_JOB_ID);
Assert.assertEquals(jobState.getJobName(), TEST_JOB_NAME);
Assert.assertEquals(jobState.getJobId(), TEST_JOB_ID);
Assert.assertEquals(jobState.getProp("foo"), "bar");
Assert.assertNotEquals(jobState.getProp("lower"), "case");
Assert.assertEquals(jobState.getState(), JobState.RunningState.COMMITTED);
Assert.assertEquals(jobState.getStartTime(), this.startTime);
Assert.assertEquals(jobState.getEndTime(), this.startTime + 1000);
Assert.assertEquals(jobState.getDuration(), 1000);
Assert.assertEquals(jobState.getCompletedTasks(), 3);
for (int i = 0; i < jobState.getCompletedTasks(); i++) {
TaskState taskState = jobState.getTaskStates().get(i);
Assert.assertEquals(taskState.getJobId(), TEST_JOB_ID);
Assert.assertEquals(taskState.getTaskId(), TEST_TASK_ID_PREFIX + i);
Assert.assertEquals(taskState.getId(), TEST_TASK_ID_PREFIX + i);
Assert.assertEquals(taskState.getWorkingState(), WorkUnitState.WorkingState.COMMITTED);
}
jobState = dbJobStateStore.get(TEST_JOB_NAME_LOWER,
dbDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + dbDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
TEST_JOB_ID);
Assert.assertEquals(jobState.getId(), TEST_JOB_ID);
Assert.assertEquals(jobState.getJobName(), TEST_JOB_NAME_LOWER);
Assert.assertEquals(jobState.getJobId(), TEST_JOB_ID);
Assert.assertEquals(jobState.getProp("foo"), "bar");
Assert.assertEquals(jobState.getProp("lower"), "case");
}
@Test(dependsOnMethods = "testGetJobState")
public void testPersistDatasetState() throws IOException {
JobState.DatasetState datasetState = new JobState.DatasetState(TEST_JOB_NAME, TEST_JOB_ID);
datasetState.setDatasetUrn(TEST_DATASET_URN);
datasetState.setState(JobState.RunningState.COMMITTED);
datasetState.setId(TEST_DATASET_URN);
datasetState.setStartTime(this.startTime);
datasetState.setEndTime(this.startTime + 1000);
datasetState.setDuration(1000);
for (int i = 0; i < 3; i++) {
TaskState taskState = new TaskState();
taskState.setJobId(TEST_JOB_ID);
taskState.setTaskId(TEST_TASK_ID_PREFIX + i);
taskState.setId(TEST_TASK_ID_PREFIX + i);
taskState.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
datasetState.addTaskState(taskState);
}
dbDatasetStateStore.persistDatasetState(TEST_DATASET_URN, datasetState);
// persist a second dataset state to test that retrieval of multiple dataset states works
datasetState.setDatasetUrn(TEST_DATASET_URN2);
datasetState.setId(TEST_DATASET_URN2);
datasetState.setDuration(2000);
dbDatasetStateStore.persistDatasetState(TEST_DATASET_URN2, datasetState);
// persist a colliding lowercase dataset state to test that retrieval is case sensitive
datasetState.setDatasetUrn(TEST_DATASET_URN_LOWER);
datasetState.setId(TEST_DATASET_URN_LOWER);
datasetState.setDuration(3000);
dbDatasetStateStore.persistDatasetState(TEST_DATASET_URN_LOWER, datasetState);
// second job name for testing getting store names in a later test case
datasetState.setJobName(TEST_JOB_NAME2);
dbDatasetStateStore.persistDatasetState(TEST_DATASET_URN2, datasetState);
}
@Test(dependsOnMethods = "testPersistDatasetState")
public void testGetDatasetState() throws IOException {
JobState.DatasetState datasetState =
dbDatasetStateStore.getLatestDatasetState(TEST_JOB_NAME, TEST_DATASET_URN);
Assert.assertEquals(datasetState.getId(), TEST_DATASET_URN);
Assert.assertEquals(datasetState.getDatasetUrn(), TEST_DATASET_URN);
Assert.assertEquals(datasetState.getJobName(), TEST_JOB_NAME);
Assert.assertEquals(datasetState.getJobId(), TEST_JOB_ID);
Assert.assertEquals(datasetState.getState(), JobState.RunningState.COMMITTED);
Assert.assertEquals(datasetState.getStartTime(), this.startTime);
Assert.assertEquals(datasetState.getEndTime(), this.startTime + 1000);
Assert.assertEquals(datasetState.getDuration(), 1000);
Assert.assertEquals(datasetState.getCompletedTasks(), 3);
for (int i = 0; i < datasetState.getCompletedTasks(); i++) {
TaskState taskState = datasetState.getTaskStates().get(i);
Assert.assertEquals(taskState.getJobId(), TEST_JOB_ID);
Assert.assertEquals(taskState.getTaskId(), TEST_TASK_ID_PREFIX + i);
Assert.assertEquals(taskState.getId(), TEST_TASK_ID_PREFIX + i);
Assert.assertEquals(taskState.getWorkingState(), WorkUnitState.WorkingState.COMMITTED);
}
}
@Test(dependsOnMethods = { "testGetDatasetState" })
public void testGetStoreNames() throws IOException {
List<String> storeNames = this.dbJobStateStore.getStoreNames(Predicates.alwaysTrue());
Collections.sort(storeNames);
Assert.assertTrue(storeNames.size() == 3);
Assert.assertEquals(storeNames.get(0), TEST_JOB_NAME);
Assert.assertEquals(storeNames.get(1), TEST_JOB_NAME2);
Assert.assertEquals(storeNames.get(2), TEST_JOB_NAME_LOWER);
storeNames = this.dbDatasetStateStore.getStoreNames(Predicates.alwaysTrue());
Collections.sort(storeNames);
Assert.assertTrue(storeNames.size() == 2);
Assert.assertEquals(storeNames.get(0), TEST_JOB_NAME);
Assert.assertEquals(storeNames.get(1), TEST_JOB_NAME2);
}
@Test(dependsOnMethods = "testGetStoreNames")
public void testGetPreviousDatasetStatesByUrns() throws IOException {
Map<String, JobState.DatasetState> datasetStatesByUrns =
dbDatasetStateStore.getLatestDatasetStatesByUrns(TEST_JOB_NAME);
Assert.assertEquals(datasetStatesByUrns.size(), 3);
JobState.DatasetState datasetState = datasetStatesByUrns.get(TEST_DATASET_URN);
Assert.assertEquals(datasetState.getId(), TEST_DATASET_URN);
Assert.assertEquals(datasetState.getDatasetUrn(), TEST_DATASET_URN);
Assert.assertEquals(datasetState.getJobName(), TEST_JOB_NAME);
Assert.assertEquals(datasetState.getJobId(), TEST_JOB_ID);
Assert.assertEquals(datasetState.getState(), JobState.RunningState.COMMITTED);
Assert.assertEquals(datasetState.getStartTime(), this.startTime);
Assert.assertEquals(datasetState.getEndTime(), this.startTime + 1000);
Assert.assertEquals(datasetState.getDuration(), 1000);
datasetState = datasetStatesByUrns.get(TEST_DATASET_URN2);
Assert.assertEquals(datasetState.getId(), TEST_DATASET_URN2);
Assert.assertEquals(datasetState.getDatasetUrn(), TEST_DATASET_URN2);
Assert.assertEquals(datasetState.getJobName(), TEST_JOB_NAME);
Assert.assertEquals(datasetState.getJobId(), TEST_JOB_ID);
Assert.assertEquals(datasetState.getState(), JobState.RunningState.COMMITTED);
Assert.assertEquals(datasetState.getStartTime(), this.startTime);
Assert.assertEquals(datasetState.getEndTime(), this.startTime + 1000);
Assert.assertEquals(datasetState.getDuration(), 2000);
datasetState = datasetStatesByUrns.get(TEST_DATASET_URN_LOWER);
Assert.assertEquals(datasetState.getId(), TEST_DATASET_URN_LOWER);
Assert.assertEquals(datasetState.getDatasetUrn(), TEST_DATASET_URN_LOWER);
Assert.assertEquals(datasetState.getJobName(), TEST_JOB_NAME);
Assert.assertEquals(datasetState.getJobId(), TEST_JOB_ID);
Assert.assertEquals(datasetState.getState(), JobState.RunningState.COMMITTED);
Assert.assertEquals(datasetState.getStartTime(), this.startTime);
Assert.assertEquals(datasetState.getEndTime(), this.startTime + 1000);
Assert.assertEquals(datasetState.getDuration(), 3000);
}
@Test(dependsOnMethods = "testGetPreviousDatasetStatesByUrns")
public void testDeleteJobState() throws IOException {
JobState jobState = dbJobStateStore.get(TEST_JOB_NAME,
dbDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + dbDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
TEST_JOB_ID);
Assert.assertNotNull(jobState);
Assert.assertEquals(jobState.getJobId(), TEST_JOB_ID);
dbJobStateStore.delete(TEST_JOB_NAME);
jobState = dbJobStateStore.get(TEST_JOB_NAME,
dbDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + dbDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX,
TEST_JOB_ID);
Assert.assertNull(jobState);
}
@Test(dependsOnMethods = "testGetPreviousDatasetStatesByUrns")
public void testDeleteDatasetJobState() throws IOException {
JobState.DatasetState datasetState = dbDatasetStateStore.get(TEST_JOB_NAME,
TEST_DATASET_URN + "-" + dbDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX +
dbDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX, TEST_DATASET_URN);
Assert.assertNotNull(datasetState);
Assert.assertEquals(datasetState.getJobId(), TEST_JOB_ID);
dbDatasetStateStore.delete(TEST_JOB_NAME);
datasetState = dbDatasetStateStore.get(TEST_JOB_NAME,
TEST_DATASET_URN + "-" + dbDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX +
dbDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX, TEST_DATASET_URN);
Assert.assertNull(datasetState);
}
@AfterClass
public void tearDown() throws IOException {
dbJobStateStore.delete(TEST_JOB_NAME);
dbDatasetStateStore.delete(TEST_JOB_NAME);
}
}