blob: 86f63b789000da1f3d212aae524ff0a6552f210b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.command.coord;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.log4j.Appender;
import org.apache.log4j.Layout;
import org.apache.log4j.Logger;
import org.apache.log4j.SimpleLayout;
import org.apache.log4j.WriterAppender;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.dependency.DependencyChecker;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.HCatAccessorService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.PartitionDependencyManagerService;
import org.apache.oozie.service.RecoveryService;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.HCatURI;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestCoordPushDependencyCheckXCommand extends XDataTestCase {
// Metastore authority used by the tests to build "hcat://" dependency URIs; assigned in setUp().
private String server;
// Services instance obtained from setupServicesForHCatalog() and initialised in setUp().
private Services services = null;
/**
 * Runs the base-class set-up, then obtains and initialises the HCatalog-enabled services and
 * records the metastore authority that the tests use to build dependency URIs.
 */
@Before
protected void setUp() throws Exception {
    super.setUp();
    this.services = super.setupServicesForHCatalog();
    this.services.init();
    this.server = getMetastoreAuthority();
}
/**
 * Destroys the current services and then runs the base-class tear-down.
 * <p>
 * The base-class tear-down is executed in a {@code finally} block so it is not skipped when
 * destroying the services fails. The services are only destroyed when {@code Services.get()}
 * actually returns an instance, which guards against a set-up that failed before services
 * were created.
 */
@After
protected void tearDown() throws Exception {
    try {
        final Services current = Services.get();
        if (current != null) {
            current.destroy();
        }
    }
    finally {
        super.tearDown();
    }
}
/**
 * Single push dependency whose partition is already present in the HCat server:
 * after one push check the action has no push missing dependencies and is READY.
 */
@Test
public void testUpdateCoordTableSingleDep() throws Exception {
    final String db = "default";
    final String table = "tablename";
    final String partition = "dt=20120430;country=usa";
    final String dependencyUri = "hcat://" + server + "/" + db + "/" + table + "/" + partition;

    populateTable(db, table);
    final String actionId = addInitRecords(dependencyUri);
    // Before the check the dependency is recorded as missing and the action is waiting
    checkCoordAction(actionId, dependencyUri, CoordinatorAction.Status.WAITING);

    final CoordPushDependencyCheckXCommand pushCheck = new CoordPushDependencyCheckXCommand(actionId);
    pushCheck.call();

    checkCoordAction(actionId, "", CoordinatorAction.Status.READY);
}
/**
 * Two push dependencies whose partitions are both already present in the HCat server:
 * after one push check the action has no push missing dependencies and is READY.
 */
@Test
public void testUpdateCoordTableMultipleDepsV1() throws Exception {
    final String db = "default";
    final String table = "tablename";
    final String tableUri = "hcat://" + server + "/" + db + "/" + table;
    final String brazilDep = tableUri + "/dt=20120412;country=brazil";
    final String usaDep = tableUri + "/dt=20120430;country=usa";
    final String bothDeps = brazilDep + CoordELFunctions.INSTANCE_SEPARATOR + usaDep;

    populateTable(db, table);
    final String actionId = addInitRecords(bothDeps);
    checkCoordAction(actionId, bothDeps, CoordinatorAction.Status.WAITING);

    final CoordPushDependencyCheckXCommand pushCheck = new CoordPushDependencyCheckXCommand(actionId);
    pushCheck.call();

    checkCoordAction(actionId, "", CoordinatorAction.Status.READY);
}
/**
 * Two push dependencies where the first one is not in the HCat server and the second one is.
 * <p>
 * The default push check looks at dependencies in order, so while the first dependency is
 * unavailable the missing list stays unchanged. The partitions are then made available step by
 * step until the action becomes READY.
 */
@Test
public void testUpdateCoordTableMultipleDepsV2() throws Exception {
    final String db = "default";
    final String table = "tablename";
    final String brazilPartition = "dt=20120430;country=brazil";
    final String usaPartition = "dt=20120430;country=usa";
    final String tableUri = "hcat://" + server + "/" + db + "/" + table + "/";
    final String brazilDep = tableUri + brazilPartition;
    final String usaDep = tableUri + usaPartition;
    final String bothDeps = brazilDep + CoordELFunctions.INSTANCE_SEPARATOR + usaDep;

    populateTable(db, table);
    final String actionId = addInitRecords(bothDeps);
    checkCoordAction(actionId, bothDeps, CoordinatorAction.Status.WAITING);

    // Only the first missing dependency is checked; it is unavailable, so nothing changes
    new CoordPushDependencyCheckXCommand(actionId).call();
    checkCoordAction(actionId, bothDeps, CoordinatorAction.Status.WAITING);

    // Swap availability: remove the second partition and add the first one,
    // so only the second dependency is still missing afterwards
    dropPartition(db, table, usaPartition);
    addPartition(db, table, brazilPartition);
    new CoordPushDependencyCheckXCommand(actionId).call();
    checkCoordAction(actionId, usaDep, CoordinatorAction.Status.WAITING);

    // Bring the second partition back; every dependency is now satisfied
    addPartition(db, table, usaPartition);
    new CoordPushDependencyCheckXCommand(actionId).call();
    checkCoordAction(actionId, "", CoordinatorAction.Status.READY);
}
/**
 * Two push dependencies where the first one is not in the HCat server and the second one is,
 * checked with the two-argument command so that all missing dependencies are examined.
 * <p>
 * Only the first dependency is expected to remain missing (and to be registered as waiting /
 * for notification). Once its partition is added, the action is expected to become READY and
 * the registrations to be gone.
 */
@Test
public void testUpdateCoordTableMultipleDepsV3() throws Exception {
    final String db = "default";
    final String table = "tablename";
    final String brazilPartition = "dt=20120430;country=brazil";
    final String tableUri = "hcat://" + server + "/" + db + "/" + table + "/";
    final String brazilDep = tableUri + brazilPartition;
    final String usaDep = tableUri + "dt=20120430;country=usa";
    final String bothDeps = brazilDep + CoordELFunctions.INSTANCE_SEPARATOR + usaDep;

    populateTable(db, table);
    final String actionId = addInitRecords(bothDeps);
    checkCoordAction(actionId, bothDeps, CoordinatorAction.Status.WAITING);

    // Checks for all missing dependencies
    new CoordPushDependencyCheckXCommand(actionId, true).call();
    checkCoordAction(actionId, brazilDep, CoordinatorAction.Status.WAITING);

    final Services current = Services.get();
    final PartitionDependencyManagerService pdms = current.get(PartitionDependencyManagerService.class);
    final HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
    // The still-missing dependency is tracked; the available one is not
    assertTrue(pdms.getWaitingActions(new HCatURI(brazilDep)).contains(actionId));
    assertTrue(hcatService.isRegisteredForNotification(new HCatURI(brazilDep)));
    assertNull(pdms.getWaitingActions(new HCatURI(usaDep)));

    // Make the first dependency available
    addPartition(db, table, brazilPartition);
    new CoordPushDependencyCheckXCommand(actionId).call();
    checkCoordAction(actionId, "", CoordinatorAction.Status.READY);
    assertNull(pdms.getWaitingActions(new HCatURI(brazilDep)));
    assertFalse(hcatService.isRegisteredForNotification(new HCatURI(brazilDep)));
}
/**
 * Verifies the workflow configuration inside the action XML once all push dependencies are
 * available: after the push check moves the action to READY, the first two
 * {@code property} elements of {@code action/workflow/configuration} must carry the expected
 * HCat URI values.
 */
@Test
public void testResolveCoordConfiguration() throws Exception {
    String db = "default";
    String table = "tablename";
    String newHCatDependency1 = "hcat://" + server + "/" + db + "/" + table + "/dt=20120412;country=brazil";
    String newHCatDependency2 = "hcat://" + server + "/" + db + "/" + table + "/dt=20120430;country=usa";
    String newHCatDependency = newHCatDependency1 + CoordELFunctions.INSTANCE_SEPARATOR + newHCatDependency2;
    populateTable(db, table);

    CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
            CoordinatorJob.Status.RUNNING, false, true);
    CoordinatorActionBean action = addRecordToCoordActionTableForWaiting(job.getId(), 1,
            CoordinatorAction.Status.WAITING, "coord-action-for-action-push-check.xml", null, newHCatDependency,
            "Z");
    String actionId = action.getId();
    checkCoordAction(actionId, newHCatDependency, CoordinatorAction.Status.WAITING);

    new CoordPushDependencyCheckXCommand(actionId).call();
    CoordinatorActionBean caBean = checkCoordAction(actionId, "", CoordinatorAction.Status.READY);

    // Walk action -> workflow -> configuration in the stored action XML
    Element eAction = XmlUtils.parseXml(caBean.getActionXml());
    Element configElem = eAction.getChild("action", eAction.getNamespace())
            .getChild("workflow", eAction.getNamespace()).getChild("configuration", eAction.getNamespace());
    List<?> elementList = configElem.getChildren("property", configElem.getNamespace());
    Element e1 = (Element) elementList.get(0);
    Element e2 = (Element) elementList.get(1);

    assertEquals(
            "hcat://dummyhcat:1000/db1/table1/ds=/2009-29,hcat://dummyhcat:1000/db1/table1/ds=/2009-29," +
            "hcat://dummyhcat:1000/db1/table1/ds=/2009-29",
            e1.getChild("value", e1.getNamespace()).getValue());
    // Look up each property's value with that property's own namespace
    assertEquals("hcat://dummyhcat:1000/db1/table1/ds=/2009-29",
            e2.getChild("value", e2.getNamespace()).getValue());
}
/**
 * An action with one unavailable push dependency is back-dated past its timeout; the next
 * push check is expected to mark it TIMEDOUT and to remove the waiting-action and
 * notification registrations for the missing dependency.
 */
