blob: 37f4da33439b0efed0c777001471ee4aaa918844 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <atomic>
#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include <folly/Executor.h>

#include "WholeStageResultIterator.h"
#include "compute/Runtime.h"
#include "compute/VeloxConnectorIds.h"
#ifdef GLUTEN_ENABLE_ENHANCED_FEATURES
#include "iceberg/IcebergWriter.h"
#endif
#include "memory/VeloxMemoryManager.h"
#include "operators/serializer/VeloxColumnarBatchSerializer.h"
#include "operators/serializer/VeloxColumnarToRowConverter.h"
#include "operators/writer/VeloxParquetDataSource.h"
#include "shuffle/ShuffleReader.h"
#include "shuffle/ShuffleWriter.h"
#ifdef GLUTEN_ENABLE_ENHANCED_FEATURES
#include "IcebergNestedField.pb.h"
#endif
namespace gluten {
class VeloxRuntime final : public Runtime {
public:
explicit VeloxRuntime(
const std::string& kind,
VeloxMemoryManager* vmm,
const std::unordered_map<std::string, std::string>& confMap);
~VeloxRuntime() override;
void setSparkTaskInfo(SparkTaskInfo taskInfo) override {
static std::atomic<uint32_t> vtId{0};
taskInfo.vId = vtId++;
taskInfo_ = taskInfo;
}
void parsePlan(const uint8_t* data, int32_t size) override;
void parseSplitInfo(const uint8_t* data, int32_t size, int32_t splitIndex) override;
VeloxMemoryManager* memoryManager() override;
// FIXME This is not thread-safe?
std::shared_ptr<ResultIterator> createResultIterator(
const std::string& spillDir,
const std::vector<std::shared_ptr<ResultIterator>>& inputs = {}) override;
void noMoreSplits(ResultIterator* iter) override;
void requestBarrier(ResultIterator* iter) override;
std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter(int64_t column2RowMemThreshold) override;
std::shared_ptr<ColumnarBatch> createOrGetEmptySchemaBatch(int32_t numRows) override;
std::shared_ptr<ColumnarBatch> select(std::shared_ptr<ColumnarBatch> batch, const std::vector<int32_t>& columnIndices)
override;
std::shared_ptr<RowToColumnarConverter> createRow2ColumnarConverter(struct ArrowSchema* cSchema) override;
#ifdef GLUTEN_ENABLE_ENHANCED_FEATURES
std::shared_ptr<IcebergWriter> createIcebergWriter(
RowTypePtr rowType,
int32_t format,
const std::string& outputDirectory,
facebook::velox::common::CompressionKind compressionKind,
int32_t partitionId,
int64_t taskId,
const std::string& operationId,
std::shared_ptr<const facebook::velox::connector::hive::iceberg::IcebergPartitionSpec> spec,
const gluten::IcebergNestedField& protoField,
const std::unordered_map<std::string, std::string>& sparkConfs);
#endif
std::shared_ptr<ShuffleWriter> createShuffleWriter(
int numPartitions,
const std::shared_ptr<PartitionWriter>& partitionWriter,
const std::shared_ptr<ShuffleWriterOptions>& options) override;
Metrics* getMetrics(ColumnarBatchIterator* rawIter, int64_t exportNanos) override {
auto iter = static_cast<WholeStageResultIterator*>(rawIter);
return iter->getMetrics(exportNanos);
}
std::shared_ptr<ShuffleReader> createShuffleReader(
std::shared_ptr<arrow::Schema> schema,
ShuffleReaderOptions options) override;
std::unique_ptr<ColumnarBatchSerializer> createColumnarBatchSerializer(struct ArrowSchema* cSchema) override;
std::string planString(bool details, const std::unordered_map<std::string, std::string>& sessionConf) override;
void enableDumping() override;
std::shared_ptr<VeloxDataSource> createDataSource(const std::string& filePath, std::shared_ptr<arrow::Schema> schema);
std::shared_ptr<const facebook::velox::core::PlanNode> getVeloxPlan() {
return veloxPlan_;
}
const std::shared_ptr<facebook::velox::config::ConfigBase>& veloxCfg() const {
return veloxCfg_;
}
bool debugModeEnabled() const {
return debugModeEnabled_;
}
folly::Executor* executor() const {
return executor_.get();
}
folly::Executor* spillExecutor() const {
return spillExecutor_.get();
}
folly::Executor* ioExecutor() const {
return ioExecutor_.get();
}
const VeloxConnectorIds& connectorIds() const {
return connectorIds_;
}
static void getInfoAndIds(
const std::unordered_map<facebook::velox::core::PlanNodeId, std::shared_ptr<SplitInfo>>& splitInfoMap,
const std::unordered_set<facebook::velox::core::PlanNodeId>& leafPlanNodeIds,
std::vector<std::shared_ptr<SplitInfo>>& scanInfos,
std::vector<facebook::velox::core::PlanNodeId>& scanIds,
std::vector<facebook::velox::core::PlanNodeId>& streamIds);
private:
void initializeExecutors();
void registerConnectors();
void unregisterConnectors();
std::shared_ptr<const facebook::velox::core::PlanNode> veloxPlan_;
std::shared_ptr<facebook::velox::config::ConfigBase> veloxCfg_;
bool debugModeEnabled_{false};
std::unique_ptr<folly::Executor> executor_;
std::unique_ptr<folly::Executor> spillExecutor_;
std::unique_ptr<folly::Executor> ioExecutor_;
VeloxConnectorIds connectorIds_;
std::unordered_map<int32_t, std::shared_ptr<VeloxColumnarBatch>> emptySchemaBatchLoopUp_;
};
} // namespace gluten