HAWQ-1767. Refactor FlatMemBuf memory allocate strategy
diff --git a/depends/dbcommon/src/dbcommon/function/agg-func.cc b/depends/dbcommon/src/dbcommon/function/agg-func.cc
index 48cfc96..ef07a19 100644
--- a/depends/dbcommon/src/dbcommon/function/agg-func.cc
+++ b/depends/dbcommon/src/dbcommon/function/agg-func.cc
@@ -92,7 +92,6 @@
   const uint64_t *__restrict__ hashGroups =
       DatumGetValue<const std::vector<uint64_t> *>(params[2])->data();
   bool hasGroupBy = DatumGetValue<bool>(params[3]);
-  Vector *vec = DatumGetValue<Vector *>(params[4]);
 
   Accessor accessor = grpVals.getAccessor<Accessor>();
 
diff --git a/depends/dbcommon/src/dbcommon/log/logger.h b/depends/dbcommon/src/dbcommon/log/logger.h
index 5c8e72d..934ad30 100644
--- a/depends/dbcommon/src/dbcommon/log/logger.h
+++ b/depends/dbcommon/src/dbcommon/log/logger.h
@@ -77,7 +77,7 @@
 #define LOG_INFO(...) COMPACT_LOG_INFO(__VA_ARGS__)
 #define LOG_WARNING(...) COMPACT_LOG_WARNING(__VA_ARGS__)
 #define LOG_ERROR(errCode, ...) COMPACT_LOG_ERROR(errCode, __VA_ARGS__)
-#define LOG_BACKSTRACE(...) COMPACT_LOG_BACKSTRACE(__VA_ARGS__)
+#define LOG_BACKTRACE(...) COMPACT_LOG_BACKTRACE(__VA_ARGS__)
 #define LOG_NOT_RETRY_ERROR(errCode, ...) \
   COMPACT_LOG_NOT_RETRY_ERROR(errCode, __VA_ARGS__)
 #define LOG_FATAL(errCode, ...) COMPACT_LOG_FATAL(errCode, __VA_ARGS__)
@@ -101,7 +101,7 @@
     throw dbcommon::TransactionAbortException(__msg, errCode, true);         \
   } while (0)
 