@Test
public void testTimeOut() throws Exception {
    final String db = "default";
    final String table = "tablename";
    final String tableUri = "hcat://" + server + "/" + db + "/" + table;
    final String brazilDep = tableUri + "/dt=20120430;country=brazil";
    final String usaDep = tableUri + "/dt=20120430;country=usa";
    final String bothDeps = brazilDep + CoordELFunctions.INSTANCE_SEPARATOR + usaDep;

    populateTable(db, table);
    final String actionId = addInitRecords(bothDeps);
    checkCoordAction(actionId, bothDeps, CoordinatorAction.Status.WAITING);

    new CoordPushDependencyCheckXCommand(actionId, true).call();
    checkCoordAction(actionId, brazilDep, CoordinatorAction.Status.WAITING);

    final PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
    final HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
    assertTrue(pdms.getWaitingActions(new HCatURI(brazilDep)).contains(actionId));
    assertTrue(hcatService.isRegisteredForNotification(new HCatURI(brazilDep)));

    // Timeout is 10 mins. Move the action's creation time 12 minutes into the past
    // so that the action is considered timed out.
    final long twelveMinutesInMillis = 12 * 60 * 1000;
    final long backdatedCreationTime = System.currentTimeMillis() - twelveMinutesInMillis;
    setCoordActionCreationTime(actionId, backdatedCreationTime);

    new CoordPushDependencyCheckXCommand(actionId).call();
    Thread.sleep(100);

    // Timed-out status, and the missing dependency is no longer registered anywhere
    checkCoordAction(actionId, brazilDep, CoordinatorAction.Status.TIMEDOUT);
    assertNull(pdms.getWaitingActions(new HCatURI(brazilDep)));
    assertFalse(hcatService.isRegisteredForNotification(new HCatURI(brazilDep)));
}
/**
 * Timeout scenario in which the action additionally carries (pull) missing dependencies.
 * <p>
 * After back-dating the action, the previously missing push partition is added and the push
 * check is run; the test then expects only the third dependency to remain in the missing
 * dependencies, no push missing dependencies, and no registrations for the first dependency.
 */
