OOZIE-3666 Oozie log streaming bug when log timestamps are the same on multiple Oozie servers (jmakai via dionusos)
diff --git a/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java b/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
index eac3b38..dd528bb 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -24,6 +24,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -139,7 +140,7 @@
catch (Exception ex) {
throw new IOException("Issue communicating with ZooKeeper: " + ex.getMessage(), ex);
}
- List<TimestampedMessageParser> parsers = new ArrayList<TimestampedMessageParser>(oozies.size());
+ List<TimestampedMessageParser> parsers = new ArrayList<>(oozies.size());
try {
// Create a BufferedReader for getting the logs of each server and put them in a TimestampedMessageParser
for (ServiceInstance<Map> oozie : oozies) {
@@ -156,8 +157,8 @@
String otherUrl = oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_URL);
String jobId = logStreamer.getXLogFilter().getFilterParams().get(DagXLogInfoService.JOB);
try {
- // It's important that we specify ALL_SERVERS_PARAM=false in the GET request to prevent the other Oozie
- // Server from trying aggregate logs from the other Oozie servers (and creating an infinite recursion)
+ // It's important that we specify ALL_SERVERS_PARAM=false in the GET request to prevent the other Oozie
+ // Server from trying aggregate logs from the other Oozie servers (and creating an infinite recursion)
final String url = otherUrl + "/v" + OozieClient.WS_PROTOCOL_VERSION + "/" + RestConstants.JOB
+ "/" + jobId + "?" + RestConstants.JOB_SHOW_PARAM + "=" + logStreamer.getLogType()
+ "&" + RestConstants.ALL_SERVER_REQUEST + "=false"
@@ -198,7 +199,8 @@
}
// Add a message about any servers we couldn't contact
if (!badOozies.isEmpty()) {
- writer.write("Unable to contact the following Oozie Servers for logs (log information may be incomplete):\n");
+ writer.write("Unable to contact the following Oozie Servers for logs (log information may be " +
+ "incomplete):\n");
for (String badOozie : badOozies) {
writer.write(" ");
writer.write(badOozie);
@@ -208,26 +210,33 @@
writer.flush();
}
- // If it's just the one server (this server), then we don't need to do any more processing and can just copy it directly
+ // If it's just the one server (this server), then we don't need to do any more processing and can just
+ // copy it directly
if (parsers.size() == 1) {
TimestampedMessageParser parser = parsers.get(0);
parser.processRemaining(writer, logStreamer);
}
else {
- // Now that we have a Reader for each server to get the logs from that server, we have to collate them. Within each
- // server, the logs should already be in the correct order, so we can take advantage of that. We'll use the
- // BufferedReaders to read the messages from the logs of each server and put them in order without having to bring
- // every message into memory at the same time.
- TreeMap<String, TimestampedMessageParser> timestampMap = new TreeMap<String, TimestampedMessageParser>();
- // populate timestampMap with initial values
+ // Now that we have a Reader for each server to get the logs from that server, we have to collate them.
+ // Within each server, the logs should already be in the correct order, so we can take advantage of
+ // that. We'll use the BufferedReaders to read the messages from the logs of each server and put them
+ // in order without having to bring every message into memory at the same time.
+
+ // The created TreeMap is capable of handling duplicate keys (timestamps) by maintaining a
+ // collection of values per key. This is important as the timestamps can be exactly the same in
+ // multiple Oozie servers which would end up with missing parsed log messages in case duplicated
+ // keys are not allowed.
+ Map<String, List<TimestampedMessageParser>> timestampTreeMap = new TreeMap<>();
+
+ // populate timestampMultimap with initial values
for (TimestampedMessageParser parser : parsers) {
if (parser.increment()) {
- timestampMap.put(parser.getLastTimestamp(), parser);
+ putParser(timestampTreeMap, parser.getLastTimestamp(), parser);
}
}
- while (timestampMap.size() > 1) {
+ while (getParserAmount(timestampTreeMap) > 1) {
// The first entry will be the earliest based on the timestamp (also removes it) from the map
- TimestampedMessageParser earliestParser = timestampMap.pollFirstEntry().getValue();
+ TimestampedMessageParser earliestParser = pollFirstParser(timestampTreeMap);
// Write the message from that parser at that timestamp
writer.write(StringEscapeUtils.escapeHtml4(earliestParser.getLastMessage()));
if (logStreamer.shouldFlushOutput(earliestParser.getLastMessage().length())) {
@@ -236,12 +245,13 @@
// Increment that parser to read the next message
if (earliestParser.increment()) {
// If it still has messages left, put it back in the map with the new last timestamp for it
- timestampMap.put(earliestParser.getLastTimestamp(), earliestParser);
+ putParser(timestampTreeMap, earliestParser.getLastTimestamp(), earliestParser);
}
}
- // If there's only one parser left in the map, then we can simply copy the rest of its lines directly to be faster
- if (timestampMap.size() == 1) {
- TimestampedMessageParser parser = timestampMap.values().iterator().next();
+ // If there's only one parser left in the map, then we can simply copy the rest of its lines directly
+ // to be faster
+ if (getParserAmount(timestampTreeMap) == 1) {
+ TimestampedMessageParser parser = timestampTreeMap.values().iterator().next().get(0);
// don't forget the last message read by the parser
writer.write(StringEscapeUtils.escapeHtml4(parser.getLastMessage()));
parser.processRemaining(writer, logStreamer);
@@ -254,4 +264,50 @@
}
}
}
+
+ /**
+ * Retrieves and removes the first parser entry value from the provided TreeMap.
+ *
+ * @param timestampTreeMap the TreeMap from where the parser value to be polled
+ * @return the first parser element from the provided TreeMap
+ */
+ private TimestampedMessageParser pollFirstParser(Map<String, List<TimestampedMessageParser>> timestampTreeMap) {
+ Iterator<Map.Entry<String, List<TimestampedMessageParser>>> entry = timestampTreeMap.entrySet().iterator();
+ List<TimestampedMessageParser> entryList = entry.next().getValue();
+
+ TimestampedMessageParser parser = entryList.get(0);
+
+ if (entryList.size() == 1) {
+ entry.remove();
+ } else {
+ entryList.remove(parser);
+ }
+ return parser;
+ }
+
+ /**
+ * Adds the provided TimestampedMessageParser object identified by a timestamp string to the given TreeMap.
+ *
+ * @param timestampTreeMap the TreeMap from where the parser value to be polled
+ * @param timestamp timestamp string to be used as an identifier in the TreeMap
+ * @param parser the TimestampedMessageParser object to be added to the TreeMap
+ */
+ private void putParser(Map<String, List<TimestampedMessageParser>> timestampTreeMap, String timestamp,
+ TimestampedMessageParser parser) {
+ timestampTreeMap.computeIfAbsent(timestamp, (unused) -> new ArrayList<>()).add(parser);
+ }
+
+ /**
+ * Retrieves the number of TimestampedMessageParser elements in the given TreeMap.
+ *
+ * @param timestampTreeMap the TreeMap to be checked
+ * @return number of TimestampedMessageParser in the given TreeMap
+ */
+ private int getParserAmount(Map<String, List<TimestampedMessageParser>> timestampTreeMap) {
+ int parserAmount = 0;
+ for (Map.Entry<String, List<TimestampedMessageParser>> entry : timestampTreeMap.entrySet()) {
+ parserAmount += entry.getValue().size();
+ }
+ return parserAmount;
+ }
}
diff --git a/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java b/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
index 9119e6c..4b9cf1b 100644
--- a/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
@@ -272,6 +272,10 @@
Services.get().get(XLogService.class).getOozieLogName());
logFile.getParentFile().mkdirs();
Writer logWriter = new OutputStreamWriter(new FileOutputStream(logFile), StandardCharsets.UTF_8);
+
+ // Notice below that the timestamps (2013-06-10 10:25:44,008) of the first log messages of the "Oozie servers"
+ // are the same. This is intentional and is supposed to test a corner case in the mechanism of the log streaming
+
// local logs
logWriter.append("2013-06-10 10:25:44,008 WARN HiveActionExecutor:542 SERVER[foo] USER[rkanter] GROUP[-] TOKEN[] "
+ "APP[hive-wf] JOB[0000003-130610102426873-oozie-rkan-W] ACTION[0000003-130610102426873-oozie-rkan-W@hive-node] "
@@ -285,7 +289,7 @@
logWriter.close();
// logs to be returned by another "Oozie server"
DummyLogStreamingServlet.logs =
- "2013-06-10 10:25:43,575 WARN ActionStartXCommand:542 SERVER[foo] USER[rkanter] GROUP[-] TOKEN[] APP[hive-wf] "
+ "2013-06-10 10:25:44,008 WARN ActionStartXCommand:542 SERVER[foo] USER[rkanter] GROUP[-] TOKEN[] APP[hive-wf] "
+ "JOB[0000003-130610102426873-oozie-rkan-W] ACTION[0000003-130610102426873-oozie-rkan-W@:start:] "
+ "[***0000003-130610102426873-oozie-rkan-W@:start:***]Action status=DONE _L1_"
+ "\n"
diff --git a/release-log.txt b/release-log.txt
index 909db42..2122ccf 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 5.3.0 release (trunk - unreleased)
+OOZIE-3666 Oozie log streaming bug when log timestamps are the same on multiple Oozie servers (jmakai via dionusos)
OOZIE-3661 Oozie cannot handle environment variables with key=value content (dionusos via asalamon74)
OOZIE-3659 oozieUrl ambiguous port number in TestOozieCLI.java (AlexaD via dionusos)
OOZIE-3658 Fix TestJMSJobEventListener#testConnectionDrop flakiness again (dionusos via asalamon74)