limit the mem pool size (#2011)
diff --git a/heron/common/src/cpp/basics/mempool.cpp b/heron/common/src/cpp/basics/mempool.cpp
index 0a0d16a..c7baad1 100644
--- a/heron/common/src/cpp/basics/mempool.cpp
+++ b/heron/common/src/cpp/basics/mempool.cpp
@@ -17,9 +17,9 @@
////////////////////////////////////////////////////////////////////////////////
// Just defines the static member of Mempool class
////////////////////////////////////////////////////////////////////////////////
-#include "basics/mempool.h"
#include "basics/basics.h"
+// TODO(nlu): get the pool size limit from config; 50MB is the cap applied per message type
MemPool<google::protobuf::Message>* __global_protobuf_pool__ =
- new MemPool<google::protobuf::Message>();
+ new MemPool<google::protobuf::Message>(50 * 1024 * 1024);
std::mutex __global_protobuf_pool_mutex__;
diff --git a/heron/common/src/cpp/basics/mempool.h b/heron/common/src/cpp/basics/mempool.h
index 3b5fd44..a98387e 100644
--- a/heron/common/src/cpp/basics/mempool.h
+++ b/heron/common/src/cpp/basics/mempool.h
@@ -21,34 +21,7 @@
#include <unordered_map>
#include <mutex>
#include <typeindex>
-
-template<typename T>
-class BaseMemPool {
- public:
- template<class... Args>
- T* acquire(Args&&... args) {
- if (pool_.empty()) {
- return new T(std::forward<Args>(args)...);
- }
- T* t = pool_.back();
- pool_.pop_back();
- return t;
- }
- void release(T* t) {
- pool_.push_back(t);
- }
- BaseMemPool() {
- }
- ~BaseMemPool() {
- for (auto& p : pool_) {
- delete p;
- }
- pool_.clear();
- }
- private:
- std::vector<T*> pool_;
-};
-
+#include "basics/basics.h"
template<typename B>
class MemPool {
@@ -56,21 +29,25 @@
MemPool() {
}
+ explicit MemPool(sp_int32 _pool_limit) :
+ pool_limit_(_pool_limit) {
+ }
+
// TODO(cwang): we have a memory leak here.
~MemPool() {
- for (auto& m : map_) {
- for (auto& n : m.second) {
- delete n;
+ for (auto& map_iter : mem_pool_map_) {
+ for (auto& mem_pool : map_iter.second) {
+ delete mem_pool;
}
- m.second.clear();
+ map_iter.second.clear();
}
- map_.clear();
+ mem_pool_map_.clear();
}
template<typename M>
M* acquire(M* m) {
std::type_index type = typeid(M);
- std::vector<B*>& pool = map_[type];
+ std::vector<B*>& pool = mem_pool_map_[type];
if (pool.empty()) {
return new M();
@@ -83,11 +60,21 @@
template<typename M>
void release(M* ptr) {
std::type_index type = typeid(M);
- map_[type].push_back(static_cast<B*>(ptr));
+ std::vector<B*>& pool = mem_pool_map_[type];
+ // free ptr once this type's pool already accounts for pool_limit_ bytes, otherwise keep it
+ // the byte count stays in size_t so it cannot wrap when narrowed to a 32-bit signed value
+ if (pool_limit_ <= 0 || pool.size() * sizeof(M) >= static_cast<std::size_t>(pool_limit_)) {
+ delete ptr;
+ } else {
+ pool.push_back(static_cast<B*>(ptr));
+ }
}
private:
- std::unordered_map<std::type_index, std::vector<B*>> map_;
+ // each type has its own separate mem pool entry
+ std::unordered_map<std::type_index, std::vector<B*>> mem_pool_map_;
+ // byte cap per type; MemPool() keeps this max default (assumes sp_int32 is 32-bit signed)
+ sp_int32 pool_limit_ = 0x7fffffff;
};
extern MemPool<google::protobuf::Message>* __global_protobuf_pool__;
diff --git a/heron/common/src/cpp/network/client.h b/heron/common/src/cpp/network/client.h
index 2cd7de3..ae3c0d5 100644
--- a/heron/common/src/cpp/network/client.h
+++ b/heron/common/src/cpp/network/client.h
@@ -214,8 +214,8 @@
template <typename T, typename M>
void dispatchResponse(T* _t, void (T::*method)(void* _ctx, M*, NetworkErrorCode),
IncomingPacket* _ipkt, NetworkErrorCode _code) {
- void* ctx = NULL;
- M* m = NULL;
+ void* ctx = nullptr;
+ M* m = nullptr;
NetworkErrorCode status = _code;
if (status == OK && _ipkt) {
REQID rid;