@Test
public void testTimeOutWithUnresolvedMissingDependencies() throws Exception {
    final String db = "default";
    final String table = "tablename";
    final String tableUri = "hcat://" + server + "/" + db + "/" + table;
    final String brazilDep = tableUri + "/dt=20120430;country=brazil";
    final String usaDep = tableUri + "/dt=20120430;country=usa";
    final String ukDep = tableUri + "/dt=20120430;country=uk";
    final String pushDeps = brazilDep + CoordELFunctions.INSTANCE_SEPARATOR + usaDep;

    populateTable(db, table);
    final String actionId = addInitRecords(pushDeps);
    checkCoordAction(actionId, pushDeps, CoordinatorAction.Status.WAITING);

    new CoordPushDependencyCheckXCommand(actionId, true).call();
    checkCoordAction(actionId, brazilDep, CoordinatorAction.Status.WAITING);

    final PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
    final HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
    assertTrue(pdms.getWaitingActions(new HCatURI(brazilDep)).contains(actionId));
    assertTrue(hcatService.isRegisteredForNotification(new HCatURI(brazilDep)));

    // Timeout is 10 mins. Move the action's creation time 12 minutes into the past
    // so that the action is considered timed out.
    final long backdatedCreationTime = System.currentTimeMillis() - (12 * 60 * 1000);
    setCoordActionCreationTime(actionId, backdatedCreationTime);

    // Set some missing dependency. Instead of latest or future just setting a current one for
    // testing, as we are only interested in ensuring CoordActionInputCheckXCommand is run
    final String allMissingDeps = pushDeps + CoordELFunctions.INSTANCE_SEPARATOR + ukDep;
    setMissingDependencies(actionId, allMissingDeps);
    addPartition(db, table, "dt=20120430;country=brazil");
    checkDependencies(actionId, allMissingDeps, brazilDep);

    new CoordPushDependencyCheckXCommand(actionId).call();
    // Somehow with hive 0.10 it takes 1 second more.
    Thread.sleep(1300);

    checkDependencies(actionId, ukDep, "");
    assertNull(pdms.getWaitingActions(new HCatURI(brazilDep)));
    assertFalse(hcatService.isRegisteredForNotification(new HCatURI(brazilDep)));
}
/**
 * Timeout when the missing dependencies refer to a table that does not exist.
 * <p>
 * Each push check is expected to fail with an exception mentioning
 * {@code NoSuchObjectException}; before the timeout the action stays WAITING with nothing
 * registered, after back-dating the action it is expected to end up TIMEDOUT.
 */
