OOZIE-3666 Oozie log streaming bug when log timestamps are the same on multiple Oozie servers (jmakai via dionusos)
diff --git a/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java b/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
index eac3b38..dd528bb 100644
--- a/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
+++ b/core/src/main/java/org/apache/oozie/service/ZKXLogStreamingService.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -24,6 +24,7 @@
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -139,7 +140,7 @@
         catch (Exception ex) {
             throw new IOException("Issue communicating with ZooKeeper: " + ex.getMessage(), ex);
         }
-        List<TimestampedMessageParser> parsers = new ArrayList<TimestampedMessageParser>(oozies.size());
+        List<TimestampedMessageParser> parsers = new ArrayList<>(oozies.size());
         try {
             // Create a BufferedReader for getting the logs of each server and put them in a TimestampedMessageParser
             for (ServiceInstance<Map> oozie : oozies) {
@@ -156,8 +157,8 @@
                     String otherUrl = oozieMeta.get(ZKUtils.ZKMetadataKeys.OOZIE_URL);
                     String jobId = logStreamer.getXLogFilter().getFilterParams().get(DagXLogInfoService.JOB);
                     try {
-                     // It's important that we specify ALL_SERVERS_PARAM=false in the GET request to prevent the other Oozie
-                     // Server from trying aggregate logs from the other Oozie servers (and creating an infinite recursion)
+                        // It's important that we specify ALL_SERVERS_PARAM=false in the GET request to prevent the other Oozie
+                        // Server from trying aggregate logs from the other Oozie servers (and creating an infinite recursion)
                         final String url = otherUrl + "/v" + OozieClient.WS_PROTOCOL_VERSION + "/" + RestConstants.JOB
                                 + "/" + jobId + "?" + RestConstants.JOB_SHOW_PARAM + "=" + logStreamer.getLogType()
                                 + "&" + RestConstants.ALL_SERVER_REQUEST + "=false"
@@ -198,7 +199,8 @@
             }
             // Add a message about any servers we couldn't contact
             if (!badOozies.isEmpty()) {
-                writer.write("Unable to contact the following Oozie Servers for logs (log information may be incomplete):\n");
+                writer.write("Unable to contact the following Oozie Servers for logs (log information may be " +
+                        "incomplete):\n");
                 for (String badOozie : badOozies) {
                     writer.write("     ");
                     writer.write(badOozie);
@@ -208,26 +210,33 @@
                 writer.flush();
             }
 
-            // If it's just the one server (this server), then we don't need to do any more processing and can just copy it directly
+            // If it's just the one server (this server), then we don't need to do any more processing and can just
+            // copy it directly
             if (parsers.size() == 1) {
                 TimestampedMessageParser parser = parsers.get(0);
                 parser.processRemaining(writer, logStreamer);
             }
             else {
-                // Now that we have a Reader for each server to get the logs from that server, we have to collate them.  Within each
-                // server, the logs should already be in the correct order, so we can take advantage of that.  We'll use the
-                // BufferedReaders to read the messages from the logs of each server and put them in order without having to bring
-                // every message into memory at the same time.
-                TreeMap<String, TimestampedMessageParser> timestampMap = new TreeMap<String, TimestampedMessageParser>();
-                // populate timestampMap with initial values
+                // Now that we have a Reader for each server to get the logs from that server, we have to collate them.
+                // Within each server, the logs should already be in the correct order, so we can take advantage of
+                // that.  We'll use the BufferedReaders to read the messages from the logs of each server and put them
+                // in order without having to bring every message into memory at the same time.
+
+                // The created TreeMap is capable of handling duplicate keys (timestamps) by maintaining a
+                // collection of values per key. This is important as the timestamps can be exactly the same in
+                // multiple Oozie servers which would end up with missing parsed log messages in case duplicated
+                // keys are not allowed.
+                Map<String, List<TimestampedMessageParser>> timestampTreeMap = new TreeMap<>();
+
+                // populate timestampMultimap with initial values
                 for (TimestampedMessageParser parser : parsers) {
                     if (parser.increment()) {
-                        timestampMap.put(parser.getLastTimestamp(), parser);
+                        putParser(timestampTreeMap, parser.getLastTimestamp(), parser);
                     }
                 }
-                while (timestampMap.size() > 1) {
+                while (getParserAmount(timestampTreeMap) > 1) {
                     // The first entry will be the earliest based on the timestamp (also removes it) from the map
-                    TimestampedMessageParser earliestParser = timestampMap.pollFirstEntry().getValue();
+                    TimestampedMessageParser earliestParser = pollFirstParser(timestampTreeMap);
                     // Write the message from that parser at that timestamp
                     writer.write(StringEscapeUtils.escapeHtml4(earliestParser.getLastMessage()));
                     if (logStreamer.shouldFlushOutput(earliestParser.getLastMessage().length())) {
@@ -236,12 +245,13 @@
                     // Increment that parser to read the next message
                     if (earliestParser.increment()) {
                         // If it still has messages left, put it back in the map with the new last timestamp for it
-                        timestampMap.put(earliestParser.getLastTimestamp(), earliestParser);
+                        putParser(timestampTreeMap, earliestParser.getLastTimestamp(), earliestParser);
                     }
                 }
-                // If there's only one parser left in the map, then we can simply copy the rest of its lines directly to be faster
-                if (timestampMap.size() == 1) {
-                    TimestampedMessageParser parser = timestampMap.values().iterator().next();
+                // If there's only one parser left in the map, then we can simply copy the rest of its lines directly
+                // to be faster
+                if (getParserAmount(timestampTreeMap) == 1) {
+                    TimestampedMessageParser parser = timestampTreeMap.values().iterator().next().get(0);
                     // don't forget the last message read by the parser
                     writer.write(StringEscapeUtils.escapeHtml4(parser.getLastMessage()));
                     parser.processRemaining(writer, logStreamer);
@@ -254,4 +264,50 @@
             }
         }
     }
+
+    /**
+     * Retrieves and removes the first parser entry value from the provided TreeMap.
+     *
+     * @param timestampTreeMap the TreeMap from where the parser value to be polled
+     * @return the first parser element from the provided TreeMap
+     */
+    private TimestampedMessageParser pollFirstParser(Map<String, List<TimestampedMessageParser>> timestampTreeMap) {
+        Iterator<Map.Entry<String, List<TimestampedMessageParser>>> entry = timestampTreeMap.entrySet().iterator();
+        List<TimestampedMessageParser> entryList = entry.next().getValue();
+
+        TimestampedMessageParser parser = entryList.get(0);
+
+        if (entryList.size() == 1) {
+            entry.remove();
+        } else {
+            entryList.remove(parser);
+        }
+        return parser;
+    }
+
+    /**
+     * Adds the provided TimestampedMessageParser object identified by a timestamp string to the given TreeMap.
+     *
+     * @param timestampTreeMap the TreeMap from where the parser value to be polled
+     * @param timestamp timestamp string to be used as an identifier in the TreeMap
+     * @param parser the TimestampedMessageParser object to be added to the TreeMap
+     */
+    private void putParser(Map<String, List<TimestampedMessageParser>> timestampTreeMap, String timestamp,
+                                  TimestampedMessageParser parser) {
+        timestampTreeMap.computeIfAbsent(timestamp, (unused) -> new ArrayList<>()).add(parser);
+    }
+
+    /**
+     * Retrieves the number of TimestampedMessageParser elements in the given TreeMap.
+     *
+     * @param timestampTreeMap the TreeMap to be checked
+     * @return number of TimestampedMessageParser in the given TreeMap
+     */
+    private int getParserAmount(Map<String, List<TimestampedMessageParser>> timestampTreeMap) {
+        int parserAmount = 0;
+        for (Map.Entry<String, List<TimestampedMessageParser>> entry : timestampTreeMap.entrySet()) {
+            parserAmount += entry.getValue().size();
+        }
+        return parserAmount;
+    }
 }
diff --git a/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java b/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
index 9119e6c..4b9cf1b 100644
--- a/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestZKXLogStreamingService.java
@@ -272,6 +272,10 @@
                                 Services.get().get(XLogService.class).getOozieLogName());
         logFile.getParentFile().mkdirs();
         Writer logWriter = new OutputStreamWriter(new FileOutputStream(logFile), StandardCharsets.UTF_8);
+
+        // Notice below that the timestamps (2013-06-10 10:25:44,008) of the first log messages of the "Oozie servers"
+        // are the same. This is intentional and is supposed to test a corner case in the mechanism of the log streaming
+
         // local logs
         logWriter.append("2013-06-10 10:25:44,008 WARN HiveActionExecutor:542 SERVER[foo] USER[rkanter] GROUP[-] TOKEN[] "
                 + "APP[hive-wf] JOB[0000003-130610102426873-oozie-rkan-W] ACTION[0000003-130610102426873-oozie-rkan-W@hive-node] "
@@ -285,7 +289,7 @@
         logWriter.close();
         // logs to be returned by another "Oozie server"
         DummyLogStreamingServlet.logs =
-                "2013-06-10 10:25:43,575 WARN ActionStartXCommand:542 SERVER[foo] USER[rkanter] GROUP[-] TOKEN[] APP[hive-wf] "
+                "2013-06-10 10:25:44,008 WARN ActionStartXCommand:542 SERVER[foo] USER[rkanter] GROUP[-] TOKEN[] APP[hive-wf] "
                 + "JOB[0000003-130610102426873-oozie-rkan-W] ACTION[0000003-130610102426873-oozie-rkan-W@:start:] "
                 + "[***0000003-130610102426873-oozie-rkan-W@:start:***]Action status=DONE _L1_"
                 + "\n"
diff --git a/release-log.txt b/release-log.txt
index 909db42..2122ccf 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.3.0 release (trunk - unreleased)
 
+OOZIE-3666 Oozie log streaming bug when log timestamps are the same on multiple Oozie servers (jmakai via dionusos)
 OOZIE-3661 Oozie cannot handle environment variables with key=value content (dionusos via asalamon74)
 OOZIE-3659 oozieUrl ambiguous port number in TestOozieCLI.java (AlexaD via dionusos)
 OOZIE-3658 Fix TestJMSJobEventListener#testConnectionDrop flakiness again (dionusos via asalamon74)