TEZ-4048. Make proto history logger queue size configurable

Signed-off-by: Jonathan Eagles <jeagles@apache.org>
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 8ce8f7c..7b00cf6 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1540,6 +1540,16 @@
   public static final long TEZ_HISTORY_LOGGING_PROTO_SYNC_WINDOWN_SECS_DEFAULT = 60L;
 
   /**
+   * Int value. Maximum queue size for proto history event logger.
+   */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty(type="integer")
+  public static final String TEZ_HISTORY_LOGGING_PROTO_QUEUE_SIZE =
+      TEZ_PREFIX + "history.logging.queue.size";
+  public static final int TEZ_HISTORY_LOGGING_PROTO_QUEUE_SIZE_DEFAULT = 100000;
+
+
+  /**
    * Boolean value. Set this to true, if the underlying file system does not support flush (Ex: s3).
    * The dag submitted, initialized and started events are written into a file and closed. The rest
    * of the events are written into another file.
diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java
index d2e0b4d..008b05d 100644
--- a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java
+++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java
@@ -52,8 +52,7 @@
       new HistoryEventProtoConverter();
   private boolean loggingDisabled = false;
 
-  private final LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
-      new LinkedBlockingQueue<>(10000);
+  private LinkedBlockingQueue<DAGHistoryEvent> eventQueue;
   private Thread eventHandlingThread;
   private final AtomicBoolean stopped = new AtomicBoolean(false);
 
@@ -81,7 +80,11 @@
         TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT);
     splitDagStartEvents = conf.getBoolean(TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_SPLIT_DAG_START,
         TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_SPLIT_DAG_START_DEFAULT);
-    LOG.info("Inited ProtoHistoryLoggingService");
+    final int queueSize = conf.getInt(TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_QUEUE_SIZE,
+        TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_QUEUE_SIZE_DEFAULT);
+    eventQueue = new LinkedBlockingQueue<>(queueSize);
+    LOG.info("Inited ProtoHistoryLoggingService. loggingDisabled: {} splitDagStartEvents: {} queueSize: {}",
+        loggingDisabled, splitDagStartEvents, queueSize);
   }
 
   @Override