Counting pending tuples (#2951)
* only using tuple num in tuplestream message to count number of data tuples
* adding a field to tuplestreammessage to hold number of tuples in message
* removing redundant code line
diff --git a/heron/proto/stmgr.proto b/heron/proto/stmgr.proto
index c5838a4..b403eff 100644
--- a/heron/proto/stmgr.proto
+++ b/heron/proto/stmgr.proto
@@ -67,4 +67,6 @@
required int32 task_id = 1;
// serialized data
required bytes set = 2;
+ // number of data tuples in the serialized set; reads as 0 when a sender omits it
+ optional int32 num_tuples = 4;
}
diff --git a/heron/stmgr/src/cpp/manager/instance-server.cpp b/heron/stmgr/src/cpp/manager/instance-server.cpp
index 60480b7..5f8d9a2 100644
--- a/heron/stmgr/src/cpp/manager/instance-server.cpp
+++ b/heron/stmgr/src/cpp/manager/instance-server.cpp
@@ -74,7 +74,10 @@
const sp_string METRIC_TIME_SPENT_BACK_PRESSURE_COMPID = "__time_spent_back_pressure_by_compid/";
// Prefix for connection buffer's metrics
const sp_string CONNECTION_BUFFER_BY_INSTANCEID = "__connection_buffer_by_instanceid/";
-// Prefix for connection buffer's length metrics
+// Prefix for connection buffer's length metrics. Unlike
+// METRIC_DATA_TUPLES_TO_INSTANCES, which counts tuples
+// when they are sent to the instance, this metric
+// counts tuples as they are received.
const sp_string CONNECTION_BUFFER_LENGTH_BY_INSTANCEID =
"__connection_buffer_length_by_instanceid/";
@@ -396,6 +399,13 @@
}
void InstanceServer::SendToInstance2(proto::stmgr::TupleStreamMessage* _message) {
+ // Add this message's data-tuple count to the destination instance's "packets" metric.
+ const sp_string& instance_id = task_id_to_name[_message->task_id()];
+ ConnectionBufferLengthMetricMap::const_iterator it =
+ connection_buffer_length_metric_map_.find(instance_id);
+ if (it != connection_buffer_length_metric_map_.end())
+ it->second->scope("packets")->incr_by(_message->num_tuples());
+
stateful_gateway_->SendToInstance(_message);
}
@@ -413,15 +423,16 @@
void InstanceServer::SendToInstance2(sp_int32 _task_id,
proto::system::HeronTupleSet2* _message) {
- sp_string instance_id = task_id_to_name[_task_id];
+ if (_message->has_data()) {  // messages without a data set add nothing to the metric
+ const sp_string& instance_id = task_id_to_name[_task_id];
- ConnectionBufferLengthMetricMap::const_iterator it =
- connection_buffer_length_metric_map_.find(instance_id);
+ ConnectionBufferLengthMetricMap::const_iterator it =
+ connection_buffer_length_metric_map_.find(instance_id);
- if ( it != connection_buffer_length_metric_map_.end() )
- connection_buffer_length_metric_map_
- [instance_id]->scope("packets")->incr();
-
+ if (it != connection_buffer_length_metric_map_.end()) {
+ it->second->scope("packets")->incr_by(_message->data().tuples_size());
+ }
+ }
stateful_gateway_->SendToInstance(_task_id, _message);
}
diff --git a/heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp b/heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp
index a923657..0a3ecf1 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp
@@ -141,6 +141,11 @@
out = __global_protobuf_pool_acquire__(out);
out->set_task_id(_task_id);
out->set_src_task_id(_msg.src_task_id());
+ sp_int32 length = 0;
+ if (_msg.has_data()) {
+ length = _msg.data().tuples_size();
+ }
+ out->set_num_tuples(length);
_msg.SerializePartialToString(out->mutable_set());
bool retval = clients_[_stmgr_id]->SendTupleStreamMessage(*out);