@Test
public void testTimeOutWithException1() throws Exception {
    final String missingTableUri = "hcat://" + server + "/nodb/notable";
    final String brazilDep = missingTableUri + "/dt=20120430;country=brazil";
    final String usaDep = missingTableUri + "/dt=20120430;country=usa";
    final String bothDeps = brazilDep + CoordELFunctions.INSTANCE_SEPARATOR + usaDep;

    final String actionId = addInitRecords(bothDeps);
    checkCoordAction(actionId, bothDeps, CoordinatorAction.Status.WAITING);

    try {
        new CoordPushDependencyCheckXCommand(actionId, true).call();
        fail();
    }
    catch (Exception expected) {
        assertTrue(expected.getMessage().contains("NoSuchObjectException"));
    }

    // The failed check must leave the action untouched and register nothing
    checkCoordAction(actionId, bothDeps, CoordinatorAction.Status.WAITING);
    final PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
    final HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
    assertNull(pdms.getWaitingActions(new HCatURI(brazilDep)));
    assertFalse(hcatService.isRegisteredForNotification(new HCatURI(brazilDep)));

    // Timeout is 10 mins. Move the action's creation time 12 minutes into the past
    // so that the action is considered timed out.
    final long backdatedCreationTime = System.currentTimeMillis() - (12 * 60 * 1000);
    setCoordActionCreationTime(actionId, backdatedCreationTime);

    try {
        new CoordPushDependencyCheckXCommand(actionId).call();
        fail();
    }
    catch (Exception expected) {
        assertTrue(expected.getMessage().contains("NoSuchObjectException"));
    }
    Thread.sleep(100);

    // Timed-out status with the missing dependencies unchanged
    checkCoordAction(actionId, bothDeps, CoordinatorAction.Status.TIMEDOUT);
}
/**
 * Timeout when the table holding the missing dependency is dropped between two push checks.
 * <p>
 * The second check is expected to fail with an exception mentioning
 * {@code NoSuchObjectException}, the action is expected to end up TIMEDOUT, and the
 * registrations for the missing dependency are expected to be removed.
 */
