limit the mem pool size (#2011)

diff --git a/heron/common/src/cpp/basics/mempool.cpp b/heron/common/src/cpp/basics/mempool.cpp
index 0a0d16a..c7baad1 100644
--- a/heron/common/src/cpp/basics/mempool.cpp
+++ b/heron/common/src/cpp/basics/mempool.cpp
@@ -17,9 +17,9 @@
 ////////////////////////////////////////////////////////////////////////////////
 // Just defines the static member of Mempool class
 ////////////////////////////////////////////////////////////////////////////////
-#include "basics/mempool.h"
 #include "basics/basics.h"
 
+// TODO(nlu): get the pool size limit from config
 MemPool<google::protobuf::Message>* __global_protobuf_pool__ =
-                                   new MemPool<google::protobuf::Message>();
+                                   new MemPool<google::protobuf::Message>(50 * 1024 * 1024);
 std::mutex __global_protobuf_pool_mutex__;
diff --git a/heron/common/src/cpp/basics/mempool.h b/heron/common/src/cpp/basics/mempool.h
index 3b5fd44..a98387e 100644
--- a/heron/common/src/cpp/basics/mempool.h
+++ b/heron/common/src/cpp/basics/mempool.h
@@ -21,34 +21,7 @@
 #include <unordered_map>
 #include <mutex>
 #include <typeindex>
-
-template<typename T>
-class BaseMemPool {
- public:
-  template<class... Args>
-  T* acquire(Args&&... args) {
-    if (pool_.empty()) {
-      return new T(std::forward<Args>(args)...);
-    }
-    T* t = pool_.back();
-    pool_.pop_back();
-    return t;
-  }
-  void release(T* t) {
-    pool_.push_back(t);
-  }
-  BaseMemPool() {
-  }
-  ~BaseMemPool() {
-      for (auto& p : pool_) {
-        delete p;
-      }
-      pool_.clear();
-  }
- private:
-  std::vector<T*> pool_;
-};
-
+#include "basics/basics.h"
 
 template<typename B>
 class MemPool {
@@ -56,21 +29,25 @@
   MemPool() {
   }
 
+  explicit MemPool(sp_int32 _pool_limit) :
+    pool_limit_(_pool_limit) {
+  }
+
   // TODO(cwang): we have a memory leak here.
   ~MemPool() {
-    for (auto& m : map_) {
-      for (auto& n : m.second) {
-        delete n;
+    for (auto& map_iter : mem_pool_map_) {
+      for (auto& mem_pool : map_iter.second) {
+        delete mem_pool;
       }
-      m.second.clear();
+      map_iter.second.clear();
     }
-    map_.clear();
+    mem_pool_map_.clear();
   }
 
   template<typename M>
   M* acquire(M* m) {
     std::type_index type = typeid(M);
-    std::vector<B*>& pool = map_[type];
+    std::vector<B*>& pool = mem_pool_map_[type];
 
     if (pool.empty()) {
       return new M();
@@ -83,11 +60,21 @@
   template<typename M>
   void release(M* ptr) {
     std::type_index type = typeid(M);
-    map_[type].push_back(static_cast<B*>(ptr));
+    sp_int32 size = mem_pool_map_[type].size() * sizeof(M);
+    // If this type's pool has already reached pool_limit_, free the object;
+    // otherwise keep the object in the pool.
+    if (size >= pool_limit_) {
+      delete ptr;
+    } else {
+      mem_pool_map_[type].push_back(static_cast<B*>(ptr));
+    }
   }
 
  private:
-  std::unordered_map<std::type_index, std::vector<B*>> map_;
+  // One pool (a vector of pooled B* objects) per type, keyed by std::type_index.
+  std::unordered_map<std::type_index, std::vector<B*>> mem_pool_map_;
+  // Per-type cap: release() stops pooling once entries * sizeof(M) reaches pool_limit_.
+  sp_int32 pool_limit_;
 };
 
 extern MemPool<google::protobuf::Message>* __global_protobuf_pool__;
diff --git a/heron/common/src/cpp/network/client.h b/heron/common/src/cpp/network/client.h
index 2cd7de3..ae3c0d5 100644
--- a/heron/common/src/cpp/network/client.h
+++ b/heron/common/src/cpp/network/client.h
@@ -214,8 +214,8 @@
   template <typename T, typename M>
   void dispatchResponse(T* _t, void (T::*method)(void* _ctx, M*, NetworkErrorCode),
                         IncomingPacket* _ipkt, NetworkErrorCode _code) {
-    void* ctx = NULL;
-    M* m = NULL;
+    void* ctx = nullptr;
+    M* m = nullptr;
     NetworkErrorCode status = _code;
     if (status == OK && _ipkt) {
       REQID rid;