blob: 7a47303d9fff19614c050675c8ff1d2531621d89 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.local.LocalOozie;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XDataTestCase;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
public class TestLocalOozieClientCoord extends XDataTestCase {
private Services services;
@Override
protected void setUp() throws Exception {
super.setUp();
/*
* DB cleanup below is needed to clean up the job records possibly left by previously executed tests.
* For example, such records are left by test org.apache.oozie.executor.jpa.TestCoordActionUpdateJPAExecutor.
* Note that by default Oozie tests are executed in "filesystem" (in fact, arbitrary) order. This way problems caused
* by left records can be flaky (not reproduced constantly).
* Services re-init is needed for the DB cleanup.
*/
services = new Services();
services.init();
LocalOozie.start();
}
@Override
protected void tearDown() throws Exception {
LocalOozie.stop();
services.destroy();
super.tearDown();
}
public void testGetOozieUrl() {
OozieClient client = LocalOozie.getCoordClient();
assertEquals("localoozie", client.getOozieUrl());
}
public void testGetProtocolUrl() throws IOException, OozieClientException {
OozieClient client = LocalOozie.getCoordClient();
assertEquals("localoozie", client.getProtocolUrl());
}
public void testValidateWSVersion() throws IOException, OozieClientException {
OozieClient client = LocalOozie.getCoordClient();
client.validateWSVersion();
}
public void testHeaderMethods() {
OozieClient client = LocalOozie.getCoordClient();
client.setHeader("h", "v");
assertTrue("no-op, should be null/empty", StringUtils.isBlank(client.getHeader("h")));
Iterator<String> hit = client.getHeaderNames();
assertFalse(hit.hasNext());
try {
hit.next();
fail("NoSuchElementException expected.");
}
catch (NoSuchElementException nsee) {
// expected
}
client.removeHeader("h");
assertTrue("no-op, should be null/empty", StringUtils.isBlank(client.getHeader("h")));
}
public void testGetJobsInfo() {
OozieClient client = LocalOozie.getCoordClient();
try {
client.getJobsInfo("foo");
fail("OozieClientException expected.");
}
catch (OozieClientException oce) {
assertEquals(ErrorCode.E0301.toString(), oce.getErrorCode());
}
try {
client.getJobsInfo("foo", 0, 5);
fail("OozieClientException expected.");
}
catch (OozieClientException oce) {
assertEquals(ErrorCode.E0301.toString(), oce.getErrorCode());
}
try {
client.getJobInfo("foo-id");
fail("OozieClientException expected.");
}
catch (OozieClientException oce) {
assertEquals(ErrorCode.E0301.toString(), oce.getErrorCode());
}
}
public void testReRun2() {
OozieClient client = LocalOozie.getCoordClient();
try {
client.reRun("foo-id", client.createConfiguration());
fail("OozieClientException expected.");
}
catch (OozieClientException oce) {
assertEquals(ErrorCode.E0301.toString(), oce.getErrorCode());
}
}
private void writeToFile(String appXml, String appPath) throws Exception {
File wf = new File(new URI(appPath).getPath());
PrintWriter out = null;
try {
out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(wf), StandardCharsets.UTF_8));
out.println(appXml);
}
catch (IOException iex) {
throw iex;
}
finally {
if (out != null) {
out.close();
}
}
}
public void testJobMethods() throws Exception {
final OozieClient client = LocalOozie.getCoordClient();
// Just in case, check that there are no Coord job records left by previous tests:
List<CoordinatorJob> list0 = client.getCoordJobsInfo("", 1, 100);
assertEquals(0, list0.size());
Properties conf = client.createConfiguration();
String appPath = getTestCaseFileUri("coordinator.xml");
String appXml = "<coordinator-app name=\"NAME\" frequency=\"${coord:minutes(20)}\" "
+ "start=\"2009-02-01T01:00Z\" end=\"2009-02-03T23:59Z\" timezone=\"UTC\" "
+ "xmlns=\"uri:oozie:coordinator:0.1\"> <controls> <timeout>10</timeout> <concurrency>1</concurrency> "
+ "<execution>LIFO</execution> </controls> <datasets> "
+ "<dataset name=\"a\" frequency=\"${coord:minutes(20)}\" initial-instance=\"2009-02-01T01:00Z\" "
+ "timezone=\"UTC\"> <uri-template>" + getTestCaseFileUri("coord/workflows/${YEAR}/${DAY}")
+ "</uri-template> </dataset> "
+ "<dataset name=\"local_a\" frequency=\"${coord:minutes(20)}\" initial-instance=\"2009-02-01T01:00Z\" "
+ "timezone=\"UTC\"> <uri-template>" + getTestCaseFileUri("coord/workflows/${YEAR}/${DAY}")
+ "</uri-template> </dataset> "
+ "</datasets> <input-events> "
+ "<data-in name=\"A\" dataset=\"a\"> <instance>${coord:latest(0)}</instance> </data-in> " + "</input-events> "
+ "<output-events> <data-out name=\"LOCAL_A\" dataset=\"local_a\"> "
+ "<instance>${coord:current(-1)}</instance> </data-out> </output-events> <action> <workflow> "
+ "<app-path>hdfs:///tmp/workflows/</app-path> "
+ "<configuration> <property> <name>inputA</name> <value>${coord:dataIn('A')}</value> </property> "
+ "<property> <name>inputB</name> <value>${coord:dataOut('LOCAL_A')}</value> "
+ "</property></configuration> </workflow> </action> </coordinator-app>";
writeToFile(appXml, appPath);
conf.setProperty(OozieClient.COORDINATOR_APP_PATH, appPath);
String jobId0 = client.submit(conf);
client.kill(jobId0);
String jobId = client.run(conf);
client.suspend(jobId);
client.resume(jobId);
client.kill(jobId);
CoordinatorJob job = client.getCoordJobInfo(jobId);
String appName = job.getAppName();
assertEquals("NAME", appName);
List<CoordinatorJob> list = client.getCoordJobsInfo("", 1, 5);
assertEquals(2, list.size());
}
public void testJobsOperations() throws Exception {
final OozieClient client = LocalOozie.getCoordClient();
// Just in case, check that there are no Coord job records left by previous tests:
List<CoordinatorJob> list0 = client.getCoordJobsInfo("", 1, 100);
assertEquals(0, list0.size());
Properties conf = client.createConfiguration();
String appPath = storedCoordAppPath();
conf.setProperty(OozieClient.COORDINATOR_APP_PATH, appPath);
final String jobId0 = client.run(conf);
final String jobId1 = client.run(conf);
final String jobId2 = client.run(conf);
waitFor(client, jobId0);
waitFor(client, jobId1);
waitFor(client, jobId2);
list0 = client.getCoordJobsInfo("name=NAME", 1, 10);
assertEquals(3, list0.size());
JSONObject jsonObject = client.suspendJobs("name=NAME", "coord", 1, 3);
assertEquals(3, jsonObject.get("total"));
assertEquals(3, ((JSONArray) jsonObject.get("coordinatorjobs")).size());
assertEquals(Job.Status.SUSPENDED, client.getCoordJobInfo(jobId0).getStatus());
assertEquals(Job.Status.SUSPENDED, client.getCoordJobInfo(jobId1).getStatus());
assertEquals(Job.Status.SUSPENDED, client.getCoordJobInfo(jobId2).getStatus());
jsonObject = client.resumeJobs("name=NAME", "coord", 1, 3);
assertEquals(3, jsonObject.get("total"));
assertEquals(3, ((JSONArray) jsonObject.get("coordinatorjobs")).size());
assertEquals(Job.Status.RUNNING, client.getCoordJobInfo(jobId0).getStatus());
assertEquals(Job.Status.RUNNING, client.getCoordJobInfo(jobId1).getStatus());
assertEquals(Job.Status.RUNNING, client.getCoordJobInfo(jobId2).getStatus());
jsonObject = client.killJobs("name=NAME", "coord", 1, 3);
assertEquals(3, jsonObject.get("total"));
assertEquals(3, ((JSONArray) jsonObject.get("coordinatorjobs")).size());
assertEquals(Job.Status.KILLED, client.getCoordJobInfo(jobId0).getStatus());
assertEquals(Job.Status.KILLED, client.getCoordJobInfo(jobId1).getStatus());
assertEquals(Job.Status.KILLED, client.getCoordJobInfo(jobId2).getStatus());
}
private void waitFor(final OozieClient client, final String jobId) {
waitFor(10 * 1000, new Predicate() {
@Override
public boolean evaluate() throws Exception {
Job.Status status = client.getCoordJobInfo(jobId).getStatus();
return status != Job.Status.PREP;
}
});
}
private String storedCoordAppPath() throws Exception {
String appPath = getTestCaseFileUri("coordinator.xml");
String appXml = "<coordinator-app name=\"NAME\" frequency=\"${coord:minutes(20)}\" "
+ "start=\"2009-02-01T01:00Z\" end=\"2009-02-01T03:00Z\" timezone=\"UTC\" "
+ "xmlns=\"uri:oozie:coordinator:0.1\"> <controls> <timeout>10</timeout> <concurrency>1</concurrency> "
+ "<execution>LIFO</execution> </controls> "
+ " <action> <workflow> "
+ "<app-path>hdfs:///tmp/workflows/</app-path> "
+" </workflow> </action> </coordinator-app>";
writeToFile(appXml, appPath);
return appPath;
}
}