blob: 87fc98c7bd1e3605e82b4e20ff120a9d78670f94 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.store;
import java.util.Date;
import java.util.List;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.CoordinatorAction.Status;
import org.apache.oozie.service.CoordinatorStoreService;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XTestCase;
public class TestCoordinatorStore extends XTestCase {
Services services;
CoordinatorStore store;
CoordinatorJobBean coordBean;
@Override
protected void setUp() throws Exception {
super.setUp();
services = new Services();
cleanUpDB(services.getConf());
services.init();
store = Services.get().get(CoordinatorStoreService.class).create();
}
@Override
protected void tearDown() throws Exception {
// dropSchema(dbName, conn);
services.destroy();
super.tearDown();
}
public void testCoordStore() throws StoreException {
String jobId = "00000-" + new Date().getTime() + "-TestCoordinatorStore-C";
String actionId = jobId + "_1";
try {
_testInsertJob(jobId);
_testGetJob(jobId);
_testGetMatJobLists();
_testUpdateCoordJob(jobId);
_testInsertAction(jobId, actionId);
_testGetAction(jobId, actionId);
_testGetActionForJob(jobId, actionId);
_testGetActionForJobInExecOrder(jobId, actionId);
_testGetActionForJobInLastOnly(jobId, actionId);
_testGetActionByExternalId(actionId, actionId + "_E");
_testGetActionRunningCount(actionId);
_testGetRecoveryActionsGroupByJobId(jobId);
_testUpdateCoordAction(actionId);
_testUpdateCoordActionMin(actionId);
}
finally {
// store.closeTrx();
}
}
private void _testUpdateCoordAction(String actionId) {
store.beginTrx();
try {
CoordinatorActionBean action = store.getCoordinatorAction(actionId, true);
int newActNum = action.getActionNumber() + 1;
action.setActionNumber(newActNum);
store.updateCoordinatorAction(action);
store.getEntityManager().flush();
store.getEntityManager().merge(action);
action = store.getCoordinatorAction(actionId, false);
assertEquals(newActNum, action.getActionNumber());
store.commitTrx();
}
catch (Exception ex) {
store.rollbackTrx();
ex.printStackTrace();
fail("Unable to Update a record in Coord Action. actionId =" + actionId);
}
}
private void _testUpdateCoordActionMin(String actionId) {
store.beginTrx();
try {
CoordinatorActionBean action = store.getCoordinatorAction(actionId, true);
action.setStatus(CoordinatorAction.Status.SUCCEEDED);
action.setMissingDependencies("d1,d2,d3");
action.setActionNumber(777);
Date lastModifiedTime = new Date();
action.setLastModifiedTime(lastModifiedTime);
store.updateCoordActionMin(action);
store.commitTrx();
//store.getEntityManager().merge(action);
action = getCoordAction(actionId);
assertEquals(CoordinatorAction.Status.SUCCEEDED, action.getStatus());
assertEquals("d1,d2,d3", action.getMissingDependencies());
//assertEquals(lastModifiedTime, action.getLastModifiedTime());
if (action.getActionNumber() == 777) {
fail("Action number should not be updated");
}
}
catch (Exception ex) {
if (store.isActive()) {
store.rollbackTrx();
}
ex.printStackTrace();
fail("Unable to Update a record in Coord Action. actionId =" + actionId);
}
}
private void _testGetActionRunningCount(String actionId) {
store.beginTrx();
try {
int count = store.getCoordinatorRunningActionsCount(actionId);
assertEquals(count, 0);
store.commitTrx();
}
catch (Exception ex) {
store.rollbackTrx();
ex.printStackTrace();
fail("Unable to GET count for action ID. actionId =" + actionId);
}
}
private void _testGetActionByExternalId(String actionId, String extId) {
store.beginTrx();
try {
CoordinatorActionBean action = store.getCoordinatorActionForExternalId(extId);
assertEquals(action.getId(), actionId);
assertEquals(action.getExternalId(), extId);
store.commitTrx();
}
catch (Exception ex) {
store.rollbackTrx();
ex.printStackTrace();
fail("Unable to GET a record for COORD ActionBy External ID. actionId =" + actionId + " extID =" + extId);
}
}
private void _testGetActionForJobInExecOrder(String jobId, String actionId) {
store.beginTrx();
try {
List<CoordinatorActionBean> actionList = store.getCoordinatorActionsForJob(jobId, 1,
CoordinatorJob.Execution.FIFO.toString());
assertEquals(actionList.size(), 1);
store.commitTrx();
}
catch (Exception ex) {
store.rollbackTrx();
ex.printStackTrace();
fail("Unable to GET a record for COORD Action_FOR_JOB with Exec Order. actionId =" + actionId + " jobId ="
+ jobId);
}
}
private void _testGetActionForJobInLastOnly(String jobId, String actionId) {
store.beginTrx();
try {
List<CoordinatorActionBean> actionList = store.getCoordinatorActionsForJob(jobId, 3,
CoordinatorJob.Execution.LAST_ONLY.toString());
assertEquals(actionList.size(), 1);
store.commitTrx();
}
catch (Exception ex) {
store.rollbackTrx();
ex.printStackTrace();
fail("Unable to GET a record for COORD Action_FOR_JOB with Exec Order. actionId =" + actionId + " jobId ="
+ jobId);
}
}
private void _testGetActionForJob(String jobId, String actionId) {
store.beginTrx();
try {
List<CoordinatorActionBean> actionList = store.getActionsForCoordinatorJob(jobId, false);
assertEquals(actionList.size(), 1);
CoordinatorActionBean action = actionList.get(0);
assertEquals(jobId, action.getJobId());
assertEquals(actionId, action.getId());
assertEquals(action.getStatus(), CoordinatorAction.Status.READY);
assertEquals(action.getActionNumber(), 1);
assertEquals(action.getExternalId(), actionId + "_E");
store.commitTrx();
}
catch (Exception ex) {
store.rollbackTrx();
ex.printStackTrace();
fail("Unable to GET a record for COORD Action_FOR_JOB. actionId =" + actionId + " jobId =" + jobId);
}
}
private void _testGetAction(String jobId, String actionId) throws StoreException {
store.beginTrx();
try {
CoordinatorActionBean action = store.getCoordinatorAction(actionId, false);
assertEquals(jobId, action.getJobId());
assertEquals(action.getStatus(), CoordinatorAction.Status.READY);
assertEquals(action.getActionNumber(), 1);
assertEquals(action.getExternalId(), actionId + "_E");
store.commitTrx();
}
catch (Exception ex) {
store.rollbackTrx();
ex.printStackTrace();
fail("Unable to GET a record for COORD Action. actionId =" + actionId);
}
}
private void _testGetRecoveryActionsGroupByJobId(String jobId) throws StoreException {
store.beginTrx();
try {
List<String> jobids = store.getRecoveryActionsGroupByJobId(60);
assertNotNull(jobids);
assertEquals(jobId, jobids.get(0));
store.commitTrx();
}
catch (Exception ex) {
store.rollbackTrx();
ex.printStackTrace();
fail("Unable to GET a record for RecoveryActionsGroupByJobId. jobId =" + jobId);
}
}
private void _testInsertAction(String jobId, String actionId) {
CoordinatorActionBean action = createAction(jobId, actionId);
}
private CoordinatorActionBean createAction(String jobId, String actionId) {
CoordinatorActionBean action = new CoordinatorActionBean();
action.setJobId(jobId);
action.setId(actionId);
action.setActionNumber(1);
action.setNominalTime(new Date());
action.setStatus(Status.READY);
action.setExternalId(actionId + "_E");
action.setLastModifiedTime(new Date(new Date().getTime() - 1200000));
store.beginTrx();
try {
store.insertCoordinatorAction(action);
store.commitTrx();
}
catch (Exception ex) {
store.rollbackTrx();
ex.printStackTrace();
fail("Unable to insert a record into COORD Action ");
}
return action;
}
private void _testUpdateCoordJob(String jobId) {
store.beginTrx();
try {
CoordinatorJobBean job = store.getCoordinatorJob(jobId, false);
int newFreq = job.getFrequency() + 1;
job.setFrequency(newFreq);
store.updateCoordinatorJob(job);
store.getEntityManager().flush();
store.getEntityManager().merge(job);
job = store.getCoordinatorJob(jobId, false);
assertEquals(newFreq, job.getFrequency());
store.commitTrx();
}
catch (Exception ex) {
store.rollbackTrx();
ex.printStackTrace();
fail("Unable to UPDATE a record for COORD Job. jobId =" + jobId);
}
}
private void _testGetMatJobLists() throws StoreException {
store.beginTrx();
try {
Date d1 = new Date();
Date d2 = new Date(d1.getTime() + 1000);
List<CoordinatorJobBean> jobList = store.getCoordinatorJobsToBeMaterialized(d2, 50);
if (jobList.size() == 0) {
fail("Test of getCoordinatorJobsToBeMaterialized returned no records. Date =" + d2);
}
// Assumption: no other older records are there
d2 = new Date(d1.getTime() - 86400000L * 365L);
jobList = store.getCoordinatorJobsToBeMaterialized(d2, 50);
/*
* if(jobList.size() > 0){ fail("Test of
* getCoordinatorJobsToBeMaterialized returned some records while
* expecting no records = " + d2); }
*/
store.commitTrx();
}
catch (Exception ex) {
store.rollbackTrx();
ex.printStackTrace();
fail("Unable to Get Materialized Jobs ");
}
}
private void _testGetJob(String jobId) throws StoreException {
store.beginTrx();
try {
CoordinatorJobBean job = store.getCoordinatorJob(jobId, false);
assertEquals(jobId, job.getId());
assertEquals(job.getStatus(), CoordinatorJob.Status.PREP);
store.commitTrx();
}
catch (Exception ex) {
store.rollbackTrx();
ex.printStackTrace();
fail("Unable to GET a record for COORD Job. jobId =" + jobId);
}
}
private void _testInsertJob(String jobId) throws StoreException {
CoordinatorJobBean job = createCoordJob(jobId);
store.beginTrx();
try {
store.insertCoordinatorJob(job);
store.commitTrx();
}
catch (Exception ex) {
store.rollbackTrx();
ex.printStackTrace();
fail("Unable to insert a record into COORD Job ");
}
}
private CoordinatorJobBean createCoordJob(String jobId) {
CoordinatorJobBean coordJob = new CoordinatorJobBean();
coordJob.setId(jobId);
coordJob.setAppName("testApp");
coordJob.setAppPath("testAppPath");
coordJob.setStatus(CoordinatorJob.Status.PREP);
coordJob.setCreatedTime(new Date());
coordJob.setUser("testUser");
coordJob.setGroup("testGroup");
String confStr = "<configuration></configuration>";
coordJob.setConf(confStr);
String appXml = "<coordinator-app xmlns='uri:oozie:coordinator:0.1' name='NAME' frequency=\"1\" start='2009-02-01T01:00Z' end='2009-02-03T23:59Z' timezone='UTC' freq_timeunit='DAY' end_of_duration='NONE'>";
appXml += "<controls>";
appXml += "<timeout>10</timeout>";
appXml += "<concurrency>2</concurrency>";
appXml += "<execution>LIFO</execution>";
appXml += "</controls>";
appXml += "<input-events>";
appXml += "<data-in name='A' dataset='a'>";
appXml += "<dataset name='a' frequency='7' initial-instance='2009-02-01T01:00Z' timezone='UTC' freq_timeunit='DAY' end_of_duration='NONE'>";
appXml += "<uri-template>file:///tmp/coord/workflows/${YEAR}/${DAY}</uri-template>";
appXml += "</dataset>";
appXml += "<instance>${coord:latest(0)}</instance>";
appXml += "</data-in>";
appXml += "</input-events>";
appXml += "<output-events>";
appXml += "<data-out name='LOCAL_A' dataset='local_a'>";
appXml += "<dataset name='local_a' frequency='7' initial-instance='2009-02-01T01:00Z' timezone='UTC' freq_timeunit='DAY' end_of_duration='NONE'>";
appXml += "<uri-template>file:///tmp/coord/workflows/${YEAR}/${DAY}</uri-template>";
appXml += "</dataset>";
appXml += "<instance>${coord:current(-1)}</instance>";
appXml += "</data-out>";
appXml += "</output-events>";
appXml += "<action>";
appXml += "<workflow>";
appXml += "<app-path>hdfs:///tmp/workflows/</app-path>";
appXml += "<configuration>";
appXml += "<property>";
appXml += "<name>inputA</name>";
appXml += "<value>${coord:dataIn('A')}</value>";
appXml += "</property>";
appXml += "<property>";
appXml += "<name>inputB</name>";
appXml += "<value>${coord:dataOut('LOCAL_A')}</value>";
appXml += "</property>";
appXml += "</configuration>";
appXml += "</workflow>";
appXml += "</action>";
appXml += "</coordinator-app>";
coordJob.setJobXml(appXml);
coordJob.setLastActionNumber(0);
coordJob.setFrequency(1);
Date curr = new Date();
coordJob.setNextMaterializedTime(curr);
coordJob.setLastModifiedTime(curr);
coordJob.setEndTime(new Date(curr.getTime() + 86400000));
coordJob.setStartTime(new Date(curr.getTime() - 86400000));
return coordJob;
}
/**
* Helper methods
*
* @param jobId
* @throws StoreException
*/
private CoordinatorActionBean getCoordAction(String actionId) throws StoreException {
CoordinatorStore store = new CoordinatorStore(false);
try {
CoordinatorActionBean action = store.getCoordinatorAction(actionId, false);
return action;
}
catch (StoreException se) {
fail("Job ID " + actionId + " was not stored properly in db");
}finally {
store.closeTrx();
}
return null;
}
}