HAWQ-1767. Refactor FlatMemBuf memory allocation strategy
diff --git a/depends/dbcommon/src/dbcommon/function/agg-func.cc b/depends/dbcommon/src/dbcommon/function/agg-func.cc
index 48cfc96..ef07a19 100644
--- a/depends/dbcommon/src/dbcommon/function/agg-func.cc
+++ b/depends/dbcommon/src/dbcommon/function/agg-func.cc
@@ -92,7 +92,6 @@
const uint64_t *__restrict__ hashGroups =
DatumGetValue<const std::vector<uint64_t> *>(params[2])->data();
bool hasGroupBy = DatumGetValue<bool>(params[3]);
- Vector *vec = DatumGetValue<Vector *>(params[4]);
Accessor accessor = grpVals.getAccessor<Accessor>();
diff --git a/depends/dbcommon/src/dbcommon/log/logger.h b/depends/dbcommon/src/dbcommon/log/logger.h
index 5c8e72d..934ad30 100644
--- a/depends/dbcommon/src/dbcommon/log/logger.h
+++ b/depends/dbcommon/src/dbcommon/log/logger.h
@@ -77,7 +77,7 @@
#define LOG_INFO(...) COMPACT_LOG_INFO(__VA_ARGS__)
#define LOG_WARNING(...) COMPACT_LOG_WARNING(__VA_ARGS__)
#define LOG_ERROR(errCode, ...) COMPACT_LOG_ERROR(errCode, __VA_ARGS__)
-#define LOG_BACKSTRACE(...) COMPACT_LOG_BACKSTRACE(__VA_ARGS__)
+#define LOG_BACKTRACE(...) COMPACT_LOG_BACKTRACE(__VA_ARGS__)
#define LOG_NOT_RETRY_ERROR(errCode, ...) \
COMPACT_LOG_NOT_RETRY_ERROR(errCode, __VA_ARGS__)
#define LOG_FATAL(errCode, ...) COMPACT_LOG_FATAL(errCode, __VA_ARGS__)
@@ -101,7 +101,7 @@
throw dbcommon::TransactionAbortException(__msg, errCode, true); \
} while (0)
-#define COMPACT_LOG_BACKSTRACE(...) \
+#define COMPACT_LOG_BACKTRACE(...) \
do { \
std::string __msg = dbcommon::FormatErrorString(__VA_ARGS__); \
COMPACT_GOOGLE_LOG_ERROR.stream() \
diff --git a/depends/dbcommon/src/dbcommon/testutil/agg-func-utils.h b/depends/dbcommon/src/dbcommon/testutil/agg-func-utils.h
index 739a266..8e64ace 100644
--- a/depends/dbcommon/src/dbcommon/testutil/agg-func-utils.h
+++ b/depends/dbcommon/src/dbcommon/testutil/agg-func-utils.h
@@ -107,9 +107,9 @@
std::unique_ptr<AggGroupValues> generateAggGroupValues(
const std::vector<std::string> &vect, bool isAvg, bool isNotCountStar) {
std::unique_ptr<AggGroupValues> ret(new AggStringGroupValues);
+ ret->resize(vect.size());
auto grps = reinterpret_cast<AggStringGroupValues *>(ret.get());
auto accessor = grps->getAccessor<AggStringGroupValues ::Accessor>();
- ret->resize(vect.size());
for (int i = 0; i < vect.size(); i++) {
auto val = accessor.at(i);
if (!isNotCountStar)
@@ -125,9 +125,9 @@
std::vector<Timestamp> &vect, // NOLINT
bool isNotCountStar) {
std::unique_ptr<AggGroupValues> ret(new AggTimestampGroupValues);
+ ret->resize(vect.size());
auto grps = reinterpret_cast<AggTimestampGroupValues *>(ret.get());
auto accessor = grps->getAccessor<AggTimestampGroupValues::Accessor>();
- ret->resize(vect.size());
for (int i = 0; i < vect.size(); i++) {
Datum d = CreateDatum(vectStr[i].c_str(), &vect[i], TIMESTAMPID);
auto val = accessor.at(i);
@@ -165,6 +165,7 @@
auto grpVals = reinterpret_cast<AggGroupValues *>(grpValsBase);
auto accessor = grpVals->getAccessor<AggGroupValues::Accessor>();
DecimalType t;
+ EXPECT_EQ(expected.size(), grpVals->size());
for (int64_t i = 0; i < expected.size(); i++) {
EXPECT_EQ(expected[i],
std::stod(t.toString(accessor.at(i)->accVal.value)));
@@ -174,6 +175,7 @@
auto grpVals = reinterpret_cast<AggPrimitiveGroupValues *>(grpValsBase);
auto accessor = grpVals->getAccessor<AggPrimitiveGroupValues::Accessor>();
+ EXPECT_EQ(expected.size(), grpVals->size());
for (int64_t i = 0; i < expected.size(); i++) {
EXPECT_EQ(expected[i], DatumGetValue<T>(accessor.at(i)->accVal.value));
}
@@ -208,6 +210,7 @@
using AggGroupValues = AggDecimalGroupValues;
auto grpVals = reinterpret_cast<AggGroupValues *>(grpValsBase);
auto accessor = grpVals->getAccessor<AggGroupValues::Accessor>();
+ EXPECT_EQ(expected.size(), grpVals->size());
for (int64_t i = 0; i < expected.size(); i++) {
if (expected[i].count)
EXPECT_EQ(expected[i].sum,
@@ -218,6 +221,7 @@
}
auto grpVals = reinterpret_cast<AggPrimitiveGroupValues *>(grpValsBase);
auto accessor = grpVals->getAccessor<AggPrimitiveGroupValues::Accessor>();
+ EXPECT_EQ(expected.size(), grpVals->size());
for (int64_t i = 0; i < expected.size(); i++) {
EXPECT_EQ(expected[i].sum, accessor.at(i)->avgVal.sum);
EXPECT_EQ(expected[i].count, accessor.at(i)->avgVal.count);
diff --git a/depends/dbcommon/src/dbcommon/utils/flat-memory-buffer.h b/depends/dbcommon/src/dbcommon/utils/flat-memory-buffer.h
index 249230f..df73629 100644
--- a/depends/dbcommon/src/dbcommon/utils/flat-memory-buffer.h
+++ b/depends/dbcommon/src/dbcommon/utils/flat-memory-buffer.h
@@ -26,6 +26,7 @@
#include <vector>
#include "dbcommon/utils/cutils.h"
+#include "dbcommon/utils/int-util.h"
#include "dbcommon/utils/macro.h"
namespace dbcommon {
@@ -75,27 +76,58 @@
static_assert((MEMBLKSIZE & (MEMBLKSIZE - 1)) == 0, "");
static_assert(((sizeof(TYPE) & (sizeof(TYPE) - 1)) == 0),
"supposed sizeof(TYPE) to be power of two");
- reserve(BlkSize);
- memBlkPtr0 = memBlkPtrListVec[0];
}
- force_inline void reserve(uint64_t size) {
- while ((size + BlkSize - 1) / BlkSize > memBlkList.size()) {
- // Exactly, each MemBlkOwner contains extra spare memory beyond the
- // specified MEMBLKSIZE, in order to align memory address into the
- // multiple of the cache line size, which helps to reduce cache miss when
- // size_ is small.
- // For example, the number of the groups in TPCH-Q1 is 4 and
- // sizeof(AggGroupValue) is 16. There is only 64 byte memory keeping being
- // accessed, which fits into one cache line perfectly.
- MemBlkOwner tmpOwner(MemBlkOwner(
- {(cnmalloc(MEMBLKSIZE + DEFAULT_CACHE_LINE_SIZE - 1)), cnfree}));
- char *tmpPtr =
- alignedAddressInto<DEFAULT_CACHE_LINE_SIZE>(tmpOwner.get());
- assert(((uint64_t)tmpPtr & (DEFAULT_CACHE_LINE_SIZE - 1)) == 0);
- memBlkPtrListVec.push_back(tmpPtr);
+ // Ensures the buffer owns enough memory to hold at least `size` elements.
+ //
+ // Memory is kept as a list of blocks: every block except the last one is
+ // MEMBLKSIZE bytes and the last block is lastBlkSize_ bytes. The part of the
+ // request that does not fill a whole block is rounded up with
+ // nextPowerOfTwo(), so the last block grows in power-of-two steps up to
+ // MEMBLKSIZE. Blocks are grown through allocate(), i.e. cnrealloc()
+ // (presumably preserving their contents like realloc -- confirm in cutils).
+ // The buffer is never shrunk by this call.
+ void reserve(uint64_t size) {
+ if (sizeof(TYPE) * size <= getMemUsed()) return;
+
+ auto filledBlockCount = sizeof(TYPE) * size / MEMBLKSIZE;
+ // Bytes needed beyond the full blocks, rounded up to a power of two
+ // (0 when the request is an exact multiple of MEMBLKSIZE).
+ auto remainedMemSize = nextPowerOfTwo(sizeof(TYPE) * size % MEMBLKSIZE);
+ if (!memBlkList.empty() && filledBlockCount) {
+ if (filledBlockCount >= memBlkList.size() && lastBlkSize_ != MEMBLKSIZE) {
+ // The current last block is about to be counted as a full block, so it
+ // has to be enlarged to MEMBLKSIZE first.
+ auto blk = std::move(memBlkList.back());
+ memBlkPtrListVec.pop_back();
+ memBlkList.pop_back();
+ auto newBlk = allocate(blk.release(), MEMBLKSIZE);
+ memBlkPtrListVec.push_back(newBlk.first);
+ memBlkPtrList = &memBlkPtrListVec[0];
+ memBlkList.push_back(std::move(newBlk.second));
+ lastBlkSize_ = MEMBLKSIZE;
+ }
+ }
+ while (memBlkList.size() < filledBlockCount) {
+ auto newBlk = allocate(nullptr, MEMBLKSIZE);
+ memBlkPtrListVec.push_back(newBlk.first);
memBlkPtrList = &memBlkPtrListVec[0];
- memBlkList.push_back(std::move(tmpOwner));
+ memBlkList.push_back(std::move(newBlk.second));
+ // The block just appended is now the last one and it is a full block;
+ // without this, reserving an exact multiple of MEMBLKSIZE on an empty
+ // buffer would leave lastBlkSize_ at 0 and getMemUsed() would under-report.
+ lastBlkSize_ = MEMBLKSIZE;
+ }
+ assert(memBlkPtrListVec.size() == memBlkList.size());
+ assert(memBlkList.size() >= filledBlockCount);
+ if (memBlkList.size() == (filledBlockCount + (remainedMemSize ? 1 : 0))) {
+ // The block that must hold the remainder already exists. Compare against
+ // its actual size rather than MEMBLKSIZE: the rounded remainder may be
+ // exactly MEMBLKSIZE while the existing block is still smaller, and it
+ // must be enlarged in that case too. When remainedMemSize is 0 the
+ // condition is false and nothing happens.
+ if (lastBlkSize_ < remainedMemSize) {
+ // enlarge previous block
+ auto blk = std::move(memBlkList.back());
+ memBlkPtrListVec.pop_back();
+ memBlkList.pop_back();
+ auto newBlk = allocate(blk.release(), remainedMemSize);
+ memBlkPtrListVec.push_back(newBlk.first);
+ memBlkPtrList = &memBlkPtrListVec[0];
+ memBlkList.push_back(std::move(newBlk.second));
+ lastBlkSize_ = remainedMemSize;
+ }
+ } else {
+ // allocate new block
+ assert(remainedMemSize);
+ auto newBlk = allocate(nullptr, remainedMemSize);
+ memBlkPtrListVec.push_back(newBlk.first);
+ memBlkPtrList = &memBlkPtrListVec[0];
+ memBlkList.push_back(std::move(newBlk.second));
+ lastBlkSize_ = remainedMemSize;
+ }
+
+ // Refresh the cached pointer to the first block; it may have moved if that
+ // block was reallocated above.
+ if (!memBlkPtrListVec.empty()) {
+ memBlkPtr0 = memBlkPtrListVec[0];
}
}
@@ -189,18 +221,48 @@
static const uint64_t BlkSize = MEMBLKSIZE / sizeof(TYPE);
- double getMemUsed() { return sizeof(TYPE) * size(); }
+ // Returns the number of bytes currently reserved by the buffer: every block
+ // except the last one counts as MEMBLKSIZE, the last one as lastBlkSize_.
+ // This is the reserved capacity, which is asserted to be at least the bytes
+ // occupied by the size() stored elements.
+ double getMemUsed() {
+ const auto blkCount = memBlkList.size();
+ double total = 0;
+ if (blkCount > 1) total += MEMBLKSIZE * (blkCount - 1);
+ total += lastBlkSize_;
+ assert(total >= sizeof(TYPE) * size());
+ return total;
+ }
+
+ // Puts the buffer back into its initial empty state: drops every block
+ // owner, forgets all cached block pointers and zeroes the element count and
+ // the recorded size of the last block.
+ void reset() {
+ memBlkPtr0 = nullptr;
+ memBlkPtrList = nullptr;
+ memBlkPtrListVec.clear();
+ memBlkList.clear();
+ size_ = 0;
+ lastBlkSize_ = 0;
+ }
private:
+ // (Re)allocates a memory block of `size` bytes by calling
+ // cnrealloc(ptr, size). reserve() passes nullptr to get a new block, or a
+ // pointer released from an existing owner to resize that block. Returns the
+ // block's address together with the MemBlkOwner that frees it with cnfree.
+ //
+ // NOTE: blocks used to be over-allocated so that their start address could
+ // be aligned to the cache line size, which helps to reduce cache misses
+ // when size_ is small. For example, the number of groups in TPCH-Q1 is 4
+ // and sizeof(AggGroupValue) is 16, so only 64 bytes keep being accessed and
+ // they fit into one cache line perfectly. That alignment is currently not
+ // applied: the address handed back is exactly what cnrealloc() returned.
+ // TODO(chiyang): add back the alignment of cache line size
+ std::pair<char *, MemBlkOwner> allocate(char *ptr, size_t size) {
+ MemBlkOwner owner(MemBlkOwner({(cnrealloc(ptr, size)), cnfree}));
+ char *blkAddr = owner.get();
+ return std::make_pair(blkAddr, std::move(owner));
+ }
+
// the maximum number of the elements that contained in a memory block
static const uint64_t Mask = BlkSize - 1;
static const uint64_t ShiftLen = 64 - __builtin_clzll(Mask);
+ uint64_t lastBlkSize_ = 0;
std::vector<MemBlkOwner> memBlkList;
std::vector<char *> memBlkPtrListVec;
char **memBlkPtrList = nullptr;
char *memBlkPtr0 = nullptr; // performance: for small scale
- uint64_t size_ = 0;
+ uint64_t size_ = 0; // number of stored unit type
};
} // namespace dbcommon
diff --git a/depends/dbcommon/src/dbcommon/utils/int-util.h b/depends/dbcommon/src/dbcommon/utils/int-util.h
index ba7ce2e..f8b8ca1 100644
--- a/depends/dbcommon/src/dbcommon/utils/int-util.h
+++ b/depends/dbcommon/src/dbcommon/utils/int-util.h
@@ -29,8 +29,8 @@
return ((x != 0) && ((x & (~x + 1)) == x));
}
-inline uint64_t nextPowerOfTwo(uint64_t __n) {
- return uint64_t(1) << (64 - __builtin_clzll(__n - 1));
+// Returns the smallest power of two that is >= n; 0 and 1 map to themselves.
+// n must not exceed 2^63, otherwise the result is not representable in
+// uint64_t.
+inline uint64_t nextPowerOfTwo(uint64_t n) {
+ // __builtin_clzll(0) is undefined, so n == 1 (where n - 1 == 0) must be
+ // answered before reaching the shift below.
+ if (n <= 1) return n;
+ return uint64_t(1) << (64 - __builtin_clzll(n - 1));
}
// Encodes an unsigned variable-length integer using the MSB algorithm.
diff --git a/depends/dbcommon/src/dbcommon/utils/macro.h b/depends/dbcommon/src/dbcommon/utils/macro.h
index fba8f22..a104933 100644
--- a/depends/dbcommon/src/dbcommon/utils/macro.h
+++ b/depends/dbcommon/src/dbcommon/utils/macro.h
@@ -27,10 +27,10 @@
#define DEFAULT_NUMBER_TUPLES_PER_BATCH 2048
#define DEFAULT_RESERVED_SIZE_OF_STRING 16
#define DEFAULT_CACHE_LINE_SIZE 64
-#define DEFAULT_SIZE_PER_AGG_COUNTER_BLK (1 << 24)
-#define DEFAULT_SIZE_PER_HASH_CHAIN_BLK (1 << 27)
-#define DEFAULT_SIZE_PER_HASHKEY_BLK (1 << 26)
-#define DEFAULT_SIZE_PER_HASHJOIN_BLK (1 << 26)
+#define DEFAULT_SIZE_PER_AGG_COUNTER_BLK (128 * 1024 * 1024)
+#define DEFAULT_SIZE_PER_HASH_CHAIN_BLK (128 * 1024 * 1024)
+#define DEFAULT_SIZE_PER_HASHKEY_BLK (16 * 1024 * 1024)
+#define DEFAULT_SIZE_PER_HASHJOIN_BLK (16 * 1024 * 1024)
#define INNER_VAR 65000 // reference to inner subplan
#define OUTER_VAR 65001 // reference to outer subplan
diff --git a/depends/dbcommon/test/unit/function/test-agg-func-has-no-group-by.cc b/depends/dbcommon/test/unit/function/test-agg-func-has-no-group-by.cc
index ac03bde..435f39f 100644
--- a/depends/dbcommon/test/unit/function/test-agg-func-has-no-group-by.cc
+++ b/depends/dbcommon/test/unit/function/test-agg-func-has-no-group-by.cc
@@ -32,7 +32,7 @@
std::vector<int64_t> initAggGrpVals = {3};
std::unique_ptr<AggGroupValues> grpValsBase =
generateAggGroupValues<int64_t>(initAggGrpVals, false, false);
- std::vector<uint64_t> hashGroups = {0, 1};
+ std::vector<uint64_t> hashGroups = {0, 0};
params[0] = CreateDatum<AggGroupValues *>(grpValsBase.get());
params[2] = CreateDatum<const std::vector<uint64_t> *>(&hashGroups);
@@ -482,14 +482,14 @@
std::unique_ptr<Vector> vec;
{ // without vec but with scalar
std::unique_ptr<Scalar> scalar = generateScalar<T>(TK, 1);
- std::vector<uint64_t> hashGroups = {0, 0, 1, 1, 1};
+ std::vector<uint64_t> hashGroups = {0, 0, 0, 0, 0};
grpVals = generateAggGroupValues<T>(initGrpVals, false, true);
callFuncScalar(testFunc, grpVals.get(), &hashGroups, true, scalar.get());
checkAcc<T>({1}, grpVals.get());
}
{
LOG_INFO("Testing with seleletList");
- std::vector<uint64_t> hashGroups = {0, 1, 1};
+ std::vector<uint64_t> hashGroups = {0, 0, 0};
SelectList sel = {0, 3, 4};
grpVals = generateAggGroupValues<T>(initGrpVals, false, true);
LOG_INFO("Testing has nulls");
@@ -520,7 +520,7 @@
{
LOG_INFO("Testing without seleletList");
grpVals = generateAggGroupValues<T>(initGrpVals, false, true);
- std::vector<uint64_t> hashGroups = {0, 0, 1, 1, 1};
+ std::vector<uint64_t> hashGroups = {0, 0, 0, 0, 0};
LOG_INFO("Testing has nulls");
vec = VectorUtility::generateSelectVector<T>(TK, vals, &nulls, nullptr);
@@ -592,14 +592,14 @@
{ // without vec but with scalar
LOG_INFO("Testing scalar");
std::unique_ptr<Scalar> scalar = generateScalar<T>("1");
- std::vector<uint64_t> hashGroups = {0, 0, 1, 1, 1};
+ std::vector<uint64_t> hashGroups = {0, 0, 0, 0, 0};
grpVals = generateAggGroupValues<T>(initGrpVals, false, true);
callFuncScalar(testFunc, grpVals.get(), &hashGroups, true, scalar.get());
checkAccOnString({"1"}, grpVals.get());
}
{
LOG_INFO("Testing with seleletList");
- std::vector<uint64_t> hashGroups = {0, 1, 1};
+ std::vector<uint64_t> hashGroups = {0, 0, 0};
SelectList sel = {0, 3, 4};
grpVals = generateAggGroupValues<T>(initGrpVals, false, true);
LOG_INFO("Testing has nulls");
@@ -633,7 +633,7 @@
{
LOG_INFO("Testing without seleletList");
grpVals = generateAggGroupValues<T>(initGrpVals, false, true);
- std::vector<uint64_t> hashGroups = {0, 0, 1, 1, 1};
+ std::vector<uint64_t> hashGroups = {0, 0, 0, 0, 0};
LOG_INFO("Testing has nulls");
vec = VectorUtility::generateSelectVector<T>(STRINGID, vals, &nulls,
@@ -685,14 +685,14 @@
Timestamp tsScalar;
std::unique_ptr<Scalar> scalar =
generateScalar<T>("2018-01-19 19:52:00", &tsScalar);
- std::vector<uint64_t> hashGroups = {0, 0, 1, 1, 1};
+ std::vector<uint64_t> hashGroups = {0, 0, 0, 0, 0};
grpVals = generateAggGroupValues<T>(initGrpValStrs, initGrpVals, true);
callFuncScalar(testFunc, grpVals.get(), &hashGroups, true, scalar.get());
checkAccOnTimestamp({{1516391520, 0}}, grpVals.get());
}
{
LOG_INFO("Testing with selectList");
- std::vector<uint64_t> hashGroups = {0, 1, 1};
+ std::vector<uint64_t> hashGroups = {0, 0, 0};
SelectList sel = {0, 3, 4};
grpVals = generateAggGroupValues<T>(initGrpValStrs, initGrpVals, true);
LOG_INFO("Testing has nulls");
@@ -726,7 +726,7 @@
{
LOG_INFO("Testing without selectList");
grpVals = generateAggGroupValues<T>(initGrpValStrs, initGrpVals, true);
- std::vector<uint64_t> hashGroups = {0, 0, 1, 1, 1};
+ std::vector<uint64_t> hashGroups = {0, 0, 0, 0, 0};
LOG_INFO("Testing has nulls");
vec = VectorUtility::generateSelectTimestampVector(
diff --git a/depends/dbcommon/test/unit/utils/test-flat-memory-buffer.cc b/depends/dbcommon/test/unit/utils/test-flat-memory-buffer.cc
new file mode 100644
index 0000000..23dcd06
--- /dev/null
+++ b/depends/dbcommon/test/unit/utils/test-flat-memory-buffer.cc
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "dbcommon/utils/flat-memory-buffer.h"
+
+#include "gtest/gtest.h"
+
+namespace dbcommon {
+
+// Walks a FlatMemBuf<double, 256> through a sequence of resize() calls and
+// checks, after each step, the value reported by getMemUsed() and that
+// elements written earlier are still readable.
+TEST(TestFlatMemBuf, TestResize) {
+ using Type = double;
+ FlatMemBuf<Type, 256> buf;
+
+ // A freshly constructed buffer reports no memory.
+ ASSERT_EQ(0, buf.getMemUsed());
+
+ // Less than one block: the expected usage is the requested byte count
+ // rounded up to a power of two.
+ buf.resize(5);
+ EXPECT_EQ(nextPowerOfTwo(sizeof(Type) * 5), buf.getMemUsed());
+ *buf.ptrAt(3) = 233;
+ EXPECT_EQ(233, *buf.ptrAt(3));
+
+ // Still less than one block, but past the previous power-of-two bound.
+ buf.resize(9);
+ EXPECT_EQ(nextPowerOfTwo(sizeof(Type) * 9), buf.getMemUsed());
+
+ // One full 256-byte block plus a partial block for the 7 extra elements.
+ buf.resize(buf.BlkSize + 7);
+ EXPECT_EQ(256 + nextPowerOfTwo(sizeof(Type) * 7), buf.getMemUsed());
+ *buf.ptrAt(buf.BlkSize) = 666;
+ EXPECT_EQ(666, *buf.ptrAt(buf.BlkSize));
+
+ // An exact multiple of the block size: 13 full blocks, no partial block.
+ buf.resize(buf.BlkSize * 13);
+ EXPECT_EQ(256 * 13, buf.getMemUsed());
+
+ // Values stored before the buffer grew must still be there.
+ EXPECT_EQ(233, *buf.ptrAt(3));
+ EXPECT_EQ(666, *buf.ptrAt(buf.BlkSize));
+
+ // Resizing to fewer elements is expected to leave the reported memory
+ // unchanged.
+ buf.resize(2);
+ EXPECT_EQ(256 * 13, buf.getMemUsed());
+}
+
+} // namespace dbcommon
+