| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "gandiva/projector.h" |
| |
| #include <memory> |
| #include <utility> |
| #include <vector> |
| |
| #include "gandiva/cache.h" |
| #include "gandiva/expr_validator.h" |
| #include "gandiva/llvm_generator.h" |
| #include "gandiva/projector_cache_key.h" |
| |
| namespace gandiva { |
| |
| Projector::Projector(std::unique_ptr<LLVMGenerator> llvm_generator, SchemaPtr schema, |
| const FieldVector& output_fields, |
| std::shared_ptr<Configuration> configuration) |
| : llvm_generator_(std::move(llvm_generator)), |
| schema_(schema), |
| output_fields_(output_fields), |
| configuration_(configuration) {} |
| |
| Projector::~Projector() {} |
| |
| Status Projector::Make(SchemaPtr schema, const ExpressionVector& exprs, |
| std::shared_ptr<Projector>* projector) { |
| return Projector::Make(schema, exprs, SelectionVector::Mode::MODE_NONE, |
| ConfigurationBuilder::DefaultConfiguration(), projector); |
| } |
| |
| Status Projector::Make(SchemaPtr schema, const ExpressionVector& exprs, |
| std::shared_ptr<Configuration> configuration, |
| std::shared_ptr<Projector>* projector) { |
| return Projector::Make(schema, exprs, SelectionVector::Mode::MODE_NONE, configuration, |
| projector); |
| } |
| |
| Status Projector::Make(SchemaPtr schema, const ExpressionVector& exprs, |
| SelectionVector::Mode selection_vector_mode, |
| std::shared_ptr<Configuration> configuration, |
| std::shared_ptr<Projector>* projector) { |
| ARROW_RETURN_IF(schema == nullptr, Status::Invalid("Schema cannot be null")); |
| ARROW_RETURN_IF(exprs.empty(), Status::Invalid("Expressions cannot be empty")); |
| ARROW_RETURN_IF(configuration == nullptr, |
| Status::Invalid("Configuration cannot be null")); |
| |
| // see if equivalent projector was already built |
| static Cache<ProjectorCacheKey, std::shared_ptr<Projector>> cache; |
| ProjectorCacheKey cache_key(schema, configuration, exprs, selection_vector_mode); |
| std::shared_ptr<Projector> cached_projector = cache.GetModule(cache_key); |
| if (cached_projector != nullptr) { |
| *projector = cached_projector; |
| return Status::OK(); |
| } |
| |
| // Build LLVM generator, and generate code for the specified expressions |
| std::unique_ptr<LLVMGenerator> llvm_gen; |
| ARROW_RETURN_NOT_OK(LLVMGenerator::Make(configuration, &llvm_gen)); |
| |
| // Run the validation on the expressions. |
| // Return if any of the expression is invalid since |
| // we will not be able to process further. |
| ExprValidator expr_validator(llvm_gen->types(), schema); |
| for (auto& expr : exprs) { |
| ARROW_RETURN_NOT_OK(expr_validator.Validate(expr)); |
| } |
| |
| ARROW_RETURN_NOT_OK(llvm_gen->Build(exprs, selection_vector_mode)); |
| |
| // save the output field types. Used for validation at Evaluate() time. |
| std::vector<FieldPtr> output_fields; |
| output_fields.reserve(exprs.size()); |
| for (auto& expr : exprs) { |
| output_fields.push_back(expr->result()); |
| } |
| |
| // Instantiate the projector with the completely built llvm generator |
| *projector = std::shared_ptr<Projector>( |
| new Projector(std::move(llvm_gen), schema, output_fields, configuration)); |
| cache.PutModule(cache_key, *projector); |
| |
| return Status::OK(); |
| } |
| |
| Status Projector::Evaluate(const arrow::RecordBatch& batch, |
| const ArrayDataVector& output_data_vecs) { |
| return Evaluate(batch, nullptr, output_data_vecs); |
| } |
| |
| Status Projector::Evaluate(const arrow::RecordBatch& batch, |
| const SelectionVector* selection_vector, |
| const ArrayDataVector& output_data_vecs) { |
| ARROW_RETURN_NOT_OK(ValidateEvaluateArgsCommon(batch)); |
| |
| if (output_data_vecs.size() != output_fields_.size()) { |
| std::stringstream ss; |
| ss << "number of buffers for output_data_vecs is " << output_data_vecs.size() |
| << ", expected " << output_fields_.size(); |
| return Status::Invalid(ss.str()); |
| } |
| |
| int idx = 0; |
| for (auto& array_data : output_data_vecs) { |
| if (array_data == nullptr) { |
| std::stringstream ss; |
| ss << "array for output field " << output_fields_[idx]->name() << "is null."; |
| return Status::Invalid(ss.str()); |
| } |
| |
| auto num_rows = |
| selection_vector == nullptr ? batch.num_rows() : selection_vector->GetNumSlots(); |
| |
| ARROW_RETURN_NOT_OK( |
| ValidateArrayDataCapacity(*array_data, *(output_fields_[idx]), num_rows)); |
| ++idx; |
| } |
| return llvm_generator_->Execute(batch, selection_vector, output_data_vecs); |
| } |
| |
| Status Projector::Evaluate(const arrow::RecordBatch& batch, arrow::MemoryPool* pool, |
| arrow::ArrayVector* output) { |
| return Evaluate(batch, nullptr, pool, output); |
| } |
| |
| Status Projector::Evaluate(const arrow::RecordBatch& batch, |
| const SelectionVector* selection_vector, |
| arrow::MemoryPool* pool, arrow::ArrayVector* output) { |
| ARROW_RETURN_NOT_OK(ValidateEvaluateArgsCommon(batch)); |
| ARROW_RETURN_IF(output == nullptr, Status::Invalid("Output must be non-null.")); |
| ARROW_RETURN_IF(pool == nullptr, Status::Invalid("Memory pool must be non-null.")); |
| |
| auto num_rows = |
| selection_vector == nullptr ? batch.num_rows() : selection_vector->GetNumSlots(); |
| // Allocate the output data vecs. |
| ArrayDataVector output_data_vecs; |
| for (auto& field : output_fields_) { |
| ArrayDataPtr output_data; |
| |
| ARROW_RETURN_NOT_OK(AllocArrayData(field->type(), num_rows, pool, &output_data)); |
| output_data_vecs.push_back(output_data); |
| } |
| |
| // Execute the expression(s). |
| ARROW_RETURN_NOT_OK( |
| llvm_generator_->Execute(batch, selection_vector, output_data_vecs)); |
| |
| // Create and return array arrays. |
| output->clear(); |
| for (auto& array_data : output_data_vecs) { |
| output->push_back(arrow::MakeArray(array_data)); |
| } |
| return Status::OK(); |
| } |
| |
| // TODO : handle variable-len vectors |
| Status Projector::AllocArrayData(const DataTypePtr& type, int64_t num_records, |
| arrow::MemoryPool* pool, ArrayDataPtr* array_data) { |
| const auto* fw_type = dynamic_cast<const arrow::FixedWidthType*>(type.get()); |
| ARROW_RETURN_IF(fw_type == nullptr, |
| Status::Invalid("Unsupported output data type ", type)); |
| |
| std::shared_ptr<arrow::Buffer> null_bitmap; |
| int64_t bitmap_bytes = arrow::BitUtil::BytesForBits(num_records); |
| ARROW_RETURN_NOT_OK(arrow::AllocateBuffer(pool, bitmap_bytes, &null_bitmap)); |
| |
| std::shared_ptr<arrow::Buffer> data; |
| int64_t data_len = arrow::BitUtil::BytesForBits(num_records * fw_type->bit_width()); |
| ARROW_RETURN_NOT_OK(arrow::AllocateBuffer(pool, data_len, &data)); |
| |
| // This is not strictly required but valgrind gets confused and detects this |
| // as uninitialized memory access. See arrow::util::SetBitTo(). |
| if (type->id() == arrow::Type::BOOL) { |
| memset(data->mutable_data(), 0, data_len); |
| } |
| |
| *array_data = arrow::ArrayData::Make(type, num_records, {null_bitmap, data}); |
| return Status::OK(); |
| } |
| |
| Status Projector::ValidateEvaluateArgsCommon(const arrow::RecordBatch& batch) { |
| ARROW_RETURN_IF(!batch.schema()->Equals(*schema_), |
| Status::Invalid("Schema in RecordBatch must match schema in Make()")); |
| ARROW_RETURN_IF(batch.num_rows() == 0, |
| Status::Invalid("RecordBatch must be non-empty.")); |
| |
| return Status::OK(); |
| } |
| |
| Status Projector::ValidateArrayDataCapacity(const arrow::ArrayData& array_data, |
| const arrow::Field& field, |
| int64_t num_records) { |
| ARROW_RETURN_IF(array_data.buffers.size() < 2, |
| Status::Invalid("ArrayData must have at least 2 buffers")); |
| |
| int64_t min_bitmap_len = arrow::BitUtil::BytesForBits(num_records); |
| int64_t bitmap_len = array_data.buffers[0]->capacity(); |
| ARROW_RETURN_IF(bitmap_len < min_bitmap_len, |
| Status::Invalid("Bitmap buffer too small for ", field.name())); |
| |
| // verify size of data buffer. |
| // TODO : handle variable-len vectors |
| const auto& fw_type = dynamic_cast<const arrow::FixedWidthType&>(*field.type()); |
| int64_t min_data_len = arrow::BitUtil::BytesForBits(num_records * fw_type.bit_width()); |
| int64_t data_len = array_data.buffers[1]->capacity(); |
| ARROW_RETURN_IF(data_len < min_data_len, |
| Status::Invalid("Data buffer too small for ", field.name())); |
| |
| return Status::OK(); |
| } |
| |
| } // namespace gandiva |