-#define COMPACT_LOG_BACKSTRACE(...)                               \
+#define COMPACT_LOG_BACKTRACE(...)                               \
   do {                                                            \
     std::string __msg = dbcommon::FormatErrorString(__VA_ARGS__); \
     COMPACT_GOOGLE_LOG_ERROR.stream()                             \
diff --git a/depends/dbcommon/src/dbcommon/testutil/agg-func-utils.h b/depends/dbcommon/src/dbcommon/testutil/agg-func-utils.h
index 739a266..8e64ace 100644
--- a/depends/dbcommon/src/dbcommon/testutil/agg-func-utils.h
+++ b/depends/dbcommon/src/dbcommon/testutil/agg-func-utils.h
@@ -107,9 +107,9 @@
   std::unique_ptr<AggGroupValues> generateAggGroupValues(
       const std::vector<std::string> &vect, bool isAvg, bool isNotCountStar) {
     std::unique_ptr<AggGroupValues> ret(new AggStringGroupValues);
+    ret->resize(vect.size());
     auto grps = reinterpret_cast<AggStringGroupValues *>(ret.get());
     auto accessor = grps->getAccessor<AggStringGroupValues ::Accessor>();
-    ret->resize(vect.size());
     for (int i = 0; i < vect.size(); i++) {
       auto val = accessor.at(i);
       if (!isNotCountStar)
@@ -125,9 +125,9 @@
       std::vector<Timestamp> &vect,             // NOLINT
       bool isNotCountStar) {
     std::unique_ptr<AggGroupValues> ret(new AggTimestampGroupValues);
+    ret->resize(vect.size());
     auto grps = reinterpret_cast<AggTimestampGroupValues *>(ret.get());
     auto accessor = grps->getAccessor<AggTimestampGroupValues::Accessor>();
-    ret->resize(vect.size());
     for (int i = 0; i < vect.size(); i++) {
       Datum d = CreateDatum(vectStr[i].c_str(), &vect[i], TIMESTAMPID);
       auto val = accessor.at(i);
@@ -165,6 +165,7 @@
       auto grpVals = reinterpret_cast<AggGroupValues *>(grpValsBase);
       auto accessor = grpVals->getAccessor<AggGroupValues::Accessor>();
       DecimalType t;
+      EXPECT_EQ(expected.size(), grpVals->size());
       for (int64_t i = 0; i < expected.size(); i++) {
         EXPECT_EQ(expected[i],
                   std::stod(t.toString(accessor.at(i)->accVal.value)));
@@ -174,6 +175,7 @@
 
     auto grpVals = reinterpret_cast<AggPrimitiveGroupValues *>(grpValsBase);
     auto accessor = grpVals->getAccessor<AggPrimitiveGroupValues::Accessor>();
+    EXPECT_EQ(expected.size(), grpVals->size());
     for (int64_t i = 0; i < expected.size(); i++) {
       EXPECT_EQ(expected[i], DatumGetValue<T>(accessor.at(i)->accVal.value));
     }
@@ -208,6 +210,7 @@
       using AggGroupValues = AggDecimalGroupValues;
       auto grpVals = reinterpret_cast<AggGroupValues *>(grpValsBase);
       auto accessor = grpVals->getAccessor<AggGroupValues::Accessor>();
+      EXPECT_EQ(expected.size(), grpVals->size());
       for (int64_t i = 0; i < expected.size(); i++) {
         if (expected[i].count)
           EXPECT_EQ(expected[i].sum,
@@ -218,6 +221,7 @@
     }
     auto grpVals = reinterpret_cast<AggPrimitiveGroupValues *>(grpValsBase);
     auto accessor = grpVals->getAccessor<AggPrimitiveGroupValues::Accessor>();
+    EXPECT_EQ(expected.size(), grpVals->size());
     for (int64_t i = 0; i < expected.size(); i++) {
       EXPECT_EQ(expected[i].sum, accessor.at(i)->avgVal.sum);
       EXPECT_EQ(expected[i].count, accessor.at(i)->avgVal.count);
diff --git a/depends/dbcommon/src/dbcommon/utils/flat-memory-buffer.h b/depends/dbcommon/src/dbcommon/utils/flat-memory-buffer.h
index 249230f..df73629 100644
--- a/depends/dbcommon/src/dbcommon/utils/flat-memory-buffer.h
+++ b/depends/dbcommon/src/dbcommon/utils/flat-memory-buffer.h
@@ -26,6 +26,7 @@
 #include <vector>
 
 #include "dbcommon/utils/cutils.h"
+#include "dbcommon/utils/int-util.h"
 #include "dbcommon/utils/macro.h"
 
 namespace dbcommon {
@@ -75,27 +76,58 @@
     static_assert((MEMBLKSIZE & (MEMBLKSIZE - 1)) == 0, "");
     static_assert(((sizeof(TYPE) & (sizeof(TYPE) - 1)) == 0),
                   "supposed sizeof(TYPE) to be power of two");
-    reserve(BlkSize);
-    memBlkPtr0 = memBlkPtrListVec[0];
   }
 
-  force_inline void reserve(uint64_t size) {
-    while ((size + BlkSize - 1) / BlkSize > memBlkList.size()) {
-      // Exactly, each MemBlkOwner contains extra spare memory beyond the
-      // specified MEMBLKSIZE, in order to align memory address into the
-      // multiple of the cache line size, which helps to reduce cache miss when
-      // size_ is small.
-      // For example, the number of the groups in TPCH-Q1 is 4 and
-      // sizeof(AggGroupValue) is 16. There is only 64 byte memory keeping being
-      // accessed, which fits into one cache line perfectly.
-      MemBlkOwner tmpOwner(MemBlkOwner(
-          {(cnmalloc(MEMBLKSIZE + DEFAULT_CACHE_LINE_SIZE - 1)), cnfree}));
-      char *tmpPtr =
-          alignedAddressInto<DEFAULT_CACHE_LINE_SIZE>(tmpOwner.get());
-      assert(((uint64_t)tmpPtr & (DEFAULT_CACHE_LINE_SIZE - 1)) == 0);
-      memBlkPtrListVec.push_back(tmpPtr);
+  void reserve(uint64_t size) {
+    if (sizeof(TYPE) * size <= getMemUsed()) return;
+
+    auto filledBlockCount = sizeof(TYPE) * size / MEMBLKSIZE;
+    auto remainedMemSize = nextPowerOfTwo(sizeof(TYPE) * size % MEMBLKSIZE);
+    if (!memBlkList.empty() && filledBlockCount) {
+      if (filledBlockCount >= memBlkList.size() && lastBlkSize_ != MEMBLKSIZE) {
+        // enlarge previous block
+        auto blk = std::move(memBlkList.back());
+        memBlkPtrListVec.pop_back();
+        memBlkList.pop_back();
+        auto newBlk = allocate(blk.release(), MEMBLKSIZE);
+        memBlkPtrListVec.push_back(newBlk.first);
+        memBlkPtrList = &memBlkPtrListVec[0];
+        memBlkList.push_back(std::move(newBlk.second));
+        lastBlkSize_ = MEMBLKSIZE;
+      }
+    }
+    while (memBlkList.size() < filledBlockCount) {
+      auto newBlk = allocate(nullptr, MEMBLKSIZE);
+      memBlkPtrListVec.push_back(newBlk.first);
       memBlkPtrList = &memBlkPtrListVec[0];
-      memBlkList.push_back(std::move(tmpOwner));
+      memBlkList.push_back(std::move(newBlk.second));
+    }
+    assert(memBlkPtrListVec.size() == memBlkList.size());
+    assert(memBlkList.size() >= filledBlockCount);
+    if (memBlkList.size() == (filledBlockCount + (remainedMemSize ? 1 : 0))) {
+      if (remainedMemSize && remainedMemSize < MEMBLKSIZE) {
+        // enlarge previous block
+        auto blk = std::move(memBlkList.back());
+        memBlkPtrListVec.pop_back();
+        memBlkList.pop_back();
+        auto newBlk = allocate(blk.release(), remainedMemSize);
+        memBlkPtrListVec.push_back(newBlk.first);
+        memBlkPtrList = &memBlkPtrListVec[0];
+        memBlkList.push_back(std::move(newBlk.second));
+        lastBlkSize_ = remainedMemSize;
+      }
+    } else {
+      // allocate new block
+      assert(remainedMemSize);
+      auto newBlk = allocate(nullptr, remainedMemSize);
+      memBlkPtrListVec.push_back(newBlk.first);
+      memBlkPtrList = &memBlkPtrListVec[0];
+      memBlkList.push_back(std::move(newBlk.second));
+      lastBlkSize_ = remainedMemSize;
+    }
+
+    if (!memBlkPtrListVec.empty()) {
+      memBlkPtr0 = memBlkPtrListVec[0];
     }
   }
 
@@ -189,18 +221,48 @@
 
   static const uint64_t BlkSize = MEMBLKSIZE / sizeof(TYPE);
 
-  double getMemUsed() { return sizeof(TYPE) * size(); }
+  double getMemUsed() {
+    double ret = 0;
+    ret += memBlkList.size() > 1 ? MEMBLKSIZE * (memBlkList.size() - 1) : 0;
+    ret += lastBlkSize_;
+    assert(ret >= sizeof(TYPE) * size());
+    return ret;
+  }
+
+  void reset() {
+    lastBlkSize_ = 0;
+    memBlkList.clear();
+    memBlkPtrListVec.clear();
+    memBlkPtrList = nullptr;
+    memBlkPtr0 = nullptr;
+    size_ = 0;
+  }
 
  private:
+  std::pair<char *, MemBlkOwner> allocate(char *ptr, size_t size) {
+    // Exactly, each MemBlkOwner contains extra spare memory beyond the
+    // specified MEMBLKSIZE, in order to align memory address into the
+    // multiple of the cache line size, which helps to reduce cache miss when
+    // size_ is small.
+    // For example, the number of the groups in TPCH-Q1 is 4 and
+    // sizeof(AggGroupValue) is 16. There is only 64 byte memory keeping being
+    // accessed, which fits into one cache line perfectly.
+    // TODO(chiyang): add back the alignment of cache line size
+    MemBlkOwner tmpOwner(MemBlkOwner({(cnrealloc(ptr, size)), cnfree}));
+    char *tmpPtr = tmpOwner.get();
+    return std::make_pair(tmpPtr, std::move(tmpOwner));
+  }
+
   // the maximum number of the elements that contained in a memory block
   static const uint64_t Mask = BlkSize - 1;
   static const uint64_t ShiftLen = 64 - __builtin_clzll(Mask);
 
+  uint64_t lastBlkSize_ = 0;
   std::vector<MemBlkOwner> memBlkList;
   std::vector<char *> memBlkPtrListVec;
   char **memBlkPtrList = nullptr;
   char *memBlkPtr0 = nullptr;  // performance: for small scale
-  uint64_t size_ = 0;
+  uint64_t size_ = 0;          // number of stored unit type
 };
 
 }  // namespace dbcommon
diff --git a/depends/dbcommon/src/dbcommon/utils/int-util.h b/depends/dbcommon/src/dbcommon/utils/int-util.h
index ba7ce2e..f8b8ca1 100644
--- a/depends/dbcommon/src/dbcommon/utils/int-util.h
+++ b/depends/dbcommon/src/dbcommon/utils/int-util.h
@@ -29,8 +29,8 @@
   return ((x != 0) && ((x & (~x + 1)) == x));
 }
 
-inline uint64_t nextPowerOfTwo(uint64_t __n) {
-  return uint64_t(1) << (64 - __builtin_clzll(__n - 1));
+inline uint64_t nextPowerOfTwo(uint64_t n) {
+  return n ? uint64_t(1) << (64 - __builtin_clzll(n - 1)) : 0;
 }
 
 // Encodes an unsigned variable-length integer using the MSB algorithm.
diff --git a/depends/dbcommon/src/dbcommon/utils/macro.h b/depends/dbcommon/src/dbcommon/utils/macro.h
index fba8f22..a104933 100644
--- a/depends/dbcommon/src/dbcommon/utils/macro.h
+++ b/depends/dbcommon/src/dbcommon/utils/macro.h
@@ -27,10 +27,10 @@
 #define DEFAULT_NUMBER_TUPLES_PER_BATCH 2048
 #define DEFAULT_RESERVED_SIZE_OF_STRING 16
 #define DEFAULT_CACHE_LINE_SIZE 64
-#define DEFAULT_SIZE_PER_AGG_COUNTER_BLK (1 << 24)
-#define DEFAULT_SIZE_PER_HASH_CHAIN_BLK (1 << 27)
-#define DEFAULT_SIZE_PER_HASHKEY_BLK (1 << 26)
-#define DEFAULT_SIZE_PER_HASHJOIN_BLK (1 << 26)
+#define DEFAULT_SIZE_PER_AGG_COUNTER_BLK (128 * 1024 * 1024)
+#define DEFAULT_SIZE_PER_HASH_CHAIN_BLK (128 * 1024 * 1024)
+#define DEFAULT_SIZE_PER_HASHKEY_BLK (16 * 1024 * 1024)
+#define DEFAULT_SIZE_PER_HASHJOIN_BLK (16 * 1024 * 1024)
 
 #define INNER_VAR 65000  // reference to inner subplan
 #define OUTER_VAR 65001  // reference to outer subplan
diff --git a/depends/dbcommon/test/unit/function/test-agg-func-has-no-group-by.cc b/depends/dbcommon/test/unit/function/test-agg-func-has-no-group-by.cc
index ac03bde..435f39f 100644
--- a/depends/dbcommon/test/unit/function/test-agg-func-has-no-group-by.cc
+++ b/depends/dbcommon/test/unit/function/test-agg-func-has-no-group-by.cc
@@ -32,7 +32,7 @@
     std::vector<int64_t> initAggGrpVals = {3};
     std::unique_ptr<AggGroupValues> grpValsBase =
         generateAggGroupValues<int64_t>(initAggGrpVals, false, false);
-    std::vector<uint64_t> hashGroups = {0, 1};
+    std::vector<uint64_t> hashGroups = {0, 0};
 
     params[0] = CreateDatum<AggGroupValues *>(grpValsBase.get());
     params[2] = CreateDatum<const std::vector<uint64_t> *>(&hashGroups);
@@ -482,14 +482,14 @@
     std::unique_ptr<Vector> vec;
     {  // without vec but with scalar
       std::unique_ptr<Scalar> scalar = generateScalar<T>(TK, 1);
-      std::vector<uint64_t> hashGroups = {0, 0, 1, 1, 1};
+      std::vector<uint64_t> hashGroups = {0, 0, 0, 0, 0};
       grpVals = generateAggGroupValues<T>(initGrpVals, false, true);
       callFuncScalar(testFunc, grpVals.get(), &hashGroups, true, scalar.get());
       checkAcc<T>({1}, grpVals.get());
     }
     {
       LOG_INFO("Testing with seleletList");
-      std::vector<uint64_t> hashGroups = {0, 1, 1};
+      std::vector<uint64_t> hashGroups = {0, 0, 0};
       SelectList sel = {0, 3, 4};
       grpVals = generateAggGroupValues<T>(initGrpVals, false, true);
       LOG_INFO("Testing has nulls");
@@ -520,7 +520,7 @@
     {
       LOG_INFO("Testing without seleletList");
       grpVals = generateAggGroupValues<T>(initGrpVals, false, true);
-      std::vector<uint64_t> hashGroups = {0, 0, 1, 1, 1};
+      std::vector<uint64_t> hashGroups = {0, 0, 0, 0, 0};
 
       LOG_INFO("Testing has nulls");
       vec = VectorUtility::generateSelectVector<T>(TK, vals, &nulls, nullptr);
@@ -592,14 +592,14 @@
     {  // without vec but with scalar
       LOG_INFO("Testing scalar");
       std::unique_ptr<Scalar> scalar = generateScalar<T>("1");
-      std::vector<uint64_t> hashGroups = {0, 0, 1, 1, 1};
+      std::vector<uint64_t> hashGroups = {0, 0, 0, 0, 0};
       grpVals = generateAggGroupValues<T>(initGrpVals, false, true);
       callFuncScalar(testFunc, grpVals.get(), &hashGroups, true, scalar.get());
       checkAccOnString({"1"}, grpVals.get());
     }
     {
       LOG_INFO("Testing with seleletList");
-      std::vector<uint64_t> hashGroups = {0, 1, 1};
+      std::vector<uint64_t> hashGroups = {0, 0, 0};
       SelectList sel = {0, 3, 4};
       grpVals = generateAggGroupValues<T>(initGrpVals, false, true);
       LOG_INFO("Testing has nulls");
@@ -633,7 +633,7 @@
     {
       LOG_INFO("Testing without seleletList");
       grpVals = generateAggGroupValues<T>(initGrpVals, false, true);
-      std::vector<uint64_t> hashGroups = {0, 0, 1, 1, 1};
+      std::vector<uint64_t> hashGroups = {0, 0, 0, 0, 0};
 
       LOG_INFO("Testing has nulls");
       vec = VectorUtility::generateSelectVector<T>(STRINGID, vals, &nulls,
@@ -685,14 +685,14 @@
       Timestamp tsScalar;
       std::unique_ptr<Scalar> scalar =
           generateScalar<T>("2018-01-19 19:52:00", &tsScalar);
-      std::vector<uint64_t> hashGroups = {0, 0, 1, 1, 1};
+      std::vector<uint64_t> hashGroups = {0, 0, 0, 0, 0};
       grpVals = generateAggGroupValues<T>(initGrpValStrs, initGrpVals, true);
       callFuncScalar(testFunc, grpVals.get(), &hashGroups, true, scalar.get());
       checkAccOnTimestamp({{1516391520, 0}}, grpVals.get());
     }
     {
       LOG_INFO("Testing with selectList");
-      std::vector<uint64_t> hashGroups = {0, 1, 1};
+      std::vector<uint64_t> hashGroups = {0, 0, 0};
       SelectList sel = {0, 3, 4};
       grpVals = generateAggGroupValues<T>(initGrpValStrs, initGrpVals, true);
       LOG_INFO("Testing has nulls");
@@ -726,7 +726,7 @@
     {
       LOG_INFO("Testing without selectList");
       grpVals = generateAggGroupValues<T>(initGrpValStrs, initGrpVals, true);
-      std::vector<uint64_t> hashGroups = {0, 0, 1, 1, 1};
+      std::vector<uint64_t> hashGroups = {0, 0, 0, 0, 0};
 
       LOG_INFO("Testing has nulls");
       vec = VectorUtility::generateSelectTimestampVector(
diff --git a/depends/dbcommon/test/unit/utils/test-flat-memory-buffer.cc b/depends/dbcommon/test/unit/utils/test-flat-memory-buffer.cc
new file mode 100644
index 0000000..23dcd06
--- /dev/null
+++ b/depends/dbcommon/test/unit/utils/test-flat-memory-buffer.cc
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "dbcommon/utils/flat-memory-buffer.h"
+
+#include "gtest/gtest.h"
+
+namespace dbcommon {
+
+TEST(TestFlatMemBuf, TestResize) {
+  using Type = double;
+  FlatMemBuf<Type, 256> buf;
+
+  ASSERT_EQ(0, buf.getMemUsed());
+
+  buf.resize(5);
+  EXPECT_EQ(nextPowerOfTwo(sizeof(Type) * 5), buf.getMemUsed());
+  *buf.ptrAt(3) = 233;
+  EXPECT_EQ(233, *buf.ptrAt(3));
+
+  buf.resize(9);
+  EXPECT_EQ(nextPowerOfTwo(sizeof(Type) * 9), buf.getMemUsed());
+
+  buf.resize(buf.BlkSize + 7);
+  EXPECT_EQ(256 + nextPowerOfTwo(sizeof(Type) * 7), buf.getMemUsed());
+  *buf.ptrAt(buf.BlkSize) = 666;
+  EXPECT_EQ(666, *buf.ptrAt(buf.BlkSize));
+
+  buf.resize(buf.BlkSize * 13);
+  EXPECT_EQ(256 * 13, buf.getMemUsed());
+
+  EXPECT_EQ(233, *buf.ptrAt(3));
+  EXPECT_EQ(666, *buf.ptrAt(buf.BlkSize));
+
+  buf.resize(2);
+  EXPECT_EQ(256 * 13, buf.getMemUsed());
+}
+
+}  // namespace dbcommon
+