blob: 9eb65475fb4c20705c647e5c95d0dd9cc7b1af7c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <vector>
#include "common/compiler-util.h"
#include "common/object-pool.h"
#include "experiments/data-provider.h"
#include "runtime/mem-pool.h"
#include "runtime/mem-tracker.h"
#include "runtime/string-value.h"
#include "util/cpu-info.h"
#include "util/runtime-profile-counters.h"
#define PRINT_RESULT 0
#define UNUSED_BITS 16
#define USED_BITS 48
#define POINTER_LOWER_BITS_MASK 0x0000FFFFFFFFFFFF
#include "common/names.h"
using namespace impala;
inline void Memcpy16(uint8_t* dst, uint8_t* src) {
reinterpret_cast<uint64_t*>(dst)[0] = reinterpret_cast<uint64_t*>(src)[0];
reinterpret_cast<uint64_t*>(dst)[1] = reinterpret_cast<uint64_t*>(src)[1];
}
struct PointerValue {
union {
uint64_t val;
uint8_t* ptr;
};
template<typename T>
T GetPointer() {
return reinterpret_cast<T>(val & POINTER_LOWER_BITS_MASK);
}
uint16_t GetValue() const {
return static_cast<uint16_t>(val >> USED_BITS);
}
template<typename T>
void Get(T& t, uint16_t& v) {
t = reinterpret_cast<T>(val & POINTER_LOWER_BITS_MASK);
v = static_cast<uint16_t>(val >> USED_BITS);
}
void UpdatePointer(void* p) {
val = val >> USED_BITS;
val = val << USED_BITS;
val = val | reinterpret_cast<uint64_t>(p);
}
void UpdateValue(uint16_t v) {
val = val << UNUSED_BITS;
val = val >> UNUSED_BITS;
val = val | (static_cast<uint64_t>(v) << USED_BITS);
}
void Update(void* p, int16_t v) {
val = reinterpret_cast<uint64_t>(p);
val = val | (static_cast<uint64_t>(v) << USED_BITS);
}
};
template <class T>
T NextPowerOfTwo(T k) {
k--;
for (int i = 1; i < sizeof(T) * CHAR_BIT; i <<=1) k = k | k >> i;
return k+1;
}
class DataPartitioner {
public:
DataPartitioner(MemPool* pool, RuntimeProfile* profile, int size, int hash_offset) {
pool_ = pool;
profile_ = profile;
size_ = size;
hash_offset_ = hash_offset;
bytes_allocated_ = ADD_COUNTER(profile, "BytesAllocated", TUnit::BYTES);
bytes_copied_ = ADD_COUNTER(profile, "BytesCopied", TUnit::BYTES);
add_time_ = ADD_COUNTER(profile, "AddTime", TUnit::CPU_TICKS);
split_time_ = ADD_COUNTER(profile, "SplitTime", TUnit::CPU_TICKS);
splits_ = ADD_COUNTER(profile, "Splits", TUnit::UNIT);
// Max of tuple size and cache line
int min_size_per_partition = max(size, 64);
partitions_per_level_ = NextPowerOfTwo(L1_size / min_size_per_partition) >> 1;
tuples_per_partition_ = L1_size / size;
printf("Tuple Size: %d\n", size_);
printf("Partitions: %d Per Partition: %d\n", partitions_per_level_, tuples_per_partition_);
partition_idx_mask_ = partitions_per_level_ - 1;
build_partitions_.resize(partitions_per_level_);
child_partitions_.resize(partitions_per_level_);
for (int i = 0; i < partitions_per_level_; ++i) {
build_partitions_[i].level = 0;
}
partitions_at_level_.resize(6);
partitions_at_level_[0] = partitions_per_level_;
partitions_at_level_[1] = partitions_per_level_;
partitions_at_level_[2] = partitions_per_level_;
partitions_at_level_[3] = partitions_per_level_;
partitions_at_level_[4] = partitions_per_level_;
partitions_at_level_[5] = partitions_per_level_;
split_counts_.resize(partitions_per_level_);
outputs_.resize(partitions_per_level_);
for (int i = 0; i < outputs_.size(); ++i) {
outputs_[i].Update(NULL, tuples_per_partition_);
}
}
void AddData(int n, uint8_t* data);
int size_per_block() const {
return tuples_per_partition_;
}
struct Partition {
vector<uint8_t*> blocks;
int num_last_block;
Partition() {
num_last_block = 0;
}
Partition(const vector<uint8_t*>& blocks, int num) {
this->blocks = blocks;
this->num_last_block = num;
}
};
int NumTuples(const Partition& partition) const {
return partition.num_last_block + (partition.blocks.size() - 1) * tuples_per_partition_;
}
int NumTuples(const Partition& partition, int block_idx) const {
if (block_idx == partition.blocks.size() - 1) return partition.num_last_block;
return tuples_per_partition_;
}
bool Finalize(vector<Partition>* results) {
SCOPED_TIMER(split_time_);
bool result = true;
for (int i = 0; i < build_partitions_.size(); ++i) {
if (build_partitions_[i].blocks.size() > 0) {
BuildPartition& build_partition = build_partitions_[i];
build_partition.num_last_block = NumLastBlock(i);
}
}
for (int i = 0; i < build_partitions_.size(); ++i) {
if (build_partitions_[i].blocks.size() > 0) {
BuildPartition& build_partition = build_partitions_[i];
if (TotalTuples(build_partition) > tuples_per_partition_) {
if (Split(build_partition)) {
continue;
} else {
result = false;
}
}
results->push_back(ToOutputPartition(build_partition));
}
}
return result;
}
private:
static const int L1_size = 24 * 1024;
static const int MIN_SPLITS = 4;
static const int HASH_BIT_SHIFT = 4;
MemPool* pool_;
RuntimeProfile* profile_;
RuntimeProfile::Counter* bytes_allocated_;
RuntimeProfile::Counter* bytes_copied_;
RuntimeProfile::Counter* split_time_;
RuntimeProfile::Counter* add_time_;
RuntimeProfile::Counter* splits_;
int size_;
int hash_offset_;
int tuples_per_partition_;
int partitions_per_level_;
int partition_idx_mask_;
vector<PointerValue> outputs_;
vector<int32_t> split_counts_;
vector<int> partitions_at_level_;
struct BuildPartition {
vector<uint8_t*> blocks;
int num_last_block;
int level;
};
vector<BuildPartition> build_partitions_;
vector<BuildPartition> child_partitions_;
uint8_t* Allocate(BuildPartition* partition, int p, int size = 0) {
if (size == 0) size = tuples_per_partition_ * size_;
else size = size * size_;
COUNTER_ADD(bytes_allocated_, size);
uint8_t* out_mem = pool_->Allocate(size);
partition->blocks.push_back(out_mem);
outputs_[p].Update(out_mem, 0);
return out_mem;
}
Partition ToOutputPartition(const BuildPartition& build) const {
return Partition(build.blocks, build.num_last_block);
}
int TotalTuples(const BuildPartition& partition) const {
return partition.num_last_block + (partition.blocks.size() - 1) * tuples_per_partition_;
}
int NumLastBlock(int p) const {
return outputs_[p].GetValue();
}
int NumTuples(const BuildPartition& partition, int block_idx) const {
if (block_idx == partition.blocks.size() - 1) return partition.num_last_block;
return tuples_per_partition_;
}
bool Split(const BuildPartition& build_partition);
};
void DataPartitioner::AddData(int n, uint8_t* data) {
SCOPED_TIMER(add_time_);
COUNTER_ADD(bytes_copied_, n * size_);
for (int i = 0; i < n; ++i) {
int32_t hash = *reinterpret_cast<int32_t*>(data + hash_offset_);
hash &= partition_idx_mask_;
uint8_t* out_mem = outputs_[hash].GetPointer<uint8_t*>();
uint16_t size = outputs_[hash].GetValue();
if (size == tuples_per_partition_) {
out_mem = Allocate(&build_partitions_[hash], hash);
size = 0;
}
Memcpy16(out_mem, data);
data += size_;
outputs_[hash].Update(out_mem + size_, size + 1);
}
}
bool DataPartitioner::Split(const BuildPartition& build_partition) {
COUNTER_ADD(splits_, 1);
const int num_blocks = build_partition.blocks.size();
const int next_level = build_partition.level + 1;
if (next_level >= 8) {
CHECK(false) << "TOO MANY LEVELS: " << next_level;
return false;
}
int partitions = partitions_at_level_[next_level];
int partition_mask = partitions - 1;
int new_splits = 0;
memset(split_counts_.data(), 0, sizeof(split_counts_[0]) * split_counts_.size());
for (int i = 0; i < num_blocks; ++i) {
int tuples = NumTuples(build_partition, i);
uint8_t* hash_slot = build_partition.blocks[i] + hash_offset_;
for (int j = 0; j < tuples; ++j) {
int32_t hash = *reinterpret_cast<int32_t*>(hash_slot);
hash >>= HASH_BIT_SHIFT;
*reinterpret_cast<int32_t*>(hash_slot) = hash;
hash &= partition_mask;
new_splits += !split_counts_[hash];
split_counts_[hash]++;
hash_slot += size_;
}
}
if (new_splits < MIN_SPLITS) return false;
for (int i = 0; i < partitions; ++i) {
if (split_counts_[i] > 0) {
COUNTER_ADD(bytes_allocated_, split_counts_[i] * size_);
uint8_t* mem = pool_->Allocate(split_counts_[i] * size_);
child_partitions_[i].blocks.clear();
child_partitions_[i].level = next_level;
child_partitions_[i].blocks.push_back(mem);
outputs_[i].ptr = mem;
}
}
COUNTER_ADD(bytes_copied_, TotalTuples(build_partition) * size_);
for (int i = 0; i < num_blocks; ++i) {
uint8_t* data = build_partition.blocks[i];
int tuples = NumTuples(build_partition, i);
const int offset1 = 0;
const int offset2 = offset1 + size_;
const int offset3 = offset2 + size_;
const int offset4 = offset3 + size_;
while (tuples >= 4) {
int32_t hash1 = *reinterpret_cast<int32_t*>(data + hash_offset_ + offset1) & partition_mask;
int32_t hash2 = *reinterpret_cast<int32_t*>(data + hash_offset_ + offset2) & partition_mask;
int32_t hash3 = *reinterpret_cast<int32_t*>(data + hash_offset_ + offset3) & partition_mask;
int32_t hash4 = *reinterpret_cast<int32_t*>(data + hash_offset_ + offset4) & partition_mask;
uint8_t** out_mem1 = &outputs_[hash1].ptr;
uint8_t** out_mem2 = &outputs_[hash2].ptr;
uint8_t** out_mem3 = &outputs_[hash3].ptr;
uint8_t** out_mem4 = &outputs_[hash4].ptr;
Memcpy16(*out_mem1, data + offset1);
*out_mem1 += size_;
Memcpy16(*out_mem2, data + offset2);
*out_mem2 += size_;
Memcpy16(*out_mem3, data + offset3);
*out_mem3 += size_;
Memcpy16(*out_mem4, data + offset4);
*out_mem4 += size_;
data += size_ * 4;
tuples -= 4;
}
for (int j = 0; j < tuples; ++j) {
int32_t hash = *reinterpret_cast<int32_t*>(data + hash_offset_) & partition_mask;
uint8_t* out_mem = outputs_[hash].ptr;
Memcpy16(out_mem, data);
data += size_;
outputs_[hash].ptr = out_mem + size_;
}
}
for (int i = 0; i < partitions; ++i) {
if (split_counts_[i] > 0) {
BuildPartition& child = child_partitions_[i];
child.num_last_block = split_counts_[i];
build_partitions_.push_back(child);
}
}
return true;
}
int main(int argc, char **argv) {
google::InitGoogleLogging(argv[0]);
LOG(ERROR) << "Starting Test";
vector<DataProvider::ColDesc> cols;
// Hash Slot
cols.push_back(DataProvider::ColDesc::Create<int32_t>(0, 0));
// Grouping Column
cols.push_back(DataProvider::ColDesc::Create<int64_t>(0, 500000));
// Aggregate Column
cols.push_back(DataProvider::ColDesc::Create<float>(-1, 1));
MemTracker tracker;
MemPool pool(&tracker);
ObjectPool obj_pool;
RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool, "PartitioningTest");
DataProvider provider(&pool, profile);
provider.Reset(50*1024*1024, 1024, cols);
//provider.Reset(100*1024, 1024, cols);
//provider.Reset(100, 1024, cols);
DataPartitioner partitioner(&pool, profile, provider.row_size(), 0);
cout << "Begin processing: " << provider.total_rows() << endl;
int rows;
void* data;
while ( (data = provider.NextBatch(&rows)) != NULL) {
uint8_t* next_row = reinterpret_cast<uint8_t*>(data);
// Compute hash values and store them
for (int i = 0; i < rows; ++i) {
int32_t* hash = reinterpret_cast<int32_t*>(next_row);
int64_t* col = reinterpret_cast<int64_t*>(next_row + 4);
*hash = *col;
next_row += provider.row_size();
}
partitioner.AddData(rows, reinterpret_cast<uint8_t*>(data));
}
cout << endl;
LOG(ERROR) << "Partitioning";
vector<DataPartitioner::Partition> partitions;
bool fully_split = partitioner.Finalize(&partitions);
cout << endl << "After Partitioning" << endl;
int result_tuples = 0;
int blocks = 0;
int largest_partition = 0;
// Validate results
for (int i = 0; i < partitions.size(); ++i) {
DataPartitioner::Partition& partition = partitions[i];
int tuples = partitioner.NumTuples(partition);
if (largest_partition < tuples) largest_partition = tuples;
result_tuples += tuples;
blocks += partition.blocks.size();
#if PRINT_RESULT
for (int j = 0; j < partition.blocks.size(); ++j) {
int rows = partitioner.NumTuples(partition, j);
provider.Print(&cout, partition.blocks[j], rows);
cout << "------------------------------" << endl;
}
#endif
}
cout << endl;
cout << "Fully Split: " << (fully_split ? "yes" : "no") << endl;
cout << "Partitioned tuples: " << result_tuples << endl;
cout << "Num Blocks: " << blocks << endl;
cout << "Num Partitions: " << partitions.size() << endl;
cout << "Largest Partition: " << largest_partition << endl;;
cout << endl;
profile->PrettyPrint(&cout);
LOG(ERROR) << "Done.";
return 0;
}