@Test
public void testTimeOutWithException2() throws Exception {
    final String db = "default";
    final String table = "tablename";
    final String tableUri = "hcat://" + server + "/" + db + "/" + table;
    final String brazilDep = tableUri + "/dt=20120430;country=brazil";
    final String usaDep = tableUri + "/dt=20120430;country=usa";
    final String bothDeps = brazilDep + CoordELFunctions.INSTANCE_SEPARATOR + usaDep;

    populateTable(db, table);
    final String actionId = addInitRecords(bothDeps);
    checkCoordAction(actionId, bothDeps, CoordinatorAction.Status.WAITING);

    new CoordPushDependencyCheckXCommand(actionId, true).call();
    checkCoordAction(actionId, brazilDep, CoordinatorAction.Status.WAITING);

    final PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
    final HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
    assertTrue(pdms.getWaitingActions(new HCatURI(brazilDep)).contains(actionId));
    assertTrue(hcatService.isRegisteredForNotification(new HCatURI(brazilDep)));

    // Timeout is 10 mins. Move the action's creation time 12 minutes into the past
    // so that the action is considered timed out.
    final long backdatedCreationTime = System.currentTimeMillis() - (12 * 60 * 1000);
    setCoordActionCreationTime(actionId, backdatedCreationTime);

    // Remove the table underneath the still-missing dependency
    dropTable(db, table, true);
    try {
        new CoordPushDependencyCheckXCommand(actionId).call();
        fail();
    }
    catch (Exception expected) {
        assertTrue(expected.getMessage().contains("NoSuchObjectException"));
    }
    Thread.sleep(100);

    // Timed-out status, and the missing dependency is no longer registered anywhere
    checkCoordAction(actionId, brazilDep, CoordinatorAction.Status.TIMEDOUT);
    assertNull(pdms.getWaitingActions(new HCatURI(brazilDep)));
    assertFalse(hcatService.isRegisteredForNotification(new HCatURI(brazilDep)));
}
/**
 * Verifies when a failing push check is put back on the callable queue.
 * <p>
 * The push dependencies point at a database/table that does not exist, so every push check
 * is expected to fail with an exception mentioning {@code NoSuchObjectException}. While the
 * action has no missing dependencies set, nothing is expected on the queue; once missing
 * dependencies are set, exactly one {@code coord_push_dep_check} entry with the expected delay
 * is expected.
 */
