blob: ecb2e3e26ff321093429f69189016c22a54e12da [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.servlet;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import javax.servlet.http.HttpServletResponse;
import org.apache.oozie.executor.jpa.BundleJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobsGetFromParentIdJPAExecutor;
import org.apache.oozie.local.LocalOozie;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.client.BundleJob;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.rest.JsonTags;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.service.AuthorizationService;
import org.apache.oozie.service.BundleEngineService;
import org.apache.oozie.service.ForTestAuthorizationService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.ProxyUserService;
import org.apache.oozie.service.Services;
import org.apache.oozie.servlet.AuthFilter;
import org.apache.oozie.servlet.HostnameFilter;
import org.apache.oozie.servlet.V1JobsServlet;
import org.apache.oozie.test.EmbeddedServletContainer;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.DateUtils;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
public class TestBulkMonitorWebServiceAPI extends XDataTestCase {
private EmbeddedServletContainer container;
private String servletPath;
Services services;
JPAService jpaService;
static {
new V1JobsServlet();
}
@Override
protected void setUp() throws Exception {
super.setUp();
services = new Services();
services.init();
LocalOozie.start();
jpaService = Services.get().get(JPAService.class);
addRecordsForBulkMonitor();
}
@Override
protected void tearDown() throws Exception {
LocalOozie.stop();
services.destroy();
super.tearDown();
}
private URL createURL(String servletPath, String resource, Map<String, String> parameters) throws Exception {
StringBuilder sb = new StringBuilder();
sb.append(container.getServletURL(servletPath));
if (resource != null && resource.length() > 0) {
sb.append("/").append(resource);
}
if (parameters.size() > 0) {
String separator = "?";
for (Map.Entry<String, String> param : parameters.entrySet()) {
sb.append(separator).append(URLEncoder.encode(param.getKey(), StandardCharsets.UTF_8.name())).append("=")
.append(URLEncoder.encode(param.getValue(), StandardCharsets.UTF_8.name()));
separator = "&";
}
}
return new URL(sb.toString());
}
private URL createURL(String resource, Map<String, String> parameters) throws Exception {
return createURL(servletPath, resource, parameters);
}
private void runTest(String servletPath, Class servletClass, boolean securityEnabled, Callable<Void> assertions)
throws Exception {
runTest(new String[] { servletPath }, new Class[] { servletClass }, securityEnabled, assertions);
}
private void runTest(String[] servletPath, Class[] servletClass, boolean securityEnabled, Callable<Void> assertions)
throws Exception {
Services services = new Services();
this.servletPath = servletPath[0];
try {
String proxyUser = getTestUser();
services.getConf().set(ProxyUserService.CONF_PREFIX + proxyUser + ProxyUserService.HOSTS, "*");
services.getConf().set(ProxyUserService.CONF_PREFIX + proxyUser + ProxyUserService.GROUPS, "*");
services.init();
services.getConf().setBoolean(AuthorizationService.CONF_SECURITY_ENABLED, securityEnabled);
Services.get().setService(ForTestAuthorizationService.class);
Services.get().setService(BundleEngineService.class);
container = new EmbeddedServletContainer("oozie");
for (int i = 0; i < servletPath.length; i++) {
container.addServletEndpoint(servletPath[i], servletClass[i]);
}
container.addFilter("/*", HostnameFilter.class);
container.addFilter("/*", AuthFilter.class);
container.addFilter("/*", HttpResponseHeaderFilter.class);
setSystemProperty("user.name", getTestUser());
container.start();
assertions.call();
}
finally {
this.servletPath = null;
if (container != null) {
container.stop();
}
services.destroy();
container = null;
}
}
public void testSingleRecord() throws Exception {
runTest("/v1/jobs", V1JobsServlet.class, false, new Callable<Void>() {
public Void call() throws Exception {
String bulkRequest = "bundle=" + bundleName + ";coordinators=Coord1;"
+ "actionStatus=FAILED;startcreatedtime=2012-07-21T00:00Z";
JSONArray array = _requestToServer(bulkRequest);
assertEquals(1, array.size());
JSONObject jbundle = (JSONObject) ((JSONObject) array.get(0)).get(JsonTags.BULK_RESPONSE_BUNDLE);
JSONObject jcoord = (JSONObject) ((JSONObject) array.get(0)).get(JsonTags.BULK_RESPONSE_COORDINATOR);
JSONObject jaction = (JSONObject) ((JSONObject) array.get(0)).get(JsonTags.BULK_RESPONSE_ACTION);
assertEquals(jbundle.get(JsonTags.BUNDLE_JOB_NAME), "BUNDLE-TEST");
assertEquals(jcoord.get(JsonTags.COORDINATOR_JOB_NAME), "Coord1");
assertEquals(jcoord.get(JsonTags.COORDINATOR_JOB_STATUS), "RUNNING");
assertEquals(jaction.get(JsonTags.COORDINATOR_ACTION_STATUS), "FAILED");
assertEquals((jaction.get(JsonTags.COORDINATOR_ACTION_CREATED_TIME).toString().split(", "))[1],
DateUtils.parseDateUTC(CREATE_TIME).toGMTString());
return null;
}
});
}
public void testMultipleRecords() throws Exception {
runTest("/v1/jobs", V1JobsServlet.class, false, new Callable<Void>() {
public Void call() throws Exception {
String bulkRequest = "bundle=" + bundleName + ";coordinators=Coord1;"
+ "actionStatus=FAILED,KILLED;startcreatedtime=2012-07-21T00:00Z";
JSONArray array = _requestToServer(bulkRequest);
assertEquals(2, array.size());
JSONObject jbundle = (JSONObject) ((JSONObject) array.get(0)).get(JsonTags.BULK_RESPONSE_BUNDLE);
assertNotNull(jbundle);
JSONObject jaction1 = (JSONObject) ((JSONObject) array.get(0)).get(JsonTags.BULK_RESPONSE_ACTION);
JSONObject jaction2 = (JSONObject) ((JSONObject) array.get(1)).get(JsonTags.BULK_RESPONSE_ACTION);
assertNotNull(jaction1);
assertNotNull(jaction2);
assertEquals(jbundle.get(JsonTags.BUNDLE_JOB_NAME), bundleName);
assertEquals(jaction1.get(JsonTags.COORDINATOR_ACTION_STATUS), "FAILED");
assertEquals(jaction2.get(JsonTags.COORDINATOR_ACTION_STATUS), "KILLED");
return null;
}
});
}
public void testNoRecords() throws Exception {
runTest("/v1/jobs", V1JobsServlet.class, false, new Callable<Void>() {
public Void call() throws Exception {
String bulkRequest = "bundle=BUNDLE-ABC";
Map<String, String> params = new HashMap<String, String>();
params.put(RestConstants.JOBS_BULK_PARAM, bulkRequest);
params.put(RestConstants.OFFSET_PARAM, "1");
params.put(RestConstants.LEN_PARAM, "5");
URL url = createURL("", params);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
// WS call will throw BAD_REQUEST code 400 error because no
// records found for this bundle
assertEquals(HttpServletResponse.SC_BAD_REQUEST, conn.getResponseCode());
return null;
}
});
}
public void testMultipleCoordinators() throws Exception {
runTest("/v1/jobs", V1JobsServlet.class, false, new Callable<Void>() {
public Void call() throws Exception {
// giving range as 2 of the total 3 coordinators
String bulkRequest = "bundle=" + bundleName + ";coordinators=Coord1,Coord2;actionstatus=KILLED";
JSONArray array = _requestToServer(bulkRequest);
assertEquals(2, array.size());
JSONObject jbundle = (JSONObject) ((JSONObject) array.get(0)).get(JsonTags.BULK_RESPONSE_BUNDLE);
assertNotNull(jbundle);
JSONObject jaction1 = (JSONObject) ((JSONObject) array.get(0)).get(JsonTags.BULK_RESPONSE_ACTION);
JSONObject jaction2 = (JSONObject) ((JSONObject) array.get(1)).get(JsonTags.BULK_RESPONSE_ACTION);
assertNotNull(jaction1);
assertNotNull(jaction2);
assertEquals(jaction1.get(JsonTags.COORDINATOR_ACTION_ID), "Coord1@2");
assertEquals(jaction2.get(JsonTags.COORDINATOR_ACTION_ID), "Coord2@1");
return null;
}
});
}
public void testDefaultStatus() throws Exception {
// adding coordinator action #4 to Coord#3
CoordinatorActionBean action4 = new CoordinatorActionBean();
action4.setId("Coord3@1");
action4.setStatus(CoordinatorAction.Status.FAILED);
action4.setCreatedTime(DateUtils.parseDateUTC(CREATE_TIME));
action4.setJobId("Coord3");
Calendar cal = Calendar.getInstance();
cal.setTime(DateUtils.parseDateUTC(CREATE_TIME));
cal.add(Calendar.DATE, -1);
action4.setNominalTime(cal.getTime());
CoordActionInsertJPAExecutor actionInsert = new CoordActionInsertJPAExecutor(action4);
jpaService.execute(actionInsert);
runTest("/v1/jobs", V1JobsServlet.class, false, new Callable<Void>() {
public Void call() throws Exception {
String bulkRequest = "bundle=" + bundleName;
JSONArray array = _requestToServer(bulkRequest);
assertEquals(4, array.size());
return null;
}
});
}
public void testMultipleBundleIdsForName() throws Exception {
// Adding another bundle having same name
BundleJobBean bundle = new BundleJobBean();
bundle.setId("00002-12345-B");
bundle.setAppName(bundleName);
bundle.setStatus(BundleJob.Status.RUNNING);
bundle.setStartTime(new Date());
BundleJobInsertJPAExecutor bundleInsert = new BundleJobInsertJPAExecutor(bundle);
jpaService.execute(bundleInsert);
runTest("/v1/jobs", V1JobsServlet.class, false, new Callable<Void>() {
public Void call() throws Exception {
String bulkRequest = "bundle=" + bundleName;
Map<String, String> params = new HashMap<String, String>();
params.put(RestConstants.JOBS_BULK_PARAM, bulkRequest);
params.put(RestConstants.OFFSET_PARAM, "1");
params.put(RestConstants.LEN_PARAM, "5");
URL url = createURL("", params);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
// WS call will throw BAD_REQUEST code 400 error because no
// records found for this bundle
assertFalse(HttpServletResponse.SC_BAD_REQUEST == conn.getResponseCode());
return null;
}
});
}
public void testBundleId() throws Exception {
runTest("/v1/jobs", V1JobsServlet.class, false, new Callable<Void>() {
public Void call() throws Exception {
String bulkRequest = "bundle=" + bundleId + ";coordinators=Coord1;"
+ "actionStatus=FAILED;startcreatedtime=2012-07-21T00:00Z";
JSONArray array = _requestToServer(bulkRequest);
assertEquals(1, array.size());
JSONObject jbundle = (JSONObject) ((JSONObject) array.get(0)).get(JsonTags.BULK_RESPONSE_BUNDLE);
JSONObject jcoord = (JSONObject) ((JSONObject) array.get(0)).get(JsonTags.BULK_RESPONSE_COORDINATOR);
JSONObject jaction = (JSONObject) ((JSONObject) array.get(0)).get(JsonTags.BULK_RESPONSE_ACTION);
assertNotNull(jbundle);
assertNotNull(jcoord);
assertNotNull(jaction);
assertEquals(jbundle.get(JsonTags.BUNDLE_JOB_ID), bundleId);
assertEquals(jcoord.get(JsonTags.COORDINATOR_JOB_NAME), "Coord1");
assertEquals(jcoord.get(JsonTags.COORDINATOR_JOB_STATUS), "RUNNING");
assertEquals(jaction.get(JsonTags.COORDINATOR_ACTION_STATUS), "FAILED");
assertEquals((jaction.get(JsonTags.COORDINATOR_ACTION_CREATED_TIME).toString().split(", "))[1],
DateUtils.parseDateUTC(CREATE_TIME).toGMTString());
return null;
}
});
}
public void testBundleIdWithCoordId() throws Exception {
// fetching coord Ids
JPAService jpaService = Services.get().get(JPAService.class);
List<String> coordIds = jpaService.execute(new CoordJobsGetFromParentIdJPAExecutor(bundleId, 10));
// there are 3 coordinators but giving range as only two of them
final String coordIdsStr = coordIds.get(0) + "," + coordIds.get(1);
runTest("/v1/jobs", V1JobsServlet.class, false, new Callable<Void>() {
public Void call() throws Exception {
// giving range as 2 of the total 3 coordinators
String bulkRequest = "bundle=" + bundleId + ";coordinators=" + coordIdsStr + ";actionstatus=KILLED";
JSONArray array = _requestToServer(bulkRequest);
assertEquals(2, array.size());
JSONObject jbundle = (JSONObject) ((JSONObject) array.get(0)).get(JsonTags.BULK_RESPONSE_BUNDLE);
JSONObject jaction1 = (JSONObject) ((JSONObject) array.get(0)).get(JsonTags.BULK_RESPONSE_ACTION);
JSONObject jaction2 = (JSONObject) ((JSONObject) array.get(1)).get(JsonTags.BULK_RESPONSE_ACTION);
assertEquals(jaction1.get(JsonTags.COORDINATOR_ACTION_ID), "Coord1@2");
assertEquals(jaction2.get(JsonTags.COORDINATOR_ACTION_ID), "Coord2@1");
return null;
}
});
}
private JSONArray _requestToServer(String bulkRequest) throws Exception {
Map<String, String> params = new HashMap<String, String>();
params.put(RestConstants.JOBS_BULK_PARAM, bulkRequest);
params.put(RestConstants.OFFSET_PARAM, "1");
params.put(RestConstants.LEN_PARAM, "5");
URL url = createURL("", params);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
assertEquals(HttpServletResponse.SC_OK, conn.getResponseCode());
assertTrue(conn.getHeaderField("content-type").startsWith(RestConstants.JSON_CONTENT_TYPE));
JSONObject json = (JSONObject) JSONValue.parse(new InputStreamReader(conn.getInputStream(),
StandardCharsets.UTF_8));
JSONArray array = (JSONArray) json.get(JsonTags.BULK_RESPONSES);
return array;
}
}