// blob: 5b55732c43d23791c257bfe4529622a254b25a3e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/parquet/hdfs-parquet-scanner.h"
#include "runtime/test-env.h"
#include "service/fe-support.h"
#include "testutil/gtest-util.h"
#include "common/names.h"
static const int64_t MIN_BUFFER_SIZE = 64 * 1024;
static const int64_t MAX_BUFFER_SIZE = 8 * 1024 * 1024;
DECLARE_int64(min_buffer_size);
DECLARE_int32(read_size);
namespace impala {
class HdfsParquetScannerTest : public testing::Test {
public:
virtual void SetUp() {
// Override min/max buffer sizes picked up by DiskIoMgr.
FLAGS_min_buffer_size = MIN_BUFFER_SIZE;
FLAGS_read_size = MAX_BUFFER_SIZE;
test_env_.reset(new TestEnv);
ASSERT_OK(test_env_->Init());
}
virtual void TearDown() {
test_env_.reset();
}
protected:
void TestComputeIdealReservation(const vector<int64_t>& col_range_lengths,
int64_t expected_ideal_reservation);
void TestDivideReservation(const vector<int64_t>& col_range_lengths,
int64_t total_col_reservation, const vector<int64_t>& expected_reservations);
boost::scoped_ptr<TestEnv> test_env_;
};
/// Test the ComputeIdealReservation returns 'expected_ideal_reservation' for a list
/// of columns with 'col_range_lengths'.
void HdfsParquetScannerTest::TestComputeIdealReservation(
const vector<int64_t>& col_range_lengths, int64_t expected_ideal_reservation) {
EXPECT_EQ(expected_ideal_reservation,
HdfsParquetScanner::ComputeIdealReservation(col_range_lengths));
}
TEST_F(HdfsParquetScannerTest, ComputeIdealReservation) {
// Should round up to nearest power-of-two buffer size if < max scan range buffer.
TestComputeIdealReservation({0}, MIN_BUFFER_SIZE);
TestComputeIdealReservation({1}, MIN_BUFFER_SIZE);
TestComputeIdealReservation({MIN_BUFFER_SIZE - 1}, MIN_BUFFER_SIZE);
TestComputeIdealReservation({MIN_BUFFER_SIZE}, MIN_BUFFER_SIZE);
TestComputeIdealReservation({MIN_BUFFER_SIZE + 2}, 2 * MIN_BUFFER_SIZE);
TestComputeIdealReservation({4 * MIN_BUFFER_SIZE + 1234}, 8 * MIN_BUFFER_SIZE);
TestComputeIdealReservation({MAX_BUFFER_SIZE - 10}, MAX_BUFFER_SIZE);
TestComputeIdealReservation({MAX_BUFFER_SIZE}, MAX_BUFFER_SIZE);
// Should round to nearest max I/O buffer size if >= max scan range buffer, up to 3
// buffers.
TestComputeIdealReservation({MAX_BUFFER_SIZE + 1}, 2 * MAX_BUFFER_SIZE);
TestComputeIdealReservation({MAX_BUFFER_SIZE * 2 - 1}, 2 * MAX_BUFFER_SIZE);
TestComputeIdealReservation({MAX_BUFFER_SIZE * 2}, 2 * MAX_BUFFER_SIZE);
TestComputeIdealReservation({MAX_BUFFER_SIZE * 2 + 1}, 3 * MAX_BUFFER_SIZE);
TestComputeIdealReservation({MAX_BUFFER_SIZE * 3 + 1}, 3 * MAX_BUFFER_SIZE);
TestComputeIdealReservation({MAX_BUFFER_SIZE * 100 + 27}, 3 * MAX_BUFFER_SIZE);
// Ideal reservations from multiple ranges are simply added together.
TestComputeIdealReservation({1, 2}, 2 * MIN_BUFFER_SIZE);
TestComputeIdealReservation(
{MAX_BUFFER_SIZE, MAX_BUFFER_SIZE - 1}, 2 * MAX_BUFFER_SIZE);
TestComputeIdealReservation(
{MAX_BUFFER_SIZE, MIN_BUFFER_SIZE + 1}, MAX_BUFFER_SIZE + 2 * MIN_BUFFER_SIZE);
TestComputeIdealReservation(
{MAX_BUFFER_SIZE, MAX_BUFFER_SIZE * 128}, 4 * MAX_BUFFER_SIZE);
TestComputeIdealReservation(
{MAX_BUFFER_SIZE * 7, MAX_BUFFER_SIZE * 128, MAX_BUFFER_SIZE * 1000},
3L * 3L * MAX_BUFFER_SIZE);
// Test col size that doesn't fit in int32.
TestComputeIdealReservation({MAX_BUFFER_SIZE * 1024L}, 3L * MAX_BUFFER_SIZE);
// Test sum of reservations that doesn't fit in int32.
vector<int64_t> col_range_lengths;
const int64_t LARGE_NUM_RANGES = 10000;
for (int i = 0; i < LARGE_NUM_RANGES; ++i) {
col_range_lengths.push_back(4 * MAX_BUFFER_SIZE);
}
TestComputeIdealReservation(col_range_lengths, LARGE_NUM_RANGES * 3L * MAX_BUFFER_SIZE);
}
/// Test that DivideReservationBetweenColumns() returns 'expected_reservations' for
/// inputs 'col_range_lengths' and 'total_col_reservation'.
void HdfsParquetScannerTest::TestDivideReservation(
const vector<int64_t>& col_range_lengths, int64_t total_col_reservation,
const vector<int64_t>& expected_reservations) {
vector<pair<int, int64_t>> reservations =
HdfsParquetScanner::DivideReservationBetweenColumnsHelper(
MIN_BUFFER_SIZE, MAX_BUFFER_SIZE, col_range_lengths, total_col_reservation);
for (int i = 0; i < reservations.size(); ++i) {
LOG(INFO) << i << " " << reservations[i].first << " " << reservations[i].second;
}
EXPECT_EQ(reservations.size(), expected_reservations.size());
vector<bool> present(expected_reservations.size(), false);
for (auto& reservation: reservations) {
// Ensure that each appears exactly once.
EXPECT_FALSE(present[reservation.first]);
present[reservation.first] = true;
EXPECT_EQ(expected_reservations[reservation.first], reservation.second)
<< reservation.first;
}
}
TEST_F(HdfsParquetScannerTest, DivideReservation) {
// Test a long scan ranges with lots of memory - should allocate 3 max-size
// buffers per range.
TestDivideReservation({100 * 1024 * 1024}, 50 * 1024 * 1024, {3 * MAX_BUFFER_SIZE});
TestDivideReservation({100 * 1024 * 1024, 50 * 1024 * 1024}, 100 * 1024 * 1024,
{3 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE});
// Long scan ranges, not enough memory for 3 buffers each. Should only allocate
// max-sized buffers, preferring the longer scan range.
TestDivideReservation({50 * 1024 * 1024, 100 * 1024 * 1024}, 5 * MAX_BUFFER_SIZE,
{2 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE});
TestDivideReservation({50 * 1024 * 1024, 100 * 1024 * 1024},
5 * MAX_BUFFER_SIZE + MIN_BUFFER_SIZE,
{2 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE});
TestDivideReservation({50 * 1024 * 1024, 100 * 1024 * 1024}, 6 * MAX_BUFFER_SIZE - 1,
{2 * MAX_BUFFER_SIZE, 3 * MAX_BUFFER_SIZE});
// Test a short range with lots of memory - should round up buffer size.
TestDivideReservation({100 * 1024}, 50 * 1024 * 1024, {128 * 1024});
// Test a range << MIN_BUFFER_SIZE - should round up to buffer size.
TestDivideReservation({13}, 50 * 1024 * 1024, {MIN_BUFFER_SIZE});
// Test long ranges with limited memory.
TestDivideReservation({100 * 1024 * 1024}, 100 * 1024, {MIN_BUFFER_SIZE});
TestDivideReservation({100 * 1024 * 1024}, MIN_BUFFER_SIZE, {MIN_BUFFER_SIZE});
TestDivideReservation({100 * 1024 * 1024}, 2 * MIN_BUFFER_SIZE, {2 * MIN_BUFFER_SIZE});
TestDivideReservation({100 * 1024 * 1024}, MAX_BUFFER_SIZE - 1, {MAX_BUFFER_SIZE / 2});
TestDivideReservation({100 * 1024 * 1024, 1024 * 1024, MIN_BUFFER_SIZE},
3 * MIN_BUFFER_SIZE, {MIN_BUFFER_SIZE, MIN_BUFFER_SIZE, MIN_BUFFER_SIZE});
// Test a mix of scan range lengths larger than and smaller than the max I/O buffer
// size. Long ranges get allocated most memory.
TestDivideReservation(
{15145047, 5019635, 5019263, 15145047, 15145047, 5019635, 5019263, 317304},
25165824,
{8388608, 2097152, 524288, 8388608, 4194304, 1048576, 262144, 262144});
}
}