TEZ-4048. Make proto history logger queue size configurable
Signed-off-by: Jonathan Eagles <jeagles@apache.org>
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 8ce8f7c..7b00cf6 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -1540,6 +1540,16 @@
public static final long TEZ_HISTORY_LOGGING_PROTO_SYNC_WINDOWN_SECS_DEFAULT = 60L;
/**
+ * Int value. Maximum queue size for proto history event logger.
+ */
+ @ConfigurationScope(Scope.AM)
+ @ConfigurationProperty(type="integer")
+ public static final String TEZ_HISTORY_LOGGING_PROTO_QUEUE_SIZE =
+ TEZ_PREFIX + "history.logging.queue.size";
+ public static final int TEZ_HISTORY_LOGGING_PROTO_QUEUE_SIZE_DEFAULT = 100000;
+
+
+ /**
* Boolean value. Set this to true, if the underlying file system does not support flush (Ex: s3).
* The dag submitted, initialized and started events are written into a file and closed. The rest
* of the events are written into another file.
diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java
index d2e0b4d..008b05d 100644
--- a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java
+++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/ProtoHistoryLoggingService.java
@@ -52,8 +52,7 @@
new HistoryEventProtoConverter();
private boolean loggingDisabled = false;
- private final LinkedBlockingQueue<DAGHistoryEvent> eventQueue =
- new LinkedBlockingQueue<>(10000);
+ private LinkedBlockingQueue<DAGHistoryEvent> eventQueue;
private Thread eventHandlingThread;
private final AtomicBoolean stopped = new AtomicBoolean(false);
@@ -81,7 +80,11 @@
TezConfiguration.TEZ_AM_HISTORY_LOGGING_ENABLED_DEFAULT);
splitDagStartEvents = conf.getBoolean(TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_SPLIT_DAG_START,
TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_SPLIT_DAG_START_DEFAULT);
- LOG.info("Inited ProtoHistoryLoggingService");
+ final int queueSize = conf.getInt(TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_QUEUE_SIZE,
+ TezConfiguration.TEZ_HISTORY_LOGGING_PROTO_QUEUE_SIZE_DEFAULT);
+ eventQueue = new LinkedBlockingQueue<>(queueSize);
+ LOG.info("Inited ProtoHistoryLoggingService. loggingDisabled: {} splitDagStartEvents: {} queueSize: {}",
+ loggingDisabled, splitDagStartEvents, queueSize);
}
@Override