Added Heron Instance related config vars in C++. These will be used by the C++ Instance (#2442)
diff --git a/heron/common/src/cpp/config/heron-internals-config-reader.cpp b/heron/common/src/cpp/config/heron-internals-config-reader.cpp
index 9481ccc..a696d32 100644
--- a/heron/common/src/cpp/config/heron-internals-config-reader.cpp
+++ b/heron/common/src/cpp/config/heron-internals-config-reader.cpp
@@ -280,5 +280,61 @@
return config_[HeronInternalsConfigVars::HERON_STREAMMGR_NETWORK_BACKPRESSURE_LOWWATERMARK_MB]
.as<int>();
}
+
+int HeronInternalsConfigReader::GetHeronInstanceReconnectStreammgrIntervalSec() {
+  const auto& key = HeronInternalsConfigVars::HERON_INSTANCE_RECONNECT_STREAMMGR_INTERVAL_SEC;
+  return config_[key].as<int>();  // look up the setting and convert it to int
+}
+
+int HeronInternalsConfigReader::GetHeronInstanceReconnectStreammgrTimes() {
+  const auto& key = HeronInternalsConfigVars::HERON_INSTANCE_RECONNECT_STREAMMGR_TIMES;
+  return config_[key].as<int>();  // look up the setting and convert it to int
+}
+
+int HeronInternalsConfigReader::GetHeronInstanceInternalBoltReadQueueCapacity() {
+  const auto& key = HeronInternalsConfigVars::HERON_INSTANCE_INTERNAL_BOLT_READ_QUEUE_CAPACITY;
+  return config_[key].as<int>();  // look up the setting and convert it to int
+}
+
+int HeronInternalsConfigReader::GetHeronInstanceInternalBoltWriteQueueCapacity() {
+  const auto& key = HeronInternalsConfigVars::HERON_INSTANCE_INTERNAL_BOLT_WRITE_QUEUE_CAPACITY;
+  return config_[key].as<int>();  // look up the setting and convert it to int
+}
+
+int HeronInternalsConfigReader::GetHeronInstanceInternalSpoutReadQueueCapacity() {
+  const auto& key = HeronInternalsConfigVars::HERON_INSTANCE_INTERNAL_SPOUT_READ_QUEUE_CAPACITY;
+  return config_[key].as<int>();  // look up the setting and convert it to int
+}
+
+int HeronInternalsConfigReader::GetHeronInstanceInternalSpoutWriteQueueCapacity() {
+  const auto& key = HeronInternalsConfigVars::HERON_INSTANCE_INTERNAL_SPOUT_WRITE_QUEUE_CAPACITY;
+  return config_[key].as<int>();  // look up the setting and convert it to int
+}
+
+int HeronInternalsConfigReader::GetHeronInstanceEmitBatchTimeMs() {
+  const auto& key = HeronInternalsConfigVars::HERON_INSTANCE_EMIT_BATCH_TIME_MS;
+  return config_[key].as<int>();  // look up the setting and convert it to int
+}
+
+int HeronInternalsConfigReader::GetHeronInstanceSetDataTupleCapacity() {
+  const auto& key = HeronInternalsConfigVars::HERON_INSTANCE_SET_DATA_TUPLE_CAPACITY;
+  return config_[key].as<int>();  // look up the setting and convert it to int
+}
+
+int HeronInternalsConfigReader::GetHeronInstanceSetDataTupleSizeBytes() {
+  const auto& key = HeronInternalsConfigVars::HERON_INSTANCE_SET_DATA_TUPLE_SIZE_BYTES;
+  return config_[key].as<int>();  // look up the setting and convert it to int
+}
+
+int HeronInternalsConfigReader::GetHeronInstanceSetControlTupleCapacity() {
+  const auto& key = HeronInternalsConfigVars::HERON_INSTANCE_SET_CONTROL_TUPLE_CAPACITY;
+  return config_[key].as<int>();  // look up the setting and convert it to int
+}
+
+int HeronInternalsConfigReader::GetHeronInstanceAcknowledgementNbuckets() {
+  const auto& key = HeronInternalsConfigVars::HERON_INSTANCE_ACKNOWLEDGEMENT_NBUCKETS;
+  return config_[key].as<int>();  // look up the setting and convert it to int
+}
+
} // namespace config
} // namespace heron
diff --git a/heron/common/src/cpp/config/heron-internals-config-reader.h b/heron/common/src/cpp/config/heron-internals-config-reader.h
index 6728780..d1585cd 100644
--- a/heron/common/src/cpp/config/heron-internals-config-reader.h
+++ b/heron/common/src/cpp/config/heron-internals-config-reader.h
@@ -191,6 +191,43 @@
// Low water mark on the num in MB that can be left outstanding on a connection
sp_int32 GetHeronStreammgrNetworkBackpressureLowwatermarkMb();
+ /**
+ * Instance config getters: each returns its heron.instance.* setting converted to int
+ **/
+
+ // Interval, in seconds, for reconnecting to the stream manager
+ int GetHeronInstanceReconnectStreammgrIntervalSec();
+
+ // Number of attempts to connect to the stream manager before giving up
+ int GetHeronInstanceReconnectStreammgrTimes();
+
+ // Capacity (num of items) of the bolt's queue buffering packets to read from the stream manager
+ int GetHeronInstanceInternalBoltReadQueueCapacity();
+
+ // Capacity (num of items) of the bolt's queue buffering packets to write to the stream manager
+ int GetHeronInstanceInternalBoltWriteQueueCapacity();
+
+ // Capacity (num of items) of the spout's queue buffering packets to read from the stream manager
+ int GetHeronInstanceInternalSpoutReadQueueCapacity();
+
+ // Capacity (num of items) of the spout's queue buffering packets to write to the stream manager
+ int GetHeronInstanceInternalSpoutWriteQueueCapacity();
+
+ // The maximum time in ms for a spout instance to emit tuples per attempt
+ int GetHeronInstanceEmitBatchTimeMs();
+
+ // The maximum number of data tuples to batch in a HeronDataTupleSet protobuf
+ int GetHeronInstanceSetDataTupleCapacity();
+
+ // The maximum size in bytes of data tuples to batch in a HeronDataTupleSet protobuf
+ int GetHeronInstanceSetDataTupleSizeBytes();
+
+ // The maximum number of control tuples to batch in a HeronControlTupleSet protobuf
+ int GetHeronInstanceSetControlTupleCapacity();
+
+ // Number of buckets for efficient acknowledgement -- NOTE(review): exact use not visible here
+ int GetHeronInstanceAcknowledgementNbuckets();
+
protected:
HeronInternalsConfigReader(EventLoop* eventLoop,
const sp_string& _defaults_file,
diff --git a/heron/common/src/cpp/config/heron-internals-config-vars.cpp b/heron/common/src/cpp/config/heron-internals-config-vars.cpp
index addcadf..84dc345 100644
--- a/heron/common/src/cpp/config/heron-internals-config-vars.cpp
+++ b/heron/common/src/cpp/config/heron-internals-config-vars.cpp
@@ -112,5 +112,29 @@
"heron.streammgr.network.backpressure.lowwatermark.mb";
const sp_string HeronInternalsConfigVars::HERON_STREAMMGR_STATEFUL_BUFFER_SIZE_MB =
"heron.streammgr.stateful.buffer.size.mb";
+
+// Key names of the heron.instance.* settings, which are the configs for the instance
+const sp_string HeronInternalsConfigVars::HERON_INSTANCE_RECONNECT_STREAMMGR_INTERVAL_SEC(
+    "heron.instance.reconnect.streammgr.interval.sec");
+const sp_string HeronInternalsConfigVars::HERON_INSTANCE_RECONNECT_STREAMMGR_TIMES(
+    "heron.instance.reconnect.streammgr.times");
+const sp_string HeronInternalsConfigVars::HERON_INSTANCE_INTERNAL_BOLT_READ_QUEUE_CAPACITY(
+    "heron.instance.internal.bolt.read.queue.capacity");
+const sp_string HeronInternalsConfigVars::HERON_INSTANCE_INTERNAL_BOLT_WRITE_QUEUE_CAPACITY(
+    "heron.instance.internal.bolt.write.queue.capacity");
+const sp_string HeronInternalsConfigVars::HERON_INSTANCE_INTERNAL_SPOUT_READ_QUEUE_CAPACITY(
+    "heron.instance.internal.spout.read.queue.capacity");
+const sp_string HeronInternalsConfigVars::HERON_INSTANCE_INTERNAL_SPOUT_WRITE_QUEUE_CAPACITY(
+    "heron.instance.internal.spout.write.queue.capacity");
+const sp_string HeronInternalsConfigVars::HERON_INSTANCE_EMIT_BATCH_TIME_MS(
+    "heron.instance.emit.batch.time.ms");
+const sp_string HeronInternalsConfigVars::HERON_INSTANCE_SET_DATA_TUPLE_CAPACITY(
+    "heron.instance.set.data.tuple.capacity");
+const sp_string HeronInternalsConfigVars::HERON_INSTANCE_SET_DATA_TUPLE_SIZE_BYTES(
+    "heron.instance.set.data.tuple.size.bytes");
+const sp_string HeronInternalsConfigVars::HERON_INSTANCE_SET_CONTROL_TUPLE_CAPACITY(
+    "heron.instance.set.control.tuple.capacity");
+const sp_string HeronInternalsConfigVars::HERON_INSTANCE_ACKNOWLEDGEMENT_NBUCKETS(
+    "heron.instance.acknowledgement.nbuckets");
} // namespace config
} // namespace heron
diff --git a/heron/common/src/cpp/config/heron-internals-config-vars.h b/heron/common/src/cpp/config/heron-internals-config-vars.h
index f711a9b..36b3498 100644
--- a/heron/common/src/cpp/config/heron-internals-config-vars.h
+++ b/heron/common/src/cpp/config/heron-internals-config-vars.h
@@ -25,6 +25,9 @@
#ifndef HERON_INTERNALS_COFNIG_VARS_H_
#define HERON_INTERNALS_COFNIG_VARS_H_
+
+#include <string>
+
#include "basics/basics.h"
namespace heron {
@@ -177,6 +180,43 @@
// The size based threshold in MB for buffering data tuples waiting for
// checkpoint markers to arrive before giving up
static const sp_string HERON_STREAMMGR_STATEFUL_BUFFER_SIZE_MB;
+
+ /**
+ * HERON_INSTANCE_* constants hold the heron.instance.* config key names for the instance
+ **/
+
+ // Interval, in seconds, for reconnecting to the stream manager
+ static const sp_string HERON_INSTANCE_RECONNECT_STREAMMGR_INTERVAL_SEC;
+
+ // Number of attempts to connect to the stream manager before giving up
+ static const sp_string HERON_INSTANCE_RECONNECT_STREAMMGR_TIMES;
+
+ // Capacity (num of items) of the bolt's queue buffering packets to read from the stream manager
+ static const sp_string HERON_INSTANCE_INTERNAL_BOLT_READ_QUEUE_CAPACITY;
+
+ // Capacity (num of items) of the bolt's queue buffering packets to write to the stream manager
+ static const sp_string HERON_INSTANCE_INTERNAL_BOLT_WRITE_QUEUE_CAPACITY;
+
+ // Capacity (num of items) of the spout's queue buffering packets to read from the stream manager
+ static const sp_string HERON_INSTANCE_INTERNAL_SPOUT_READ_QUEUE_CAPACITY;
+
+ // Capacity (num of items) of the spout's queue buffering packets to write to the stream manager
+ static const sp_string HERON_INSTANCE_INTERNAL_SPOUT_WRITE_QUEUE_CAPACITY;
+
+ // The maximum time in ms for a spout instance to emit tuples per attempt
+ static const sp_string HERON_INSTANCE_EMIT_BATCH_TIME_MS;
+
+ // The maximum number of data tuples to batch in a HeronDataTupleSet protobuf
+ static const sp_string HERON_INSTANCE_SET_DATA_TUPLE_CAPACITY;
+
+ // The maximum size in bytes of data tuples to batch in a HeronDataTupleSet protobuf
+ static const sp_string HERON_INSTANCE_SET_DATA_TUPLE_SIZE_BYTES;
+
+ // The maximum number of control tuples to batch in a HeronControlTupleSet protobuf
+ static const sp_string HERON_INSTANCE_SET_CONTROL_TUPLE_CAPACITY;
+
+ // Number of buckets for efficient acknowledgement -- NOTE(review): exact use not visible here
+ static const sp_string HERON_INSTANCE_ACKNOWLEDGEMENT_NBUCKETS;
};
} // namespace config
} // namespace heron