blob: c6dc81a757df4603c57b7ec8cf093edf29144d41 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include "compute/ResultIterator.h"
#include "memory/VeloxColumnarBatch.h"
#include "operators/plannodes/IteratorSplit.h"
#include "velox/connectors/Connector.h"
#include "velox/exec/Driver.h"
#include "velox/exec/Operator.h"
#include "velox/exec/OperatorUtils.h"
#include "velox/exec/Task.h"
#include "velox/type/Filter.h"
#include "velox/vector/DecodedVector.h"
namespace gluten {
class RowVectorStream {
public:
virtual ~RowVectorStream() = default;
explicit RowVectorStream(
facebook::velox::memory::MemoryPool* pool,
std::shared_ptr<ResultIterator> iterator,
const facebook::velox::RowTypePtr& outputType)
: pool_(pool), outputType_(outputType), iterator_(iterator) {}
bool hasNext();
// Convert arrow batch to row vector, construct the new Rowvector with new outputType.
virtual facebook::velox::RowVectorPtr next();
protected:
// Get the next batch from iterator_.
std::shared_ptr<ColumnarBatch> nextInternal();
facebook::velox::memory::MemoryPool* pool_;
const facebook::velox::RowTypePtr outputType_;
std::shared_ptr<ResultIterator> iterator_;
bool finished_{false};
};
/// DataSource implementation that reads from ResultIterator instances.
/// This allows iterator-based data to be consumed via Velox's standard
/// connector/split mechanism, enabling proper integration with Task::addSplit().
class ValueStreamDataSource : public facebook::velox::connector::DataSource {
public:
ValueStreamDataSource(
const facebook::velox::RowTypePtr& outputType,
const facebook::velox::connector::ConnectorTableHandlePtr& tableHandle,
const facebook::velox::connector::ColumnHandleMap& columnHandles,
facebook::velox::connector::ConnectorQueryCtx* connectorQueryCtx);
void addSplit(std::shared_ptr<facebook::velox::connector::ConnectorSplit> split) override;
std::optional<facebook::velox::RowVectorPtr> next(uint64_t size, facebook::velox::ContinueFuture& future) override;
const facebook::velox::common::SubfieldFilters* getFilters() const override {
return &emptyFilters_;
}
void addDynamicFilter(
facebook::velox::column_index_t outputChannel,
const std::shared_ptr<facebook::velox::common::Filter>& filter) override {
if (dynamicFilterEnabled_) {
dynamicFilters_[outputChannel] = filter;
numDynamicFiltersAccepted_++;
}
}
uint64_t getCompletedBytes() override {
return completedBytes_;
}
uint64_t getCompletedRows() override {
return completedRows_;
}
std::unordered_map<std::string, facebook::velox::RuntimeMetric> getRuntimeStats() override {
std::unordered_map<std::string, facebook::velox::RuntimeMetric> stats;
stats["dynamicFiltersAccepted"] = facebook::velox::RuntimeMetric(numDynamicFiltersAccepted_);
if (dynamicFilterInputRows_ > 0) {
stats["dynamicFilterInputRows"] = facebook::velox::RuntimeMetric(dynamicFilterInputRows_);
}
return stats;
}
private:
// Applies dynamic filters to a batch, returning a dictionary-wrapped subset
// containing only the rows that pass all filters.
facebook::velox::RowVectorPtr applyDynamicFilters(const facebook::velox::RowVectorPtr& input);
// Evaluates a Filter against a single column vector, deselecting rows that
// don't pass.
static void applyFilterOnColumn(
const std::shared_ptr<facebook::velox::common::Filter>& filter,
const facebook::velox::VectorPtr& vector,
facebook::velox::SelectivityVector& rows);
const facebook::velox::RowTypePtr outputType_;
facebook::velox::memory::MemoryPool* pool_;
std::vector<std::shared_ptr<RowVectorStream>> pendingIterators_;
std::shared_ptr<RowVectorStream> currentIterator_{nullptr};
uint64_t completedBytes_{0};
uint64_t completedRows_{0};
folly::F14FastMap<facebook::velox::column_index_t, std::shared_ptr<facebook::velox::common::Filter>> dynamicFilters_;
const facebook::velox::common::SubfieldFilters emptyFilters_;
bool dynamicFilterEnabled_{true};
uint64_t numDynamicFiltersAccepted_{0};
uint64_t dynamicFilterInputRows_{0};
};
/// Table handle for iterator-based scans
class ValueStreamTableHandle : public facebook::velox::connector::ConnectorTableHandle {
public:
explicit ValueStreamTableHandle(std::string connectorId, bool dynamicFilterEnabled = true)
: ConnectorTableHandle(connectorId), dynamicFilterEnabled_(dynamicFilterEnabled) {}
const std::string& name() const override {
static const std::string kName = "ValueStreamTableHandle";
return kName;
}
bool dynamicFilterEnabled() const {
return dynamicFilterEnabled_;
}
folly::dynamic serialize() const override {
VELOX_NYI();
}
private:
bool dynamicFilterEnabled_;
};
/// Column handle for iterator-based scans
class ValueStreamColumnHandle : public facebook::velox::connector::ColumnHandle {
public:
ValueStreamColumnHandle(std::string name, facebook::velox::TypePtr type)
: name_(std::move(name)), type_(std::move(type)) {}
const std::string& name() const {
return name_;
}
const facebook::velox::TypePtr& type() const {
return type_;
}
private:
std::string name_;
facebook::velox::TypePtr type_;
};
/// Connector implementation for iterator-based data sources
class ValueStreamConnector : public facebook::velox::connector::Connector {
public:
ValueStreamConnector(
const std::string& id,
std::shared_ptr<const facebook::velox::config::ConfigBase> config,
bool dynamicFilterEnabled = false)
: Connector(id, config), dynamicFilterEnabled_(dynamicFilterEnabled) {}
bool canAddDynamicFilter() const override {
return dynamicFilterEnabled_;
}
std::unique_ptr<facebook::velox::connector::DataSource> createDataSource(
const facebook::velox::RowTypePtr& outputType,
const facebook::velox::connector::ConnectorTableHandlePtr& tableHandle,
const facebook::velox::connector::ColumnHandleMap& columnHandles,
facebook::velox::connector::ConnectorQueryCtx* connectorQueryCtx) override {
return std::make_unique<ValueStreamDataSource>(outputType, tableHandle, columnHandles, connectorQueryCtx);
}
std::unique_ptr<facebook::velox::connector::DataSink> createDataSink(
facebook::velox::RowTypePtr inputType,
facebook::velox::connector::ConnectorInsertTableHandlePtr connectorInsertTableHandle,
facebook::velox::connector::ConnectorQueryCtx* connectorQueryCtx,
facebook::velox::connector::CommitStrategy commitStrategy) override {
VELOX_UNSUPPORTED("ValueStreamConnector does not support data sinks");
}
private:
bool dynamicFilterEnabled_;
};
/// Factory for creating ValueStreamConnector instances
class ValueStreamConnectorFactory : public facebook::velox::connector::ConnectorFactory {
public:
static constexpr const char* kValueStreamConnectorName = "value-stream";
static std::string nodeIdOf(int32_t streamIdx) {
return fmt::format("{}:{}", kValueStreamConnectorName, streamIdx);
}
ValueStreamConnectorFactory() : ConnectorFactory(kValueStreamConnectorName) {}
std::shared_ptr<facebook::velox::connector::Connector> newConnector(
const std::string& id,
std::shared_ptr<const facebook::velox::config::ConfigBase> config,
folly::Executor* ioExecutor = nullptr,
folly::Executor* cpuExecutor = nullptr) override {
return std::make_shared<ValueStreamConnector>(id, config);
}
};
} // namespace gluten