blob: 6e13bfed01454b74f9135f5bf468c45ce405f169 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <gtest/gtest.h>
#include "memory/VeloxColumnarBatch.h"
#include "operators/plannodes/RowVectorStream.h"
#include "velox/type/Filter.h"
#include "velox/vector/DecodedVector.h"
#include "velox/vector/FlatVector.h"
#include "velox/vector/tests/utils/VectorTestBase.h"
using namespace facebook::velox;
using namespace facebook::velox::exec;
using namespace facebook::velox::common;
namespace facebook::velox::test {
/// A ColumnarBatchIterator that yields pre-built RowVectors as VeloxColumnarBatches.
class TestBatchIterator final : public gluten::ColumnarBatchIterator {
public:
explicit TestBatchIterator(std::vector<RowVectorPtr> batches) : batches_(std::move(batches)) {}
std::shared_ptr<gluten::ColumnarBatch> next() override {
if (idx_ >= batches_.size()) {
return nullptr;
}
return std::make_shared<gluten::VeloxColumnarBatch>(batches_[idx_++]);
}
private:
std::vector<RowVectorPtr> batches_;
size_t idx_ = 0;
};
class ValueStreamDynamicFilterTest : public ::testing::Test, public VectorTestBase {
protected:
static void SetUpTestCase() {
memory::MemoryManager::testingSetInstance(memory::MemoryManager::Options{});
}
void SetUp() override {
// Register the connector if not already registered.
if (!connector::hasConnector(gluten::kIteratorConnectorId)) {
auto config = std::make_shared<config::ConfigBase>(
std::unordered_map<std::string, std::string>());
connector::registerConnector(std::make_shared<gluten::ValueStreamConnector>(
gluten::kIteratorConnectorId, config, /*dynamicFilterEnabled=*/true));
}
}
/// Build a TableScanNode that reads from the value-stream connector.
std::shared_ptr<core::TableScanNode> makeTableScanNode(
const std::string& nodeId,
const RowTypePtr& outputType) {
auto tableHandle =
std::make_shared<gluten::ValueStreamTableHandle>(gluten::kIteratorConnectorId);
connector::ColumnHandleMap assignments;
for (int idx = 0; idx < outputType->size(); idx++) {
auto name = outputType->nameOf(idx);
auto type = outputType->childAt(idx);
assignments[name] =
std::make_shared<gluten::ValueStreamColumnHandle>(name, type);
}
return std::make_shared<core::TableScanNode>(
nodeId, outputType, tableHandle, assignments);
}
/// Create a split wrapping the given batches.
std::shared_ptr<connector::ConnectorSplit> makeSplit(
std::vector<RowVectorPtr> batches) {
auto iter = std::make_shared<gluten::ResultIterator>(
std::make_unique<TestBatchIterator>(std::move(batches)));
return std::make_shared<gluten::IteratorConnectorSplit>(
gluten::kIteratorConnectorId, std::move(iter));
}
/// Read all int64 values from column 0 of a serial-mode task.
std::vector<int64_t> readAllInt64(Task* task) {
std::vector<int64_t> result;
ContinueFuture future = ContinueFuture::makeEmpty();
while (true) {
auto batch = task->next(&future);
if (!batch) {
break;
}
DecodedVector decoded(*batch->childAt(0));
for (vector_size_t i = 0; i < batch->size(); i++) {
result.push_back(decoded.valueAt<int64_t>(i));
}
}
return result;
}
};
// Test that without any filter, all rows pass through.
TEST_F(ValueStreamDynamicFilterTest, noFilterPassesAllRows) {
auto batch = makeRowVector({"id"}, {makeFlatVector<int64_t>({10, 20, 30})});
auto outputType = asRowType(batch->type());
auto scanNode = makeTableScanNode("vs0", outputType);
auto queryCtx = core::QueryCtx::create();
auto task = Task::create(
"test-nofilter",
core::PlanFragment{scanNode},
0,
queryCtx,
Task::ExecutionMode::kSerial);
task->addSplit(scanNode->id(), Split{makeSplit({batch})});
task->noMoreSplits(scanNode->id());
auto ids = readAllInt64(task.get());
ASSERT_EQ(ids, (std::vector<int64_t>{10, 20, 30}));
}
// Test that filtering works when filter is injected after first batch.
TEST_F(ValueStreamDynamicFilterTest, filterBigintRange) {
auto batch1 = makeRowVector({"id"}, {makeFlatVector<int64_t>({1, 2, 3, 4, 5})});
auto batch2 = makeRowVector({"id"}, {makeFlatVector<int64_t>({6, 7, 8, 9, 10})});
auto outputType = asRowType(batch1->type());
auto scanNode = makeTableScanNode("vs1", outputType);
auto queryCtx = core::QueryCtx::create();
auto task = Task::create(
"test-bigint",
core::PlanFragment{scanNode},
0,
queryCtx,
Task::ExecutionMode::kSerial);
// Add both batches as a single split.
task->addSplit(scanNode->id(), Split{makeSplit({batch1, batch2})});
task->noMoreSplits(scanNode->id());
// First next() creates drivers and returns first batch (unfiltered).
ContinueFuture future = ContinueFuture::makeEmpty();
auto firstBatch = task->next(&future);
ASSERT_NE(firstBatch, nullptr);
ASSERT_EQ(firstBatch->size(), 5);
// Inject a BigintRange filter: keep only id >= 8.
task->testingVisitDrivers([&](Driver* driver) {
auto* op = driver->findOperator(scanNode->id());
if (!op) {
return;
}
ASSERT_TRUE(op->canAddDynamicFilter());
PushdownFilters pf;
pf.filters[0] = std::make_shared<BigintRange>(8, 10, false);
pf.dynamicFilteredColumns.insert(0);
op->addDynamicFilterLocked("producer", pf);
});
// Second next() should return filtered batch.
auto secondBatch = task->next(&future);
ASSERT_NE(secondBatch, nullptr);
DecodedVector decoded(*secondBatch->childAt(0));
std::vector<int64_t> outputIds;
for (vector_size_t i = 0; i < secondBatch->size(); i++) {
outputIds.push_back(decoded.valueAt<int64_t>(i));
}
ASSERT_EQ(outputIds, (std::vector<int64_t>{8, 9, 10}));
auto end = task->next(&future);
ASSERT_EQ(end, nullptr);
}
// Test that a filter eliminates all rows from a batch.
TEST_F(ValueStreamDynamicFilterTest, filterEliminatesEntireBatch) {
auto batch1 = makeRowVector({"id"}, {makeFlatVector<int64_t>({1, 2, 3})});
auto batch2 = makeRowVector({"id"}, {makeFlatVector<int64_t>({100, 200, 300})});
auto outputType = asRowType(batch1->type());
auto scanNode = makeTableScanNode("vs2", outputType);
auto queryCtx = core::QueryCtx::create();
auto task = Task::create(
"test-eliminate",
core::PlanFragment{scanNode},
0,
queryCtx,
Task::ExecutionMode::kSerial);
task->addSplit(scanNode->id(), Split{makeSplit({batch1, batch2})});
task->noMoreSplits(scanNode->id());
ContinueFuture future = ContinueFuture::makeEmpty();
auto firstBatch = task->next(&future);
ASSERT_NE(firstBatch, nullptr);
ASSERT_EQ(firstBatch->size(), 3);
task->testingVisitDrivers([&](Driver* driver) {
auto* op = driver->findOperator(scanNode->id());
if (!op) {
return;
}
PushdownFilters pf;
pf.filters[0] = std::make_shared<BigintRange>(100, 300, false);
pf.dynamicFilteredColumns.insert(0);
op->addDynamicFilterLocked("producer", pf);
});
auto secondBatch = task->next(&future);
ASSERT_NE(secondBatch, nullptr);
DecodedVector decoded(*secondBatch->childAt(0));
std::vector<int64_t> outputIds;
for (vector_size_t i = 0; i < secondBatch->size(); i++) {
outputIds.push_back(decoded.valueAt<int64_t>(i));
}
ASSERT_EQ(outputIds, (std::vector<int64_t>{100, 200, 300}));
auto end = task->next(&future);
ASSERT_EQ(end, nullptr);
}
// Test that nulls are filtered out when nullAllowed is false.
TEST_F(ValueStreamDynamicFilterTest, filterWithNulls) {
auto batch1 = makeRowVector({"id"}, {makeFlatVector<int64_t>({10, 20})});
auto batch2 = makeRowVector(
{"id"},
{makeNullableFlatVector<int64_t>({1, std::nullopt, 3, std::nullopt, 5})});
auto outputType = asRowType(batch1->type());
auto scanNode = makeTableScanNode("vs3", outputType);
auto queryCtx = core::QueryCtx::create();
auto task = Task::create(
"test-nulls",
core::PlanFragment{scanNode},
0,
queryCtx,
Task::ExecutionMode::kSerial);
task->addSplit(scanNode->id(), Split{makeSplit({batch1, batch2})});
task->noMoreSplits(scanNode->id());
ContinueFuture future = ContinueFuture::makeEmpty();
auto firstBatch = task->next(&future);
ASSERT_NE(firstBatch, nullptr);
task->testingVisitDrivers([&](Driver* driver) {
auto* op = driver->findOperator(scanNode->id());
if (!op) {
return;
}
PushdownFilters pf;
pf.filters[0] = std::make_shared<BigintRange>(3, 100, false);
pf.dynamicFilteredColumns.insert(0);
op->addDynamicFilterLocked("producer", pf);
});
auto secondBatch = task->next(&future);
ASSERT_NE(secondBatch, nullptr);
DecodedVector decoded(*secondBatch->childAt(0));
std::vector<int64_t> outputIds;
for (vector_size_t i = 0; i < secondBatch->size(); i++) {
ASSERT_FALSE(decoded.isNullAt(i));
outputIds.push_back(decoded.valueAt<int64_t>(i));
}
ASSERT_EQ(outputIds, (std::vector<int64_t>{3, 5}));
auto end = task->next(&future);
ASSERT_EQ(end, nullptr);
}
// Test canAddDynamicFilter returns true through the connector.
TEST_F(ValueStreamDynamicFilterTest, canAddDynamicFilter) {
auto batch = makeRowVector({"id"}, {makeFlatVector<int64_t>({1})});
auto outputType = asRowType(batch->type());
auto scanNode = makeTableScanNode("vs4", outputType);
auto queryCtx = core::QueryCtx::create();
auto task = Task::create(
"test-can-add",
core::PlanFragment{scanNode},
0,
queryCtx,
Task::ExecutionMode::kSerial);
task->addSplit(scanNode->id(), Split{makeSplit({batch})});
task->noMoreSplits(scanNode->id());
ContinueFuture future = ContinueFuture::makeEmpty();
task->next(&future);
bool found = false;
task->testingVisitDrivers([&](Driver* driver) {
auto* op = driver->findOperator(scanNode->id());
if (op) {
ASSERT_TRUE(op->canAddDynamicFilter());
found = true;
}
});
ASSERT_TRUE(found);
auto end = task->next(&future);
ASSERT_EQ(end, nullptr);
}
} // namespace facebook::velox::test