Count pending tuples (#2951)

* Use only the tuple count carried in the TupleStreamMessage to count the number of data tuples

* Add a field to TupleStreamMessage that holds the number of tuples in the message

* Remove a redundant line of code
diff --git a/heron/proto/stmgr.proto b/heron/proto/stmgr.proto
index c5838a4..b403eff 100644
--- a/heron/proto/stmgr.proto
+++ b/heron/proto/stmgr.proto
@@ -67,4 +67,6 @@
   required int32 task_id = 1;
   // serialized data
   required bytes set = 2;
+  // number of data tuples in the serialized set (0 if it carries no data tuples)
+  required int32 num_tuples = 4;
 }
diff --git a/heron/stmgr/src/cpp/manager/instance-server.cpp b/heron/stmgr/src/cpp/manager/instance-server.cpp
index 60480b7..5f8d9a2 100644
--- a/heron/stmgr/src/cpp/manager/instance-server.cpp
+++ b/heron/stmgr/src/cpp/manager/instance-server.cpp
@@ -74,7 +74,10 @@
 const sp_string METRIC_TIME_SPENT_BACK_PRESSURE_COMPID = "__time_spent_back_pressure_by_compid/";
 // Prefix for connection buffer's metrics
 const sp_string CONNECTION_BUFFER_BY_INSTANCEID = "__connection_buffer_by_instanceid/";
-// Prefix for connection buffer's length metrics
+// Prefix for connection buffer's length metrics. This differs from
+// METRIC_DATA_TUPLES_TO_INSTANCES, which counts the tuples when they
+// are sent to the instance; this metric is used to count the tuples
+// as they are received
 const sp_string CONNECTION_BUFFER_LENGTH_BY_INSTANCEID =
   "__connection_buffer_length_by_instanceid/";
 
@@ -396,6 +399,13 @@
 }
 
 void InstanceServer::SendToInstance2(proto::stmgr::TupleStreamMessage* _message) {
+  sp_string instance_id = task_id_to_name[_message->task_id()];
+  ConnectionBufferLengthMetricMap::const_iterator it =
+    connection_buffer_length_metric_map_.find(instance_id);
+  if ( it != connection_buffer_length_metric_map_.end() )
+    connection_buffer_length_metric_map_
+      [instance_id]->scope("packets")->incr_by(_message->num_tuples());
+
   stateful_gateway_->SendToInstance(_message);
 }
 
@@ -413,15 +423,16 @@
 
 void InstanceServer::SendToInstance2(sp_int32 _task_id,
                                   proto::system::HeronTupleSet2* _message) {
-  sp_string instance_id = task_id_to_name[_task_id];
+  if (_message->has_data()) {
+    sp_string instance_id = task_id_to_name[_task_id];
 
-  ConnectionBufferLengthMetricMap::const_iterator it =
-    connection_buffer_length_metric_map_.find(instance_id);
+    ConnectionBufferLengthMetricMap::const_iterator it =
+      connection_buffer_length_metric_map_.find(instance_id);
 
-  if ( it != connection_buffer_length_metric_map_.end() )
-    connection_buffer_length_metric_map_
-      [instance_id]->scope("packets")->incr();
-
+    if ( it != connection_buffer_length_metric_map_.end() )
+      connection_buffer_length_metric_map_
+        [instance_id]->scope("packets")->incr_by(_message->data().tuples_size());
+  }
   stateful_gateway_->SendToInstance(_task_id, _message);
 }
 
diff --git a/heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp b/heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp
index a923657..0a3ecf1 100644
--- a/heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp
+++ b/heron/stmgr/src/cpp/manager/stmgr-clientmgr.cpp
@@ -141,6 +141,11 @@
   out = __global_protobuf_pool_acquire__(out);
   out->set_task_id(_task_id);
   out->set_src_task_id(_msg.src_task_id());
+  sp_int32 length = 0;
+  if (_msg.has_data()) {
+    length = _msg.data().tuples_size();
+  }
+  out->set_num_tuples(length);
   _msg.SerializePartialToString(out->mutable_set());
 
   bool retval = clients_[_stmgr_id]->SendTupleStreamMessage(*out);