blob: d6f1aef94ffda18f90b2454415fef03dacb3c328 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie;
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.XLogStreamingService;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.XLogFilter;
import org.apache.oozie.util.XLogStreamer;
public class TestCoordinatorEngineStreamLog extends XDataTestCase {
private Services services;
@Override
protected void setUp() throws Exception {
super.setUp();
services = new Services();
services.init();
}
@Override
protected void tearDown() throws Exception {
services.destroy();
super.tearDown();
}
static class DummyXLogStreamingService extends XLogStreamingService {
XLogFilter filter;
Date startTime;
Date endTime;
@Override
public void streamLog(XLogStreamer logStreamer, Date startTime, Date endTime, Writer writer)
throws IOException {
filter = logStreamer.getXLogFilter();
this.startTime = startTime;
this.endTime = endTime;
}
}
private CoordinatorEngine createCoordinatorEngine() {
return new CoordinatorEngine(getTestUser());
}
public void testCoordLogStreaming() throws Exception {
services.setService(DummyXLogStreamingService.class);
new DagXLogInfoService().init(services);
CoordinatorEngine ce = createCoordinatorEngine();
final String jobId = createJobs(6);
CoordinatorJobBean cjb = ce.getCoordJob(jobId);
Date createdDate = cjb.getCreatedTime();
Date endDate = cjb.getEndTime();
assertTrue(endDate.after(createdDate));
List<CoordinatorAction> list = cjb.getActions();
Collections.sort(list, new Comparator<CoordinatorAction>() {
public int compare(CoordinatorAction a, CoordinatorAction b) {
return a.getId().compareTo(b.getId());
}
});
// Test 1.to test if fields are injected
ce.streamLog(jobId, new StringWriter(), new HashMap<String, String[]>());
DummyXLogStreamingService service = (DummyXLogStreamingService) services.get(XLogStreamingService.class);
XLogFilter filter = service.filter;
assertEquals(filter.getFilterParams().get(DagXLogInfoService.JOB), jobId);
assertEquals(cjb.getCreatedTime(), service.startTime);
assertEquals(cjb.getLastModifiedTime(), service.endTime);
// Test2
// * Test method org.apache.oozie.CoordinatorEngine.streamLog(String,
// String,
// * String, Writer) with null 2nd and 3rd arguments.
// */
ce.streamLog(jobId, null, null, new StringWriter(), new HashMap<String, String[]>());
service = (DummyXLogStreamingService) services.get(XLogStreamingService.class);
filter = service.filter;
assertEquals(filter.getFilterParams().get(DagXLogInfoService.JOB), jobId);
// Test 3
// * Test method org.apache.oozie.CoordinatorEngine.streamLog(String,
// String,
// * String, Writer) with RestConstants.JOB_LOG_ACTION and non-null 2nd
// * argument.
ce.streamLog(jobId, "1, 3-4, 6", RestConstants.JOB_LOG_ACTION, new StringWriter(),
new HashMap<String, String[]>());
service = (DummyXLogStreamingService) services.get(XLogStreamingService.class);
filter = service.filter;
assertEquals(jobId, filter.getFilterParams().get(DagXLogInfoService.JOB));
assertEquals("(" + jobId + "@1|" + jobId + "@3|" + jobId + "@4|" + jobId + "@6)",
filter.getFilterParams().get(DagXLogInfoService.ACTION));
// Test 4. testing with date range
long middle = (createdDate.getTime() + endDate.getTime()) / 2;
Date middleDate = new Date(middle);
ce.streamLog(jobId, DateUtils.formatDateOozieTZ(createdDate) + "::" + DateUtils.formatDateOozieTZ(middleDate)
+ "," + DateUtils.formatDateOozieTZ(middleDate) + "::" + DateUtils.formatDateOozieTZ(endDate),
RestConstants.JOB_LOG_DATE, new StringWriter(), new HashMap<String, String[]>());
service = (DummyXLogStreamingService) services.get(XLogStreamingService.class);
filter = service.filter;
assertEquals(jobId, filter.getFilterParams().get(DagXLogInfoService.JOB));
final String action = filter.getFilterParams().get(DagXLogInfoService.ACTION);
assertEquals("(" + jobId + "@1|" + jobId + "@2|" + jobId + "@3|" + jobId + "@4|" + jobId + "@5|" + jobId
+ "@6)", action);
// Test 5 testing with action list range
ce.streamLog(jobId, "2-4", RestConstants.JOB_LOG_ACTION, new StringWriter(), new HashMap<String, String[]>());
service = (DummyXLogStreamingService) services.get(XLogStreamingService.class);
assertEquals(list.get(1).getCreatedTime(), service.startTime);
assertEquals(list.get(3).getLastModifiedTime(), service.endTime);
// Test 6, testing with 1 action list
ce.streamLog(jobId, "5", RestConstants.JOB_LOG_ACTION, new StringWriter(), new HashMap<String, String[]>());
service = (DummyXLogStreamingService) services.get(XLogStreamingService.class);
assertEquals(list.get(4).getCreatedTime(), service.startTime);
assertEquals(list.get(4).getLastModifiedTime(), service.endTime);
// Test 7, testing with 1 action list + range
ce.streamLog(jobId, "1,2-4,5", RestConstants.JOB_LOG_ACTION, new StringWriter(),
new HashMap<String, String[]>());
service = (DummyXLogStreamingService) services.get(XLogStreamingService.class);
assertEquals(list.get(0).getCreatedTime(), service.startTime);
assertEquals(list.get(4).getLastModifiedTime(), service.endTime);
// Test 8, testing with out order range
ce.streamLog(jobId, "5,3-4,1", RestConstants.JOB_LOG_ACTION, new StringWriter(),
new HashMap<String, String[]>());
service = (DummyXLogStreamingService) services.get(XLogStreamingService.class);
assertEquals(list.get(0).getCreatedTime(), service.startTime);
assertEquals(list.get(4).getLastModifiedTime(), service.endTime);
// Test 9, testing with date range
ce.streamLog(
jobId,
DateUtils.formatDateOozieTZ(list.get(1).getCreatedTime()) + "::"
+ DateUtils.formatDateOozieTZ(list.get(4).getLastModifiedTime()) + ",",
RestConstants.JOB_LOG_DATE, new StringWriter(), new HashMap<String, String[]>());
service = (DummyXLogStreamingService) services.get(XLogStreamingService.class);
assertEquals(list.get(1).getCreatedTime().toString(), service.startTime.toString());
assertEquals(list.get(4).getLastModifiedTime().toString(), service.endTime.toString());
// Test 10, testing with multiple date range
ce.streamLog(
jobId,
DateUtils.formatDateOozieTZ(list.get(1).getCreatedTime()) + "::"
+ DateUtils.formatDateOozieTZ(list.get(2).getLastModifiedTime()) + ","
+ DateUtils.formatDateOozieTZ(list.get(3).getCreatedTime()) + "::"
+ DateUtils.formatDateOozieTZ(list.get(5).getLastModifiedTime()), RestConstants.JOB_LOG_DATE,
new StringWriter(), new HashMap<String, String[]>());
service = (DummyXLogStreamingService) services.get(XLogStreamingService.class);
assertEquals(list.get(1).getCreatedTime().toString(), service.startTime.toString());
assertEquals(list.get(5).getLastModifiedTime().toString(), service.endTime.toString());
// Test 11, testing -scope option with Max Count
Services.get().getConf().setInt(CoordinatorEngine.COORD_ACTIONS_LOG_MAX_COUNT, 1);
ce = createCoordinatorEngine();
try {
ce.streamLog(jobId, "1-3", RestConstants.JOB_LOG_ACTION, new StringWriter(), new HashMap<String, String[]>());
} catch (XException e){
assertEquals(e.getErrorCode(), ErrorCode.E0302);
assertTrue(e.getMessage().indexOf("Retrieving log of too many coordinator actions") != -1);
}
// Test 12, testing -date option with Max Count
try {
ce.streamLog(jobId, DateUtils.formatDateOozieTZ(createdDate) + "::" + DateUtils.formatDateOozieTZ(endDate),
RestConstants.JOB_LOG_DATE, new StringWriter(),new HashMap<String, String[]>());
} catch (XException e) {
assertEquals(e.getErrorCode(), ErrorCode.E0302);
assertTrue(e.getMessage().indexOf("Retrieving log of too many coordinator actions") != -1);
}
}
private String createJobs(int numActions) throws Exception {
long time = System.currentTimeMillis();
CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, true);
job.setCreatedTime(new Date(time));
time += 1000 * 60;
for (int i = 1; i <= numActions; i++) {
CoordinatorActionBean action = createCoordAction(job.getId(), i, CoordinatorAction.Status.SUCCEEDED,
"coord-action-get.xml", 0, new Date(time));
action.setCreatedTime(new Date(time));
time += 1000 * 60;
action.setLastModifiedTime(new Date(time));
time += 1000 * 60;
addRecordToCoordActionTable(action, null);
}
job.setEndTime(new Date(time));
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB, job);
return job.getId();
}
}