blob: 9119e6c17e93ef1e0c2228db5bca167f1881620f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.service;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.StringWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.logging.LogFactory;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.test.EmbeddedServletContainer;
import org.apache.oozie.test.ZKXTestCase;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.XLogErrorStreamer;
import org.apache.oozie.util.XLogFilter;
import org.apache.oozie.util.XLogStreamer;
import org.apache.oozie.util.ZKUtils;
public class TestZKXLogStreamingService extends ZKXTestCase {
@Override
protected void setUp() throws Exception {
super.setUp();
}
@Override
protected void tearDown() throws Exception {
super.tearDown();
}
public void testRegisterUnregister() throws Exception {
assertEquals(0, ZKUtils.getUsers().size());
ZKXLogStreamingService zkxlss = new ZKXLogStreamingService();
try {
zkxlss.init(Services.get());
assertEquals(1, ZKUtils.getUsers().size());
assertEquals(zkxlss, ZKUtils.getUsers().iterator().next());
zkxlss.destroy();
assertEquals(0, ZKUtils.getUsers().size());
}
finally {
zkxlss.destroy();
}
}
public void testDisableLogOverWS() throws Exception {
Properties props = new Properties();
// Test missing logfile
props.setProperty("log4j.appender.oozie.File", "");
File propsFile = new File(getTestCaseConfDir(), "test-disable-log-over-ws-log4j.properties");
FileOutputStream fos = new FileOutputStream(propsFile);
props.store(fos, "");
setSystemProperty(XLogService.LOG4J_FILE, propsFile.getName());
assertTrue(doStreamDisabledCheck());
// Test non-absolute path for logfile
props.setProperty("log4j.appender.oozie.File", "oozie.log");
fos = new FileOutputStream(propsFile);
props.store(fos, "");
assertTrue(doStreamDisabledCheck());
// Test missing appender class
props.setProperty("log4j.appender.oozie.File", "${oozie.log.dir}/oozie.log");
props.setProperty("log4j.appender.oozie", "");
fos = new FileOutputStream(propsFile);
props.store(fos, "");
assertTrue(doStreamDisabledCheck());
// Test appender class not DailyRollingFileAppender or RollingFileAppender
props.setProperty("log4j.appender.oozie", "org.blah.blah");
fos = new FileOutputStream(propsFile);
props.store(fos, "");
assertTrue(doStreamDisabledCheck());
// Test DailyRollingFileAppender but missing DatePattern
props.setProperty("log4j.appender.oozie", "org.apache.log4j.DailyRollingFileAppender");
props.setProperty("log4j.appender.oozie.DatePattern", "");
fos = new FileOutputStream(propsFile);
props.store(fos, "");
assertTrue(doStreamDisabledCheck());
// Test DailyRollingFileAppender but DatePattern that doesn't end with 'HH' or 'dd'
props.setProperty("log4j.appender.oozie.DatePattern", "'.'yyyy-MM");
fos = new FileOutputStream(propsFile);
props.store(fos, "");
assertTrue(doStreamDisabledCheck());
// Test DailyRollingFileAppender with everything correct (dd)
props.setProperty("log4j.appender.oozie.DatePattern", "'.'yyyy-MM-dd");
fos = new FileOutputStream(propsFile);
props.store(fos, "");
assertFalse(doStreamDisabledCheck());
// Test DailyRollingFileAppender with everything correct (HH)
props.setProperty("log4j.appender.oozie.DatePattern", "'.'yyyy-MM-dd-HH");
fos = new FileOutputStream(propsFile);
props.store(fos, "");
assertFalse(doStreamDisabledCheck());
// Test RollingFileAppender but missing FileNamePattern
props.setProperty("log4j.appender.oozie", "org.apache.log4j.rolling.RollingFileAppender");
props.setProperty("log4j.appender.oozie.RollingPolicy.FileNamePattern", "");
fos = new FileOutputStream(propsFile);
props.store(fos, "");
assertTrue(doStreamDisabledCheck());
// Test RollingFileAppender but FileNamePattern with incorrect ending
props.setProperty("log4j.appender.oozie.RollingPolicy.FileNamePattern", "${oozie.log.dir}/oozie.log-blah");
fos = new FileOutputStream(propsFile);
props.store(fos, "");
assertTrue(doStreamDisabledCheck());
// Test RollingFileAppender but FileNamePattern with incorrect beginning
props.setProperty("log4j.appender.oozie.RollingPolicy.FileNamePattern", "${oozie.log.dir}/blah.log-%d{yyyy-MM-dd-HH}");
fos = new FileOutputStream(propsFile);
props.store(fos, "");
assertTrue(doStreamDisabledCheck());
// Test RollingFileAppender with everything correct
props.setProperty("log4j.appender.oozie.RollingPolicy.FileNamePattern", "${oozie.log.dir}/oozie.log-%d{yyyy-MM-dd-HH}");
fos = new FileOutputStream(propsFile);
props.store(fos, "");
assertFalse(doStreamDisabledCheck());
// Test RollingFileAppender with everything correct (gz)
props.setProperty("log4j.appender.oozie.RollingPolicy.FileNamePattern", "${oozie.log.dir}/oozie.log-%d{yyyy-MM-dd-HH}.gz");
fos = new FileOutputStream(propsFile);
props.store(fos, "");
assertFalse(doStreamDisabledCheck());
}
public void testNoDashInConversionPattern() throws Exception{
XLogFilter.reset();
XLogFilter.defineParameter("USER");
XLogFilter.defineParameter("GROUP");
XLogFilter.defineParameter("TOKEN");
XLogFilter.defineParameter("APP");
XLogFilter.defineParameter("JOB");
XLogFilter.defineParameter("ACTION");
XLogFilter xf = new XLogFilter();
xf.setParameter("USER", "oozie");
xf.setLogLevel("DEBUG|INFO");
// Previously, a dash ("-") was always required somewhere in a line in order for that line to pass the filter; this test
// checks that this condition is no longer required for log streaming to work
File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
ClassLoader cl = Thread.currentThread().getContextClassLoader();
InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties");
Properties log4jProps = new Properties();
log4jProps.load(is);
// prevent conflicts with other tests by changing the log file location
log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log");
log4jProps.store(new FileOutputStream(log4jFile), "");
setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName());
assertFalse(doStreamDisabledCheck());
LogFactory.getLog("a").info("2009-06-24 02:43:14,505 INFO _L1_:317 - SERVER[foo] USER[oozie] GROUP[oozie] TOKEN[-] APP[-] "
+ "JOB[-] ACTION[-] Released Lock");
LogFactory.getLog("a").info("2009-06-24 02:43:14,505 INFO _L2_:317 - SERVER[foo] USER[blah] GROUP[oozie] TOKEN[-] APP[-] "
+ "JOB[-] ACTION[-] Released Lock");
LogFactory.getLog("a").info("2009-06-24 02:43:14,505 INFO _L3_:317 SERVER[foo] USER[oozie] GROUP[oozie] TOKEN[-] APP[-] "
+ "JOB[-] ACTION[-] Released Lock");
LogFactory.getLog("a").info("2009-06-24 02:43:14,505 INFO _L4_:317 SERVER[foo] USER[blah] GROUP[oozie] TOKEN[-] APP[-] "
+ "JOB[-] ACTION[-] Released Lock");
String out = doStreamLog(xf);
String outArr[] = out.split("\n");
// Lines 2 and 4 are filtered out because they have the wrong user
assertEquals(2, outArr.length);
assertTrue(outArr[0].contains("_L1_"));
assertFalse(out.contains("_L2_"));
assertTrue(outArr[1].contains("_L3_"));
assertFalse(out.contains("_L4_"));
}
private boolean doStreamDisabledCheck() throws Exception {
Services.get().get(XLogService.class).init(Services.get());
return doStreamLog(new XLogFilter()).equals("Log streaming disabled!!");
}
protected String doStreamLog(XLogFilter xf) throws Exception {
return doStreamLog(xf, new HashMap<String, String[]>());
}
protected String doStreamLog(XLogFilter xf, Date startTime, Date endTime) throws Exception {
return doStreamLog(xf, new HashMap<String, String[]>(), false, startTime, endTime);
}
protected String doStreamErrorLog(XLogFilter xf) throws Exception {
return doStreamLog(xf, new HashMap<String, String[]>(), true);
}
private String doStreamErrorLog(XLogFilter xf, Date startDate, Date endDate) throws Exception {
return doStreamLog(xf, new HashMap<String, String[]>(), true, startDate, startDate);
}
protected String doStreamLog(XLogFilter xf, Map<String, String[]> param) throws Exception {
return doStreamLog(xf, param, false);
}
protected String doStreamLog(XLogFilter xf, Map<String, String[]> param, boolean isErrorLog) throws Exception {
return doStreamLog(xf, param, isErrorLog, null, null);
}
protected String doStreamLog(XLogFilter xf, Map<String, String[]> param, boolean isErrorLog, Date startTime,
Date endTime) throws Exception {
StringWriter w = new StringWriter();
ZKXLogStreamingService zkxlss = new ZKXLogStreamingService();
try {
Services services = Services.get();
services.setService(ZKJobsConcurrencyService.class);
zkxlss.init(services);
sleep(1000); // Sleep to allow ZKUtils ServiceCache to update
if (isErrorLog) {
zkxlss.streamLog(new XLogErrorStreamer(xf, param), startTime, endTime, w);
}
else {
zkxlss.streamLog(new XLogStreamer(xf, param), startTime, endTime, w);
}
}
finally {
zkxlss.destroy();
}
String wStr = w.toString();
System.out.println("\ndoStreamLog:\n" + wStr + "\n");
return wStr;
}
public void testStreamingWithMultipleOozieServers() throws Exception {
XLogFilter.reset();
XLogFilter.defineParameter("USER");
XLogFilter.defineParameter("GROUP");
XLogFilter.defineParameter("TOKEN");
XLogFilter.defineParameter("APP");
XLogFilter.defineParameter("JOB");
XLogFilter.defineParameter("ACTION");
XLogFilter xf = new XLogFilter();
xf.setParameter("JOB", "0000003-130610102426873-oozie-rkan-W");
xf.setLogLevel("WARN|INFO");
File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
ClassLoader cl = Thread.currentThread().getContextClassLoader();
InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties");
Properties log4jProps = new Properties();
log4jProps.load(is);
// prevent conflicts with other tests by changing the log file location
log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log");
log4jProps.store(new FileOutputStream(log4jFile), "");
setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName());
assertFalse(doStreamDisabledCheck());
File logFile = new File(Services.get().get(XLogService.class).getOozieLogPath(),
Services.get().get(XLogService.class).getOozieLogName());
logFile.getParentFile().mkdirs();
Writer logWriter = new OutputStreamWriter(new FileOutputStream(logFile), StandardCharsets.UTF_8);
// local logs
logWriter.append("2013-06-10 10:25:44,008 WARN HiveActionExecutor:542 SERVER[foo] USER[rkanter] GROUP[-] TOKEN[] "
+ "APP[hive-wf] JOB[0000003-130610102426873-oozie-rkan-W] ACTION[0000003-130610102426873-oozie-rkan-W@hive-node] "
+ "credentials is null for the action _L3_").append("\n")
.append("2013-06-10 10:26:10,008 INFO HiveActionExecutor:539 SERVER[foo] USER[rkanter] GROUP[-] TOKEN[] "
+ "APP[hive-wf] JOB[0000003-130610102426873-oozie-rkan-W] ACTION[0000003-130610102426873-oozie-rkan-W@hive-node] "
+ "action completed, external ID [job_201306101021_0005] _L4_").append("\n")
.append("2013-06-10 10:26:10,341 WARN ActionStartXCommand:542 USER[rkanter] GROUP[-] TOKEN[] "
+ "APP[hive-wf] JOB[0000003-130610102426873-oozie-rkan-W] ACTION[0000003-130610102426873-oozie-rkan-W@end] "
+ "[***0000003-130610102426873-oozie-rkan-W@end***]Action updated in DB! _L6_").append("\n");
logWriter.close();
// logs to be returned by another "Oozie server"
DummyLogStreamingServlet.logs =
"2013-06-10 10:25:43,575 WARN ActionStartXCommand:542 SERVER[foo] USER[rkanter] GROUP[-] TOKEN[] APP[hive-wf] "
+ "JOB[0000003-130610102426873-oozie-rkan-W] ACTION[0000003-130610102426873-oozie-rkan-W@:start:] "
+ "[***0000003-130610102426873-oozie-rkan-W@:start:***]Action status=DONE _L1_"
+ "\n"
+ "2013-06-10 10:25:43,575 WARN ActionStartXCommand:542 SERVER[foo] USER[rkanter] GROUP[-] TOKEN[] APP[hive-wf] "
+ "JOB[0000003-130610102426873-oozie-rkan-W] ACTION[0000003-130610102426873-oozie-rkan-W@:start:] "
+ "[***0000003-130610102426873-oozie-rkan-W@:start:***]Action updated in DB! _L2_"
+ "\n"
+ "2013-06-10 10:26:10,148 INFO HiveActionExecutor:539 SERVER[foo] USER[rkanter] GROUP[-] TOKEN[] APP[hive-wf] "
+ "JOB[0000003-130610102426873-oozie-rkan-W] ACTION[0000003-130610102426873-oozie-rkan-W@hive-node] action produced"
+ " output _L5_"
+ "\n"
// a multiline message with a stack trace
+ "2013-06-10 10:26:30,202 WARN ActionStartXCommand:542 - SERVER[foo] USER[rkanter] GROUP[-] TOKEN[] APP[hive-wf] "
+ "JOB[0000003-130610102426873-oozie-rkan-W] ACTION[0000003-130610102426873-oozie-rkan-W@hive-node] Error starting "
+ "action [hive-node]. ErrorType [TRANSIENT], ErrorCode [JA009], Message [JA009: java.io.IOException: Unknown "
+ "protocol to name node: org.apache.hadoop.mapred.JobSubmissionProtocol _L7_\n"
+ " at org.apache.hadoop.hdfs.server.namenode.NameNode.getProtocolVersion(NameNode.java:156) _L8_\n"
+ " at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)_L9_\n"
+ " at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1190) _L10_\n"
+ " at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1426) _L11_\n"
+ "] _L12_\n"
+ "org.apache.oozie.action.ActionExecutorException: JA009: java.io.IOException: Unknown protocol to name node: "
+ "org.apache.hadoop.mapred.JobSubmissionProtocol _L13_\n"
+ " at org.apache.hadoop.hdfs.server.namenode.NameNode.getProtocolVersion(NameNode.java:156) _L14_\n"
+ " at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) _L15_\n"
+ " at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) _L16_\n";
String out = doStreamLog(xf);
String[] outArr = out.split("\n");
assertEquals(3, outArr.length);
assertFalse(out.contains("_L1_"));
assertFalse(out.contains("_L2_"));
assertTrue(outArr[0].contains("_L3_"));
assertTrue(outArr[1].contains("_L4_"));
assertFalse(out.contains("_L5_"));
assertTrue(outArr[2].contains("_L6_"));
// We'll use a DummyZKOozie to create an entry in ZK and then set its url to an (unrelated) servlet that will simply return
// some log messages
DummyZKOozie dummyOozie = null;
EmbeddedServletContainer container = new EmbeddedServletContainer("oozie");
container.addServletEndpoint("/other-oozie-server/*", DummyLogStreamingServlet.class);
try {
container.start();
dummyOozie = new DummyZKOozie("9876", container.getServletURL("/other-oozie-server/*"));
DummyLogStreamingServlet.lastQueryString = null;
out = doStreamLog(xf);
outArr = out.split("\n");
assertEquals(16, outArr.length);
assertTrue(outArr[0].contains("_L1_"));
assertTrue(outArr[1].contains("_L2_"));
assertTrue(outArr[2].contains("_L3_"));
assertTrue(outArr[3].contains("_L4_"));
assertTrue(outArr[4].contains("_L5_"));
assertTrue(outArr[5].contains("_L6_"));
assertTrue(outArr[6].contains("_L7_"));
assertTrue(outArr[7].contains("_L8_"));
assertTrue(outArr[8].contains("_L9_"));
assertTrue(outArr[9].contains("_L10_"));
assertTrue(outArr[10].contains("_L11_"));
assertTrue(outArr[11].contains("_L12_"));
assertTrue(outArr[12].contains("_L13_"));
assertTrue(outArr[13].contains("_L14_"));
assertTrue(outArr[14].contains("_L15_"));
assertTrue(outArr[15].contains("_L16_"));
assertEquals("show=log&allservers=false", DummyLogStreamingServlet.lastQueryString);
// If we stop the container but leave the DummyZKOozie running, it will simulate if that server is down but still has
// info in ZK; we should be able to get the logs from other servers (in this case, this server) and a message about
// which servers it couldn't reach
container.stop();
out = doStreamLog(xf);
outArr = out.split("\n");
assertEquals(6, outArr.length);
assertTrue(outArr[0].startsWith("Unable"));
assertEquals("9876", outArr[1].trim());
assertEquals("", outArr[2]);
assertFalse(out.contains("_L1_"));
assertFalse(out.contains("_L2_"));
assertTrue(outArr[3].contains("_L3_"));
assertTrue(outArr[4].contains("_L4_"));
assertFalse(out.contains("_L5_"));
assertTrue(outArr[5].contains("_L6_"));
}
finally {
if (dummyOozie != null) {
dummyOozie.teardown();
}
container.stop();
}
}
public void testStreamingWithMultipleOozieServers_coordActionList() throws Exception {
XLogFilter.reset();
File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
ClassLoader cl = Thread.currentThread().getContextClassLoader();
InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties");
Properties log4jProps = new Properties();
log4jProps.load(is);
// prevent conflicts with other tests by changing the log file location
log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log");
log4jProps.store(new FileOutputStream(log4jFile), "");
setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName());
Services.get().get(XLogService.class).init(Services.get());
File logFile = new File(Services.get().get(XLogService.class).getOozieLogPath(), Services.get()
.get(XLogService.class).getOozieLogName());
logFile.getParentFile().mkdirs();
Writer logWriter = new OutputStreamWriter(new FileOutputStream(logFile), StandardCharsets.UTF_8);
// local logs
StringBuffer bf = new StringBuffer();
bf.append(
"2014-02-06 00:26:56,126 DEBUG CoordActionInputCheckXCommand:545 [pool-2-thread-26] - USER[-] GROUP[-] "
+ "TOKEN[-] APP[-] JOB[0000003-140205233038063-oozie-oozi-C] ACTION[0000003-140205233038063-oozie-oozi-C@1] "
+ "checking for the file ~:8020/user/purushah/examples/input-data/rawLogs/2010/01/01/01/00/_SUCCESS\n")
.append("2014-02-06 00:26:56,150 INFO CoordActionInputCheckXCommand:539 [pool-2-thread-26] - USER[-] GROUP[-] "
+ "TOKEN[-] APP[-] JOB[0000003-140205233038063-oozie-oozi-C] ACTION[0000003-140205233038063-oozie-oozi-C@1] "
+ "[0000003-140205233038063-oozie-oozi-C@1]::ActionInputCheck:: File::8020/user/purushah/examples/input-data/"
+ "rawLogs/2010/01/01/01/00/_SUCCESS, Exists? :false" + "Action updated in DB! _L1_")
.append("\n")
.append("2014-02-06 00:27:56,126 DEBUG CoordActionInputCheckXCommand:545 [pool-2-thread-26] - USER[-] GROUP[-] "
+ "TOKEN[-] APP[-] JOB[0000003-140205233038063-oozie-oozi-C] ACTION[0000003-140205233038063-oozie-oozi-C@2] "
+ "checking for the file ~:8020/user/purushah/examples/input-data/rawLogs/2010/01/01/01/00/_SUCCESS\n")
.append("2014-02-06 00:27:56,150 INFO CoordActionInputCheckXCommand:539 [pool-2-thread-26] - USER[-] GROUP[-] "
+ "TOKEN[-] APP[-] JOB[0000003-140205233038063-oozie-oozi-C] ACTION[0000003-140205233038063-oozie-oozi-C@2] "
+ "[0000003-140205233038063-oozie-oozi-C@2]::ActionInputCheck:: File::8020/user/purushah/examples/input-data/"
+ "rawLogs/2010/01/01/01/00/_SUCCESS, Exists? :false" + "Action updated in DB! _L2_")
.append("\n");
logWriter.append(bf);
logWriter.close();
XLogFilter.reset();
XLogFilter.defineParameter("USER");
XLogFilter.defineParameter("GROUP");
XLogFilter.defineParameter("TOKEN");
XLogFilter.defineParameter("APP");
XLogFilter.defineParameter("JOB");
XLogFilter.defineParameter("ACTION");
XLogFilter xf = new XLogFilter();
xf.setLogLevel("DEBUG|INFO");
xf.setParameter("USER", ".*");
xf.setParameter("GROUP", ".*");
xf.setParameter("TOKEN", ".*");
xf.setParameter("APP", ".*");
xf.setParameter("JOB", "0000003-140205233038063-oozie-oozi-C");
xf.setParameter(DagXLogInfoService.ACTION, "0000003-140205233038063-oozie-oozi-C@1");
String out = doStreamLog(xf);
String[] outArr = out.split("\n");
assertEquals(2, outArr.length);
assertTrue(out.contains("_L1_"));
assertFalse(out.contains("_L2_"));
// We'll use a DummyZKOozie to create an entry in ZK and then set its
// url to an (unrelated) servlet that will simply return
// some log messages
DummyZKOozie dummyOozie = null;
EmbeddedServletContainer container = new EmbeddedServletContainer("oozie");
container.addServletEndpoint("/other-oozie-server/*", DummyLogStreamingServlet.class);
try {
container.start();
dummyOozie = new DummyZKOozie("9876", container.getServletURL("/other-oozie-server/*"));
DummyLogStreamingServlet.logs = "";
DummyLogStreamingServlet.lastQueryString = null;
Map<String, String[]> param = new HashMap<String, String[]>();
param.put(RestConstants.JOB_COORD_RANGE_TYPE_PARAM, new String[] { RestConstants.JOB_LOG_ACTION });
param.put(RestConstants.JOB_COORD_SCOPE_PARAM, new String[] { "1" });
out = doStreamLog(xf, param);
assertTrue(DummyLogStreamingServlet.lastQueryString.contains("show=log&allservers=false" ));
assertTrue(DummyLogStreamingServlet.lastQueryString.contains("type=" + RestConstants.JOB_LOG_ACTION ));
assertTrue(DummyLogStreamingServlet.lastQueryString.contains(RestConstants.JOB_COORD_SCOPE_PARAM + "=1" ));
param.clear();
param.put(RestConstants.JOB_COORD_RANGE_TYPE_PARAM, new String[] { RestConstants.JOB_LOG_ACTION });
param.put(RestConstants.JOB_COORD_SCOPE_PARAM, new String[] { "1-4,5" });
out = doStreamLog(xf, param);
assertTrue(DummyLogStreamingServlet.lastQueryString.contains("show=log&allservers=false" ));
assertTrue(DummyLogStreamingServlet.lastQueryString.contains("type=" + RestConstants.JOB_LOG_ACTION ));
assertTrue(DummyLogStreamingServlet.lastQueryString.contains(RestConstants.JOB_COORD_SCOPE_PARAM + "=1-4,5" ));
param.clear();
Date endDate = new Date();
Date createdDate = new Date(endDate.getTime() / 2);
String date = DateUtils.formatDateOozieTZ(createdDate) + "::" + DateUtils.formatDateOozieTZ(endDate);
param.put(RestConstants.JOB_COORD_RANGE_TYPE_PARAM, new String[] { RestConstants.JOB_LOG_DATE });
param.put(RestConstants.JOB_COORD_SCOPE_PARAM, new String[] { date });
out = doStreamLog(xf, param);
assertTrue(DummyLogStreamingServlet.lastQueryString.contains("show=log&allservers=false" ));
assertTrue(DummyLogStreamingServlet.lastQueryString.contains("type=" + RestConstants.JOB_LOG_DATE ));
assertTrue(DummyLogStreamingServlet.lastQueryString.contains(RestConstants.JOB_COORD_SCOPE_PARAM + "=" + date ));
container.stop();
}
finally {
if (dummyOozie != null) {
dummyOozie.teardown();
}
container.stop();
}
}
public void testStreamingWithMultipleOozieServers_errorLog() throws Exception {
XLogFilter.reset();
File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
ClassLoader cl = Thread.currentThread().getContextClassLoader();
InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties");
Properties log4jProps = new Properties();
log4jProps.load(is);
// prevent conflicts with other tests by changing the log file location
log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log");
log4jProps.setProperty("log4j.appender.oozieError.File", getTestCaseDir() + "/oozie-error.log");
log4jProps.store(new FileOutputStream(log4jFile), "");
setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName());
Services.get().get(XLogService.class).init(Services.get());
File logFile = new File(Services.get().get(XLogService.class).getOozieErrorLogPath(), Services.get()
.get(XLogService.class).getOozieErrorLogName());
logFile.getParentFile().mkdirs();
Writer logWriter = new OutputStreamWriter(new FileOutputStream(logFile), StandardCharsets.UTF_8);
// local logs
StringBuffer bf = new StringBuffer();
bf.append(
"2014-02-06 00:26:56,126 WARN CoordActionInputCheckXCommand:545 [pool-2-thread-26] - USER[-] GROUP[-] "
+ "TOKEN[-] APP[-] JOB[0000003-140205233038063-oozie-oozi-C] ACTION[0000003-140205233038063-oozie-oozi-C@1] "
+ "checking for the file ~:8020/user/purushah/examples/input-data/rawLogs/2010/01/01/01/00/_SUCCESS\n")
.append("2014-02-06 00:26:56,150 WARN CoordActionInputCheckXCommand:539 [pool-2-thread-26] - USER[-] GROUP[-] "
+ "TOKEN[-] APP[-] JOB[0000003-140205233038063-oozie-oozi-C] ACTION[0000003-140205233038063-oozie-oozi-C@1] "
+ "[0000003-140205233038063-oozie-oozi-C@1]::ActionInputCheck::File::8020/user/purushah/examples/input-data/"
+ "rawLogs/2010/01/01/01/00/_SUCCESS, Exists? :false" + "Action updated in DB! _L1_")
.append("\n")
.append("2014-02-06 00:27:56,126 WARN CoordActionInputCheckXCommand:545 [pool-2-thread-26] - USER[-] GROUP[-] "
+ "TOKEN[-] APP[-] JOB[0000003-140205233038063-oozie-oozi-C] ACTION[0000003-140205233038063-oozie-oozi-C@2] "
+ "checking for the file ~:8020/user/purushah/examples/input-data/rawLogs/2010/01/01/01/00/_SUCCESS\n")
.append("2014-02-06 00:27:56,150 WARN CoordActionInputCheckXCommand:539 [pool-2-thread-26] - USER[-] GROUP[-] "
+ "TOKEN[-] APP[-] JOB[0000003-140205233038063-oozie-oozi-C] ACTION[0000003-140205233038063-oozie-oozi-C@2] "
+ "[0000003-140205233038063-oozie-oozi-C@2]::ActionInputCheck::File::8020/user/purushah/examples/input-data/"
+ "rawLogs/2010/01/01/01/00/_SUCCESS, Exists? :false" + "Action updated in DB! _L2_")
.append("\n");
logWriter.append(bf);
logWriter.close();
XLogFilter.reset();
XLogFilter.defineParameter("USER");
XLogFilter.defineParameter("GROUP");
XLogFilter.defineParameter("TOKEN");
XLogFilter.defineParameter("APP");
XLogFilter.defineParameter("JOB");
XLogFilter.defineParameter("ACTION");
XLogFilter xf = new XLogFilter();
xf.setParameter("USER", ".*");
xf.setParameter("GROUP", ".*");
xf.setParameter("TOKEN", ".*");
xf.setParameter("APP", ".*");
xf.setParameter("JOB", "0000003-140205233038063-oozie-oozi-C");
xf.setParameter(DagXLogInfoService.ACTION, "0000003-140205233038063-oozie-oozi-C@1");
String out = doStreamErrorLog(xf);
String[] outArr = out.split("\n");
assertEquals(2, outArr.length);
assertTrue(out.contains("_L1_"));
assertFalse(out.contains("_L2_"));
// We'll use a DummyZKOozie to create an entry in ZK and then set its
// url to an (unrelated) servlet that will simply return
// some log messages
DummyZKOozie dummyOozie = null;
EmbeddedServletContainer container = new EmbeddedServletContainer("oozie");
container.addServletEndpoint("/other-oozie-server/*", DummyLogStreamingServlet.class);
try {
container.start();
dummyOozie = new DummyZKOozie("9876", container.getServletURL("/other-oozie-server/*"));
StringBuilder newLog = new StringBuilder();
newLog.append(
"2014-02-07 00:26:56,126 WARN CoordActionInputCheckXCommand:545 [pool-2-thread-26] - USER[-] GROUP[-] "
+ "TOKEN[-] APP[-] JOB[0000003-140205233038063-oozie-oozi-C] ACTION[0000003-140205233038063-oozie-oozi-C@1] "
+ "checking for the file ~:8020/user/purushah/examples/input-data/rawLogs/2010/01/01/01/00/_SUCCESS\n")
.append("2014-02-07 00:26:56,150 WARN CoordActionInputCheckXCommand:539 [pool-2-thread-26] - USER[-] GROUP[-] "
+ "TOKEN[-] APP[-] JOB[0000003-140205233038063-oozie-oozi-C] ACTION[0000003-140205233038063-oozie-oozi-C@1] "
+ "[0000003-140205233038063-oozie-oozi-C@1]::ActionInputCheck::File::8020/user/purushah/examples/input-data/"
+ "rawLogs/2010/01/01/01/00/_SUCCESS, Exists? :false" + "Action updated in DB! _L3_")
.append("\n");
DummyLogStreamingServlet.logs = newLog.toString();
out = doStreamErrorLog(xf);
outArr = out.split("\n");
assertEquals(4, outArr.length);
assertTrue(out.contains("_L1_"));
assertTrue(out.contains("_L3_"));
assertFalse(out.contains("_L2_"));
container.stop();
}
finally {
if (dummyOozie != null) {
dummyOozie.teardown();
}
container.stop();
}
}
public void testTuncateLog() throws Exception {
XLogFilter.reset();
File log4jFile = new File(getTestCaseConfDir(), "test-log4j.properties");
ClassLoader cl = Thread.currentThread().getContextClassLoader();
InputStream is = cl.getResourceAsStream("test-no-dash-log4j.properties");
Properties log4jProps = new Properties();
log4jProps.load(is);
// prevent conflicts with other tests by changing the log file location
log4jProps.setProperty("log4j.appender.oozie.File", getTestCaseDir() + "/oozie.log");
log4jProps.store(new FileOutputStream(log4jFile), "");
setSystemProperty(XLogService.LOG4J_FILE, log4jFile.getName());
assertFalse(doStreamDisabledCheck());
File logFile = new File(Services.get().get(XLogService.class).getOozieLogPath(),
Services.get().get(XLogService.class).getOozieLogName());
logFile.getParentFile().mkdirs();
ConfigurationService.set(XLogFilter.MAX_SCAN_DURATION, "1");
Date startDate = new Date();
Date endDate = new Date(startDate.getTime() + 60 * 60 * 1000 * 15);
String log = doStreamLog(new XLogFilter(), startDate, endDate);
assertTrue(log.contains("Truncated logs to max log scan duration"));
String logError = doStreamErrorLog(new XLogFilter(), startDate, endDate);
assertFalse(logError.contains("Truncated logs to max log scan duration"));
}
}