@Test
public void testRequeueOnException() throws Exception {
    Services.get().getConf().setInt(RecoveryService.CONF_SERVICE_INTERVAL, 6000);
    // Push dependencies on a non-existing db/table make the push check throw
    String newHCatDependency1 = "hcat://" + server + "/nodb/notable/dt=20120430;country=brazil";
    String newHCatDependency2 = "hcat://" + server + "/nodb/notable/dt=20120430;country=usa";
    String newHCatDependency = newHCatDependency1 + CoordELFunctions.INSTANCE_SEPARATOR + newHCatDependency2;
    CoordinatorJobBean job = addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
            CoordinatorJob.Status.RUNNING, false, true);
    CoordinatorActionBean action = addRecordToCoordActionTableForWaiting(job.getId(), 1,
            CoordinatorAction.Status.WAITING, "coord-action-for-action-input-check.xml", null,
            newHCatDependency, "Z");
    String actionId = action.getId();
    checkCoordAction(actionId, newHCatDependency, CoordinatorAction.Status.WAITING);
    try {
        new CoordPushDependencyCheckXCommand(actionId, true).call();
        fail();
    }
    catch (Exception e) {
        assertTrue(e.getMessage().contains("NoSuchObjectException"));
    }
    // Nothing should be queued as there are no pull dependencies
    CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
    assertEquals(0, callableQueueService.getQueueDump().size());

    // Nothing should be queued as there are no pull missing dependencies
    // but only push missing deps are there
    new CoordActionInputCheckXCommand(actionId, job.getId()).call();
    callableQueueService = Services.get().get(CallableQueueService.class);
    assertEquals(0, callableQueueService.getQueueDump().size());

    setMissingDependencies(actionId, newHCatDependency1);
    try {
        new CoordPushDependencyCheckXCommand(actionId, true).call();
        fail();
    }
    catch (Exception e) {
        assertTrue(e.getMessage().contains("NoSuchObjectException"));
    }
    // Should be requeued at the recovery service interval.
    // Take ONE snapshot of the queue and run every assertion against it, and log it first so
    // the dump is available in the test log when an assertion fails.
    final List<String> queueDump = callableQueueService.getQueueDump();
    log.info("Queue dump is " + queueDump.toString());
    assertEquals(1, queueDump.size());
    assertTrue(queueDump.get(0).contains("coord_push_dep_check"));
    // Delay should be something like delay=599999 (ignore last three digits) or delay=600000
    assertTrue(queueDump.get(0).matches(".* delay=(599[0-9]{3}|600000)"));
}
/**
 * Verifies that log output of {@code DependencyChecker} is prefixed with the id of the action
 * currently being checked, and not with the id of a previously checked action.
 * <p>
 * Output is captured through a temporary appender on the {@code DependencyChecker} logger.
 * The appender is removed in a {@code finally} block so it does not stay attached to the
 * logger after this test, whether the test passes or fails.
 */
@Test
public void testLogMessagePrefix() throws Exception {
    Logger logger = Logger.getLogger(DependencyChecker.class);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    Layout layout = new SimpleLayout();
    Appender appender = new WriterAppender(layout, out);
    logger.addAppender(appender);
    try {
        String db = "default";
        String table = "tablename";
        populateTable(db, table);
        String newHCatDependency = "hcat://" + server + "/" + db + "/" + table + "/dt=20120430;country=brazil";

        String actionId1 = addInitRecords(newHCatDependency);
        new CoordPushDependencyCheckXCommand(actionId1).call();
        assertTrue(out.toString(StandardCharsets.UTF_8.name()).contains("ACTION[" + actionId1 + "]"));

        // Start from an empty buffer so only the second action's output is inspected
        out.reset();
        String actionId2 = addInitRecords(newHCatDependency);
        new CoordPushDependencyCheckXCommand(actionId2).call();
        String secondRunOutput = out.toString(StandardCharsets.UTF_8.name());
        assertFalse(secondRunOutput.contains("ACTION[" + actionId1 + "]"));
        assertTrue(secondRunOutput.contains("ACTION[" + actionId2 + "]"));
    }
    finally {
        // Detach the capturing appender from the logger again
        logger.removeAppender(appender);
    }
}
/**
 * Uses job and action definitions from "coord-hcatinput-invalid-elfunction.xml" and expects
 * the push check (or the subsequent wait for the FAILED status) to throw an exception whose
 * message contains "Coord Action Input Check Error".
 */
