blob: b571fb532db452c5414edc2bd892253c1cb26700 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.store;
import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.util.XLog;
import java.sql.SQLException;
import java.util.Date;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.WorkflowsInfo;
import org.apache.oozie.service.WorkflowStoreService;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.workflow.WorkflowApp;
import org.apache.oozie.workflow.WorkflowLib;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.lite.EndNodeDef;
import org.apache.oozie.workflow.lite.LiteWorkflowApp;
import org.apache.oozie.workflow.lite.StartNodeDef;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XTestCase;
import org.apache.oozie.util.XmlUtils;
public class TestDBWorkflowStore extends XTestCase {
WorkflowLib wfLib;
WorkflowStore store;
WorkflowJobBean wfBean1;
WorkflowJobBean wfBean2;
String dbName;
Services services;
private String actionId;
@Override
protected void setUp() throws Exception {
super.setUp();
services = new Services();
services.init();
store = Services.get().get(WorkflowStoreService.class).create();
}
@Override
protected void tearDown() throws Exception {
store.closeTrx();
services.destroy();
super.tearDown();
}
public void testDBWorkflowStore() throws Exception {
_testInsertWF();
_testGetWF();
_testUpdateWF();
_testGetStatusCount();
// _testWaitWriteLock();
_testGetWFIDWithExtID();
_testSaveAction();
_testLoadAction();
_testUpdateAction();
_testGetActionsForWF();
System.out.println("after _testGetActions()");
_testGetActionForWFFailure();
System.out.println("after _testGetActionForWFFailure()");
_testGetPendingActions();
System.out.println("after _testPendingAction()");
_testGetWFInfo();
System.out.println("after _testWFInfo()");
// _testGetWFInfos();
System.out.println("after _testGetWFInfos()");
_testDeleteAction();
_testPurge();
}
private WorkflowJobBean createWorkflow(WorkflowApp app, Configuration conf, String authToken) throws Exception {
WorkflowAppService wps = Services.get().get(WorkflowAppService.class);
Configuration protoActionConf = wps.createProtoActionConf(conf, true);
WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
WorkflowInstance wfInstance;
wfInstance = workflowLib.createInstance(app, conf);
WorkflowJobBean workflow = new WorkflowJobBean();
workflow.setId(wfInstance.getId());
workflow.setAppName(app.getName());
workflow.setAppPath(conf.get(OozieClient.APP_PATH));
workflow.setConf(XmlUtils.prettyPrint(conf).toString());
workflow.setProtoActionConf(XmlUtils.prettyPrint(protoActionConf).toString());
workflow.setCreatedTime(new Date());
workflow.setLogToken(conf.get(OozieClient.LOG_TOKEN, ""));
workflow.setStatus(WorkflowJob.Status.PREP);
workflow.setRun(0);
workflow.setUser(conf.get(OozieClient.USER_NAME));
workflow.setGroup(conf.get(OozieClient.GROUP_NAME));
workflow.setWorkflowInstance(wfInstance);
return workflow;
}
private void _testInsertWF() throws Exception {
store.beginTrx();
WorkflowApp app = new LiteWorkflowApp("testApp", "<workflow-app/>",
new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "end")).
addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
Configuration conf1 = new Configuration();
conf1.set(OozieClient.APP_PATH, "testPath");
conf1.set(OozieClient.LOG_TOKEN, "testToken");
conf1.set(OozieClient.USER_NAME, getTestUser());
wfBean1 = createWorkflow(app, conf1, "auth");
Configuration conf2 = new Configuration();
conf2.set(OozieClient.APP_PATH, "testPath");
conf2.set(OozieClient.LOG_TOKEN, "testToken");
conf2.set(OozieClient.USER_NAME, getTestUser2());
wfBean2 = createWorkflow(app, conf2, "auth");
store.insertWorkflow(wfBean1);
store.insertWorkflow(wfBean2);
store.commitTrx();
/*
* SqlStatement s = getCount(OozieTable.WORKFLOWS); ResultSet rs =
* s.prepareAndSetValues(conn).executeQuery(); rs.next(); assertEquals(2,
* rs.getInt(1)); rs.close();
*
* s = getCount(OozieTable.WORKFLOWS).where(isEqual(WF_id, wfBean1.getId())); rs =
* s.prepareAndSetValues(conn).executeQuery(); rs.next(); assertEquals(1,
* rs.getInt(1)); rs.close();
*
* s = getCount(OozieTable.WORKFLOWS).where(isEqual(WF_id, wfBean2.getId())); rs =
* s.prepareAndSetValues(conn).executeQuery(); rs.next(); assertEquals(1,
* rs.getInt(1)); rs.close();
*/
}
private void _testGetWF() throws StoreException {
store.beginTrx();
WorkflowJobBean wfBean = store.getWorkflow(wfBean1.getId(), false);
assertEquals(wfBean.getId(), wfBean1.getId());
assertEquals(wfBean.getStatus(), WorkflowJob.Status.PREP);
assertEquals(wfBean.getWorkflowInstance().getId(), wfBean1.getId());
}
private void _testUpdateWF() throws StoreException {
wfBean1.setStatus(WorkflowJob.Status.SUCCEEDED);
WorkflowInstance wfInstance = wfBean1.getWorkflowInstance();
wfInstance.setVar("test", "hello");
wfBean1.setWorkflowInstance(wfInstance);
wfBean1.setExternalId("testExtId");
store.getWorkflow(wfBean1.getId(), false);
store.updateWorkflow(wfBean1);
WorkflowJobBean wfBean = store.getWorkflow(wfBean1.getId(), false);
assertEquals("hello", wfBean.getWorkflowInstance().getVar("test"));
assertEquals(wfBean.getStatus(), WorkflowJob.Status.SUCCEEDED);
store.commitTrx();
}
private void _testGetStatusCount() throws StoreException, InterruptedException {
store.beginTrx();
// assertEquals(1,
// store.getWorkflowCountWithStatus(WorkflowJob.Status.PREP.name()));
// assertEquals(1,
// store.getWorkflowCountWithStatus(WorkflowJob.Status.SUCCEEDED.name()));
// assertEquals(1,
// store.getWorkflowCountWithStatusInLastNSeconds(WorkflowJob.Status.PREP.name(),
// 5));
// assertEquals(1,
// store.getWorkflowCountWithStatusInLastNSeconds(WorkflowJob.Status.SUCCEEDED.name(),
// 5));
sleep(1000);
long t1 = System.currentTimeMillis();
WorkflowJobBean wfBean = store.getWorkflow(wfBean2.getId(), false);
store.updateWorkflow(wfBean);
long t2 = System.currentTimeMillis();
int s = (int) ((t2 - t1) / 1000);
if (s < 1) {
s = 1;
}
// assertEquals(1,
// store.getWorkflowCountWithStatusInLastNSeconds(WorkflowJob.Status.PREP.name(),
// s));
// assertEquals(0,
// store.getWorkflowCountWithStatusInLastNSeconds(WorkflowJob.Status.SUCCEEDED.name(),
// s));
store.commitTrx();
}
public class Locker implements Runnable {
protected String id;
private String nameIndex;
private StringBuffer sb;
protected long timeout;
public Locker(String id, String nameIndex, StringBuffer buffer) {
this.id = id;
this.nameIndex = id + ":" + nameIndex;
this.sb = buffer;
}
public void run() {
XLog log = XLog.getLog(getClass());
try {
WorkflowStore store = Services.get().get(WorkflowStoreService.class).create();
log.info("Get [{0}]", nameIndex);
store.beginTrx();
store.getWorkflow(id, false);
log.info("Got [{0}]", nameIndex);
sb.append(nameIndex + "-L ");
synchronized (this) {
wait();
}
sb.append(nameIndex + "-U ");
store.commitTrx();
store.closeTrx();
log.info("Release [{0}]", nameIndex);
}
catch (Exception ex) {
throw new RuntimeException(ex);
}
}
public void finish() {
synchronized (this) {
notify();
}
}
}
/*
* public void _testWaitWriteLock() throws Exception { StringBuffer sb = new
* StringBuffer(""); String id = wfBean1.getId(); Locker l1 = new Locker(id,
* "1", sb); Locker l2 = new Locker(id, "2", sb);
*
* new Thread(l1).start(); Thread.sleep(300); new Thread(l2).start();
* Thread.sleep(300); l1.finish(); Thread.sleep(1000); l2.finish();
* Thread.sleep(1000); assertEquals(id + ":1-L " + id + ":1-U " + id + ":2-L " +
* id + ":2-U", sb.toString().trim()); }
*/
private void _testGetWFIDWithExtID() throws StoreException {
store.beginTrx();
String id = (String) store.getWorkflowIdForExternalId("testExtId");
System.out.println("id is " + id);
assertEquals(wfBean1.getId(), id);
store.commitTrx();
}
private void _testSaveAction() throws StoreException, SQLException {
WorkflowActionBean a11 = new WorkflowActionBean();
store.beginTrx();
StringBuilder str = new StringBuilder();
str.append(System.currentTimeMillis());
str.append("11");
this.actionId = str.toString();
a11.setId(actionId);
a11.setJobId(wfBean1.getId());
a11.setName("a11");
a11.setStatus(WorkflowAction.Status.PREP);
store.insertAction(a11);
store.commitTrx();
/*
* WorkflowActionBean a12 = new WorkflowActionBean(); store.beginTrx();
* a12.setId("12"); a12.setName("a12"); a12.setJobId(wfBean1.getId());
* a12.setStatus(WorkflowAction.Status.PREP); store.insertAction(a12);
* store.commitTrx();
*
* WorkflowActionBean a21 = new WorkflowActionBean(); store.beginTrx();
* a21.setId("21"); a21.setName("a21"); a21.setJobId(wfBean2.getId());
* a21.setStatus(WorkflowAction.Status.PREP); store.insertAction(a21);
* store.commitTrx();
*
* WorkflowActionBean a22 = new WorkflowActionBean(); store.beginTrx();
* a22.setId("22"); a22.setName("a22"); a22.setJobId(wfBean2.getId());
* a22.setStatus(WorkflowAction.Status.PREP); store.insertAction(a22);
* store.commitTrx();
*/
/*
* SqlStatement s = getCount(OozieTable.ACTIONS); ResultSet rs =
* s.prepareAndSetValues(conn).executeQuery(); rs.next(); assertEquals(4,
* rs.getInt(1)); rs.close();
*
* s = getCount(OozieTable.ACTIONS).where(isEqual(ACTIONS_wfId,
* wfBean1.getId())); rs = s.prepareAndSetValues(conn).executeQuery();
* rs.next(); assertEquals(2, rs.getInt(1)); rs.close();
*
* s = getCount(OozieTable.ACTIONS).where(isEqual(ACTIONS_wfId,
* wfBean2.getId())); rs = s.prepareAndSetValues(conn).executeQuery();
* rs.next(); assertEquals(2, rs.getInt(1)); rs.close();
*
* s = getCount(OozieTable.ACTIONS).where(isEqual(ACTIONS_id, "11")); rs =
* s.prepareAndSetValues(conn).executeQuery(); rs.next(); assertEquals(1,
* rs.getInt(1)); rs.close();
*
* s = getCount(OozieTable.ACTIONS).where(isEqual(ACTIONS_id, "12")); rs =
* s.prepareAndSetValues(conn).executeQuery(); rs.next(); assertEquals(1,
* rs.getInt(1)); rs.close();
*/
}
private void _testLoadAction() throws StoreException {
store.beginTrx();
WorkflowActionBean a11 = store.getAction(actionId, false);
assertEquals(a11.getId(), actionId);
assertEquals(a11.getJobId(), wfBean1.getId());
assertEquals(a11.getStatus(), WorkflowAction.Status.PREP);
store.commitTrx();
}
private void _testUpdateAction() throws StoreException {
WorkflowActionBean a11 = store.getAction(actionId, false);
a11.setStatus(WorkflowAction.Status.OK);
a11.setPending();
a11.setPendingAge(new Date(System.currentTimeMillis() - 10000));
store.beginTrx();
store.updateAction(a11);
store.getEntityManager().flush();
store.getEntityManager().merge(a11);
store.commitTrx();
WorkflowActionBean a = store.getAction(a11.getId(), false);
assertEquals(a.getId(), a11.getId());
assertEquals(a.getStatus(), WorkflowAction.Status.OK);
}
private void _testDeleteAction() throws StoreException {
store.beginTrx();
store.deleteAction(actionId);
store.commitTrx();
boolean actionDeleted = false;
/*
* try { store.beginTrx(); store.getAction(actionId, false);
* store.commitTrx(); } catch (StoreException e) {
* System.out.println("errorCode is " + e.getErrorCode()); if
* (ErrorCode.E0605.equals(e.getErrorCode())) { actionDeleted = true; } }
*/
try {
store.getAction(actionId, false);
fail("Should have seen StoreException.");
}
catch (StoreException ex) {
}
}
private void _testGetActionForWFFailure() {
try {
store.getAction("non-existing-jobid", false);
fail("Should have seen StoreException.");
}
catch (StoreException ex) {
}
}
private void _testGetActionsForWF() throws StoreException {
store.beginTrx();
List<WorkflowActionBean> actions1 = store.getActionsForWorkflow(wfBean1.getId(), false);
// assertEquals(actions1.size(), 1);
List<WorkflowActionBean> actions2 = store.getActionsForWorkflow(wfBean2.getId(), false);
// assertEquals(actions2.size(), 2);
store.commitTrx();
}
private void _testGetPendingActions() throws StoreException {
store.beginTrx();
List<WorkflowActionBean> pActions = store.getPendingActions(5);
// assertEquals(1, pActions.size());
// assertEquals(actionId, pActions.get(0).getId());
store.commitTrx();
}
private void _testGetWFInfo() throws StoreException {
store.beginTrx();
WorkflowJobBean wfBean = store.getWorkflowInfo(wfBean1.getId());
assertEquals(wfBean.getId(), wfBean1.getId());
assertEquals(wfBean.getStatus(), wfBean1.getStatus());
assertEquals(wfBean.getActions().size(), 1);
assertEquals(wfBean.getActions().get(0).getId(), actionId);
store.commitTrx();
}
private void _testGetWFInfos() throws StoreException {
Map<String, List<String>> filter = new HashMap<String, List<String>>();
store.beginTrx();
WorkflowsInfo wfInfo = store.getWorkflowsInfo(filter, 1, 1);
System.out.println("got WorkflowsInfo " + wfInfo.getLen());
List<WorkflowJobBean> wfBeans = wfInfo.getWorkflows();
store.commitTrx();
assertEquals(1, wfBeans.size());
filter = new HashMap<String, List<String>>();
wfInfo = store.getWorkflowsInfo(filter, 1, 2);
wfBeans = wfInfo.getWorkflows();
assertEquals(2, wfBeans.size());
filter = new HashMap<String, List<String>>();
filter.put("user", Arrays.asList(getTestUser()));
wfInfo = store.getWorkflowsInfo(filter, 1, 2);
wfBeans = wfInfo.getWorkflows();
assertEquals(1, wfBeans.size());
filter = new HashMap<String, List<String>>();
filter.put("user", Arrays.asList(getTestUser(), getTestUser2()));
wfInfo = store.getWorkflowsInfo(filter, 1, 2);
wfBeans = wfInfo.getWorkflows();
assertEquals(2, wfBeans.size());
filter = new HashMap<String, List<String>>();
filter.put("user", Arrays.asList(getTestUser()));
filter.put("status", Arrays.asList("succeeded"));
wfInfo = store.getWorkflowsInfo(filter, 1, 2);
wfBeans = wfInfo.getWorkflows();
assertEquals(1, wfBeans.size());
filter = new HashMap<String, List<String>>();
filter.put("user", Arrays.asList(getTestUser(), getTestUser2()));
filter.put("name", Arrays.asList("testApp"));
wfInfo = store.getWorkflowsInfo(filter, 1, 2);
wfBeans = wfInfo.getWorkflows();
assertEquals(2, wfBeans.size());
assertEquals(2, wfInfo.getTotal());
assertEquals(1, wfInfo.getStart());
assertEquals(2, wfInfo.getLen());
filter = new HashMap<String, List<String>>();
filter.put("user", Arrays.asList(getTestUser(), getTestUser2()));
filter.put("name", Arrays.asList("testApp"));
wfInfo = store.getWorkflowsInfo(filter, 1, 1);
wfBeans = wfInfo.getWorkflows();
assertEquals(1, wfBeans.size());
assertEquals(2, wfInfo.getTotal());
assertEquals(1, wfInfo.getStart());
assertEquals(1, wfInfo.getLen());
}
private void _testPurge() throws Exception {
store.beginTrx();
wfBean1.setEndTime(new Date(System.currentTimeMillis() - (31 * 24 * 60 * 60 * 1000l)));
wfBean2.setEndTime(new Date(System.currentTimeMillis() - (31 * 24 * 60 * 60 * 1000l)));
WorkflowApp app = new LiteWorkflowApp("testApp", "<workflow-app/>",
new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "end")).
addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
Configuration conf2 = new Configuration();
conf2.set(OozieClient.APP_PATH, "testPath");
conf2.set(OozieClient.LOG_TOKEN, "testToken");
conf2.set(OozieClient.USER_NAME, getTestUser2());
WorkflowJobBean wfBean3 = createWorkflow(app, conf2, "auth");
store.insertWorkflow(wfBean3);
store.updateWorkflow(wfBean2);
store.updateWorkflow(wfBean1);
store.commitTrx();
WorkflowActionBean a31 = new WorkflowActionBean();
StringBuilder str = new StringBuilder();
str.append(System.currentTimeMillis());
str.append("31");
a31.setId(str.toString());
a31.setJobId(wfBean3.getId());
a31.setStatus(WorkflowAction.Status.PREP);
store.beginTrx();
store.insertAction(a31);
store.commitTrx();
store.beginTrx();
store.purge(30, 10000);
store.commitTrx();
/*
* SqlStatement s = getCount(OozieTable.WF_JOBS); ResultSet rs =
* s.prepareAndSetValues(conn).executeQuery(); rs.next(); assertEquals(3,
* rs.getInt(1)); rs.close();
*
* s = getCount(OozieTable.WF_ACTIONS); rs =
* s.prepareAndSetValues(conn).executeQuery(); rs.next(); assertEquals(4,
* rs.getInt(1)); rs.close();
*
* store.purge(30); store.commit();
*
* s = getCount(OozieTable.WF_JOBS); rs =
* s.prepareAndSetValues(conn).executeQuery(); rs.next(); assertEquals(1,
* rs.getInt(1)); rs.close();
*
* WorkflowJobBean tmp = store.getWorkflow(wfBean3.getId(), false);
* assertEquals(tmp.getId(), wfBean3.getId());
*
* s = getCount(OozieTable.WF_ACTIONS); rs =
* s.prepareAndSetValues(conn).executeQuery(); rs.next(); assertEquals(1,
* rs.getInt(1)); rs.close();
*
* WorkflowActionBean tmpa = store.getAction("31", false); assertEquals("31",
* tmpa.getId());
*/
}
}