@Test
public void testExceptionOnInvalidElFunction() throws Exception {
    final String db = "default";
    final String table = "tablename";
    final String tableUri = "hcat://" + server + "/" + db + "/" + table;
    final String brazilDep = tableUri + "/dt=20120412;country=brazil";
    final String usaDep = tableUri + "/dt=20120430;country=usa";
    final String bothDeps = brazilDep + CoordELFunctions.INSTANCE_SEPARATOR + usaDep;
    populateTable(db, table);

    final CoordinatorJobBean coordJob = addRecordToCoordJobTableForWaiting(
            "coord-hcatinput-invalid-elfunction.xml", CoordinatorJob.Status.RUNNING, false, true);
    final CoordinatorActionBean coordAction = addRecordToCoordActionTableForWaiting(coordJob.getId(), 1,
            CoordinatorAction.Status.WAITING, "coord-hcatinput-invalid-elfunction.xml", null, bothDeps,
            "Z");
    final String actionId = coordAction.getId();
    checkCoordAction(actionId, bothDeps, CoordinatorAction.Status.WAITING);

    try {
        new CoordPushDependencyCheckXCommand(actionId).call();
        // Poll the stored action until it reports FAILED (or the wait gives up)
        waitFor(6000, new Predicate() {
            @Override
            public boolean evaluate() throws Exception {
                final CoordinatorActionBean reloaded = CoordActionQueryExecutor.getInstance()
                        .get(CoordActionQueryExecutor.CoordActionQuery.GET_COORD_ACTION, actionId);
                return reloaded.getStatus() == CoordinatorAction.Status.FAILED;
            }
        });
        fail("Should throw an exception");
    }
    catch (Exception expected) {
        assertTrue(expected.getMessage().contains("Coord Action Input Check Error"));
    }
}
/**
 * Recreates the given database and table from scratch (partition keys "dt,country") and
 * adds the three partitions the tests start from.
 *
 * @param db name of the database to drop and recreate
 * @param table name of the table to drop and recreate
 * @throws Exception if any of the drop/create/add calls fails
 */
private void populateTable(String db, String table) throws Exception {
    // Start clean: remove whatever a previous test left behind
    dropTable(db, table, true);
    dropDatabase(db, true);

    createDatabase(db);
    createTable(db, table, "dt,country");

    final String[] initialPartitions = {
        "dt=20120430;country=usa",
        "dt=20120412;country=brazil",
        "dt=20120413;country=brazil",
    };
    for (String partition : initialPartitions) {
        addPartition(db, table, partition);
    }
}
/**
 * Loads the coordinator action through JPA and asserts its push missing dependencies and
 * its status.
 *
 * @param actionId id of the coordinator action to load
 * @param expDeps expected value of the action's push missing dependencies
 * @param stat expected status of the action
 * @return the loaded action bean
 * @throws Exception if the action cannot be loaded; the underlying JPA failure is attached
 *         as the cause so the real reason shows up in the test report
 */
private CoordinatorActionBean checkCoordAction(String actionId, String expDeps, CoordinatorAction.Status stat)
        throws Exception {
    try {
        JPAService jpaService = Services.get().get(JPAService.class);
        CoordinatorActionBean action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
        String missDeps = action.getPushMissingDependencies();
        assertEquals(expDeps, missDeps);
        assertEquals(stat, action.getStatus());
        return action;
    }
    catch (JPAExecutorException se) {
        // Keep the original exception as the cause instead of discarding it
        throw new Exception("Action ID " + actionId + " was not stored properly in db", se);
    }
}
/**
 * Loads the coordinator action through JPA and asserts both its missing dependencies and
 * its push missing dependencies.
 *
 * @param actionId id of the coordinator action to load
 * @param expDeps expected value of the action's missing dependencies
 * @param expPushDeps expected value of the action's push missing dependencies
 * @return the loaded action bean
 * @throws Exception if the action cannot be loaded; the underlying JPA failure is attached
 *         as the cause so the real reason shows up in the test report
 */
private CoordinatorActionBean checkDependencies(String actionId, String expDeps, String expPushDeps)
        throws Exception {
    try {
        JPAService jpaService = Services.get().get(JPAService.class);
        CoordinatorActionBean action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
        assertEquals(expDeps, action.getMissingDependencies());
        assertEquals(expPushDeps, action.getPushMissingDependencies());
        return action;
    }
    catch (JPAExecutorException se) {
        // Keep the original exception as the cause instead of discarding it
        throw new Exception("Action ID " + actionId + " was not stored properly in db", se);
    }
}
}