// blob: c4da6da7b80ef631684425e9c708ead3d7ad3f9c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This API is EXPERIMENTAL.
#pragma once
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "arrow/dataset/dataset.h"
#include "arrow/dataset/expression.h"
#include "arrow/dataset/projector.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/io/interfaces.h"
#include "arrow/memory_pool.h"
#include "arrow/type_fwd.h"
#include "arrow/util/iterator.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/type_fwd.h"
namespace arrow {
/// \brief An asynchronous source of record batches: a callable which, each time it is
/// invoked, returns a Future of a (shared) RecordBatch.
using RecordBatchGenerator = std::function<Future<std::shared_ptr<RecordBatch>>()>;
namespace dataset {
/// \defgroup dataset-scanning Scanning API
///
/// @{
/// Default value of ScanOptions::batch_size (1 << 20 == 1,048,576 rows per batch).
constexpr int64_t kDefaultBatchSize = 1 << 20;
/// Default value of ScanOptions::batch_readahead (batches read ahead within a file).
constexpr int32_t kDefaultBatchReadahead = 32;
/// Default value of ScanOptions::fragment_readahead (files read ahead).
constexpr int32_t kDefaultFragmentReadahead = 8;
/// \brief Scan-specific options, which can be changed between scans of the same
/// dataset.
struct ARROW_DS_EXPORT ScanOptions {
/// A row filter (which will be pushed down to partitioning/reading if supported).
/// Defaults to literal(true).
Expression filter = literal(true);
/// A projection expression (which can add/remove/rename columns).
Expression projection;
/// Schema with which batches will be read from fragments. This is also known as the
/// "reader schema"; it will be used (for example) in constructing CSV file readers to
/// identify column types for parsing. Usually only a subset of its fields (see
/// MaterializedFields) will be materialized during a scan.
std::shared_ptr<Schema> dataset_schema;
/// Schema of projected record batches. This is independent of dataset_schema as its
/// fields are derived from the projection. For example, let
///
///   dataset_schema = {"a": int32, "b": int32, "id": utf8}
///   projection = project({call("add", {field_ref("a"), field_ref("b")})},
///                        {"a_plus_b"})
///
/// (no filter specified). In this case, the projected_schema would be
///
///   {"a_plus_b": int32}
std::shared_ptr<Schema> projected_schema;
/// Maximum row count for scanned batches.
int64_t batch_size = kDefaultBatchSize;
/// How many batches to read ahead within a file
///
/// Set to 0 to disable batch readahead
///
/// Note: May not be supported by all formats
/// Note: May not be supported by all scanners
/// Note: Will be ignored if use_threads is set to false
int32_t batch_readahead = kDefaultBatchReadahead;
/// How many files to read ahead
///
/// Set to 0 to disable fragment readahead
///
/// Note: May not be enforced by all scanners
/// Note: Will be ignored if use_threads is set to false
int32_t fragment_readahead = kDefaultFragmentReadahead;
/// A pool from which materialized and scanned arrays will be allocated.
/// Defaults to arrow::default_memory_pool().
MemoryPool* pool = arrow::default_memory_pool();
/// Executor on which to run any CPU tasks. Defaults to the CPU thread pool.
///
/// Note: Will be ignored if use_threads is set to false
internal::Executor* cpu_executor = internal::GetCpuThreadPool();
/// IOContext for any IO tasks
///
/// Note: The IOContext executor will be ignored if use_threads is set to false
io::IOContext io_context;
/// If true the scanner will scan in parallel
///
/// Note: If true, this will use threads from both the cpu_executor and the
/// io_context.executor
/// Note: This must be true in order for any readahead to happen
bool use_threads = false;
/// If true then an asynchronous implementation of the scanner will be used.
/// This implementation is newer and generally performs better. However, it
/// makes extensive use of threading and is still considered experimental
bool use_async = false;
/// Fragment-specific scan options.
std::shared_ptr<FragmentScanOptions> fragment_scan_options;
/// \brief Return a vector of fields that requires materialization.
///
/// This is usually the union of the fields referenced in the projection and the
/// filter expression. As the first example shows, a field referenced more than
/// once is listed more than once. Examples:
///
/// - `SELECT a, b WHERE a < 2 && c > 1` => ["a", "b", "a", "c"]
/// - `SELECT a + b < 3 WHERE a > 1` => ["a", "b"]
///
/// This is needed for expressions where a field may not be directly
/// used in the final projection but is still required to evaluate the
/// expression.
///
/// This is used by Fragment implementations to apply the column
/// sub-selection optimization.
std::vector<std::string> MaterializedFields() const;
/// \brief Return a threaded or serial TaskGroup according to use_threads.
std::shared_ptr<internal::TaskGroup> TaskGroup() const;
};
/// \brief Read record batches from a range of a single data fragment. A
/// ScanTask is meant to be a unit of work to be dispatched. The implementation
/// must be thread and concurrent safe.
class ARROW_DS_EXPORT ScanTask {
public:
/// \brief Iterate through sequence of materialized record batches
/// resulting from the Scan. Execution semantics are encapsulated in the
/// particular ScanTask implementation
virtual Result<RecordBatchIterator> Execute() = 0;
virtual ~ScanTask() = default;
const std::shared_ptr<ScanOptions>& options() const { return options_; }
const std::shared_ptr<Fragment>& fragment() const { return fragment_; }
protected:
ScanTask(std::shared_ptr<ScanOptions> options, std::shared_ptr<Fragment> fragment)
: options_(std::move(options)), fragment_(std::move(fragment)) {}
std::shared_ptr<ScanOptions> options_;
std::shared_ptr<Fragment> fragment_;
};
/// \brief A value annotated with positional information.
template <typename T>
struct Enumerated {
/// The wrapped value.
T value;
/// Position of the value. NOTE(review): presumably a zero-based position within the
/// sequence it was produced from; the end-of-iteration sentinel built in
/// IterationTraits<dataset::EnumeratedRecordBatch> uses -1.
int index;
/// NOTE(review): presumably true only for the final element of the sequence --
/// confirm against the code that fills it in.
bool last;
};
/// \brief Combines a record batch with the fragment that the record batch originated
/// from
///
/// Knowing the source fragment can be useful for debugging & understanding loaded data
///
/// A TaggedRecordBatch whose record_batch is null is treated as the end-of-iteration
/// marker (see IterationTraits<dataset::TaggedRecordBatch>).
struct TaggedRecordBatch {
/// The scanned batch.
std::shared_ptr<RecordBatch> record_batch;
/// The fragment the batch originated from.
std::shared_ptr<Fragment> fragment;
};
/// \brief A callable which, each time it is invoked, returns a Future of a
/// TaggedRecordBatch.
using TaggedRecordBatchGenerator = std::function<Future<TaggedRecordBatch>()>;
/// \brief An Iterator over TaggedRecordBatch values.
using TaggedRecordBatchIterator = Iterator<TaggedRecordBatch>;
/// \brief Combines a tagged batch with positional information
///
/// This is returned when scanning batches in an unordered fashion. This information is
/// needed if you ever want to reassemble the batches in order
///
/// An EnumeratedRecordBatch whose fragment.value is null is treated as the
/// end-of-iteration marker (see IterationTraits<dataset::EnumeratedRecordBatch>).
struct EnumeratedRecordBatch {
/// The scanned batch together with its positional information.
Enumerated<std::shared_ptr<RecordBatch>> record_batch;
/// The fragment the batch originated from together with its positional information.
Enumerated<std::shared_ptr<Fragment>> fragment;
};
/// \brief A callable which, each time it is invoked, returns a Future of an
/// EnumeratedRecordBatch.
using EnumeratedRecordBatchGenerator = std::function<Future<EnumeratedRecordBatch>()>;
/// \brief An Iterator over EnumeratedRecordBatch values.
using EnumeratedRecordBatchIterator = Iterator<EnumeratedRecordBatch>;
/// @}
} // namespace dataset
/// \brief End-of-iteration handling for iterators over dataset::TaggedRecordBatch.
///
/// The sentinel is a TaggedRecordBatch whose record batch and fragment are both null.
/// Only the record batch is inspected when testing for the end of iteration.
template <>
struct IterationTraits<dataset::TaggedRecordBatch> {
  /// \brief Build the sentinel value that marks the end of iteration.
  static dataset::TaggedRecordBatch End() {
    // nullptr (rather than the NULL macro) selects the shared_ptr(nullptr_t)
    // constructor directly and does not depend on a header defining NULL.
    return dataset::TaggedRecordBatch{nullptr, nullptr};
  }

  /// \brief Return true if `val` is the end-of-iteration sentinel.
  static bool IsEnd(const dataset::TaggedRecordBatch& val) {
    return val.record_batch == nullptr;
  }
};
/// \brief End-of-iteration handling for iterators over dataset::EnumeratedRecordBatch.
///
/// The sentinel holds a null record batch and a null fragment, each with index -1 and
/// last == false. Only the fragment value is inspected when testing for the end of
/// iteration.
template <>
struct IterationTraits<dataset::EnumeratedRecordBatch> {
  /// \brief Build the sentinel value that marks the end of iteration.
  static dataset::EnumeratedRecordBatch End() {
    // nullptr (rather than the NULL macro) selects the shared_ptr(nullptr_t)
    // constructor directly and does not depend on a header defining NULL.
    return dataset::EnumeratedRecordBatch{{nullptr, -1, false}, {nullptr, -1, false}};
  }

  /// \brief Return true if `val` is the end-of-iteration sentinel.
  static bool IsEnd(const dataset::EnumeratedRecordBatch& val) {
    return val.fragment.value == nullptr;
  }
};
namespace dataset {
/// \addtogroup dataset-scanning
///
/// @{
/// \brief A scanner glues together several dataset classes to load in data.
/// The dataset contains a collection of fragments and partitioning rules.
///
/// The fragments identify independently loadable units of data (i.e. each fragment has
/// a potentially unique schema and possibly even format. It should be possible to read
/// fragments in parallel if desired).
///
/// The fragment's format contains the logic necessary to actually create a task to load
/// the fragment into memory. That task may or may not support parallel execution of
/// its own.
///
/// The scanner is then responsible for creating scan tasks from every fragment in the
/// dataset and (potentially) sequencing the loaded record batches together.
///
/// The scanner should not buffer the entire dataset in memory (unless asked) instead
/// yielding record batches as soon as they are ready to scan. Various readahead
/// properties control how much data is allowed to be scanned before pausing to let a
/// slow consumer catchup.
///
/// Today the scanner also handles projection & filtering although that may change in
/// the future.
class ARROW_DS_EXPORT Scanner {
public:
virtual ~Scanner() = default;
/// \brief The Scan operator returns a stream of ScanTask. The caller is
/// responsible to dispatch/schedule said tasks. Tasks should be safe to run
/// in a concurrent fashion and outlive the iterator.
///
/// Note: Not supported by the async scanner
/// TODO(ARROW-11797) Deprecate Scan()
virtual Result<ScanTaskIterator> Scan();
/// \brief Convert a Scanner into a Table.
///
/// Use this convenience utility with care. This will serially materialize the
/// Scan result in memory before creating the Table.
virtual Result<std::shared_ptr<Table>> ToTable() = 0;
/// \brief Scan the dataset into a stream of record batches. Each batch is tagged
/// with the fragment it originated from. The batches will arrive in order. The
/// order of fragments is determined by the dataset.
///
/// Note: The scanner will perform some readahead but will avoid materializing too
/// much in memory (this is goverended by the readahead options and use_threads option).
/// If the readahead queue fills up then I/O will pause until the calling thread catches
/// up.
virtual Result<TaggedRecordBatchIterator> ScanBatches() = 0;
/// \brief Scan the dataset into a stream of record batches. Unlike ScanBatches this
/// method may allow record batches to be returned out of order. This allows for more
/// efficient scanning: some fragments may be accessed more quickly than others (e.g.
/// may be cached in RAM or just happen to get scheduled earlier by the I/O)
///
/// To make up for the out-of-order iteration each batch is further tagged with
/// positional information.
virtual Result<EnumeratedRecordBatchIterator> ScanBatchesUnordered();
/// \brief Get the options for this scan.
const std::shared_ptr<ScanOptions>& options() const { return scan_options_; }
protected:
explicit Scanner(std::shared_ptr<ScanOptions> scan_options)
: scan_options_(std::move(scan_options)) {}
Result<EnumeratedRecordBatchIterator> AddPositioningToInOrderScan(
TaggedRecordBatchIterator scan);
const std::shared_ptr<ScanOptions> scan_options_;
};
/// \brief A Scanner backed by either a Dataset or a single Fragment.
///
/// NOTE(review): judging by the name, this is the non-async implementation, i.e. the
/// one used when ScanOptions::use_async is false -- confirm against
/// ScannerBuilder::Finish.
class ARROW_DS_EXPORT SyncScanner : public Scanner {
 public:
  /// \brief Construct a scanner over a dataset; fragment_ is left null.
  SyncScanner(std::shared_ptr<Dataset> dataset, std::shared_ptr<ScanOptions> scan_options)
      : Scanner{std::move(scan_options)}, dataset_{std::move(dataset)} {}

  /// \brief Construct a scanner over a single fragment; dataset_ is left null.
  SyncScanner(std::shared_ptr<Fragment> fragment,
              std::shared_ptr<ScanOptions> scan_options)
      : Scanner{std::move(scan_options)}, fragment_{std::move(fragment)} {}

  Result<TaggedRecordBatchIterator> ScanBatches() override;

  Result<ScanTaskIterator> Scan() override;

  Result<std::shared_ptr<Table>> ToTable() override;

 protected:
  /// \brief GetFragments returns an iterator over all Fragments in this scan.
  Result<FragmentIterator> GetFragments();

  /// \brief Asynchronous helper returning a Future of the resulting Table.
  ///
  /// NOTE(review): presumably the implementation behind ToTable(), running CPU work
  /// on `cpu_executor` -- confirm against the implementation.
  Future<std::shared_ptr<Table>> ToTableInternal(internal::Executor* cpu_executor);

  std::shared_ptr<Dataset> dataset_;
  // TODO(ARROW-8065) remove fragment_ after a Dataset is constructible from fragments
  std::shared_ptr<Fragment> fragment_;
};
/// \brief ScannerBuilder is a factory class to construct a Scanner. It is used
/// to pass information, notably a potential filter expression and a subset of
/// columns to materialize.
class ARROW_DS_EXPORT ScannerBuilder {
public:
/// \brief Build a scanner over a dataset.
///
/// NOTE(review): presumably starts from default-constructed ScanOptions -- confirm
/// against the implementation.
explicit ScannerBuilder(std::shared_ptr<Dataset> dataset);
/// \brief Build a scanner over a dataset, starting from the given scan options.
ScannerBuilder(std::shared_ptr<Dataset> dataset,
std::shared_ptr<ScanOptions> scan_options);
/// \brief Build a scanner over a single fragment, starting from the given scan
/// options.
///
/// NOTE(review): `schema` is presumably the schema the fragment is read with --
/// confirm against the implementation.
ScannerBuilder(std::shared_ptr<Schema> schema, std::shared_ptr<Fragment> fragment,
std::shared_ptr<ScanOptions> scan_options);
/// \brief Set the subset of columns to materialize.
///
/// Columns which are not referenced may not be read from fragments.
///
/// \param[in] columns list of columns to project. Order and duplicates will
/// be preserved.
///
/// \return Failure if any column name does not exist in the dataset's
/// Schema.
Status Project(std::vector<std::string> columns);
/// \brief Set expressions which will be evaluated to produce the materialized
/// columns.
///
/// Columns which are not referenced may not be read from fragments.
///
/// \param[in] exprs expressions to evaluate to produce columns.
/// \param[in] names list of names for the resulting columns.
///
/// \return Failure if any referenced column does not exist in the dataset's
/// Schema.
Status Project(std::vector<Expression> exprs, std::vector<std::string> names);
/// \brief Set the filter expression to return only rows matching the filter.
///
/// The predicate will be passed down to Sources and corresponding
/// Fragments to exploit predicate pushdown if possible using
/// partition information or Fragment internal metadata, e.g. Parquet statistics.
/// Columns which are not referenced may not be read from fragments.
///
/// \param[in] filter expression to filter rows with.
///
/// \return Failure if any referenced column does not exist in the dataset's
/// Schema.
Status Filter(const Expression& filter);
/// \brief Indicate if the Scanner should make use of the available
/// ThreadPool found in ScanOptions.
///
/// \param[in] use_threads whether to scan in parallel; defaults to true when the
/// argument is omitted.
Status UseThreads(bool use_threads = true);
/// \brief Set the maximum number of rows per RecordBatch.
///
/// \param[in] batch_size the maximum number of rows.
/// \returns An error if the batch size is not greater than 0.
///
/// This option provides a control limiting the memory owned by any RecordBatch.
Status BatchSize(int64_t batch_size);
/// \brief Set the pool from which materialized and scanned arrays will be allocated.
Status Pool(MemoryPool* pool);
/// \brief Set fragment-specific scan options.
Status FragmentScanOptions(std::shared_ptr<FragmentScanOptions> fragment_scan_options);
/// \brief Return the constructed now-immutable Scanner object
Result<std::shared_ptr<Scanner>> Finish();
/// \brief NOTE(review): presumably the schema batches are read with
/// (ScanOptions::dataset_schema) -- confirm against the implementation.
const std::shared_ptr<Schema>& schema() const;
/// \brief NOTE(review): presumably the schema of the projected batches
/// (ScanOptions::projected_schema) -- confirm against the implementation.
const std::shared_ptr<Schema>& projected_schema() const;
private:
// Source of the scan when the builder was created from a dataset.
std::shared_ptr<Dataset> dataset_;
// Source of the scan when the builder was created from a single fragment.
std::shared_ptr<Fragment> fragment_;
// Options accumulated by the setter methods above.
std::shared_ptr<ScanOptions> scan_options_;
};
/// @}
/// \brief A trivial ScanTask over record batches that are already held in memory.
class ARROW_DS_EXPORT InMemoryScanTask : public ScanTask {
 public:
  /// \brief Take ownership of `record_batches` and bind them to the given scan
  /// options and fragment.
  InMemoryScanTask(std::vector<std::shared_ptr<RecordBatch>> record_batches,
                   std::shared_ptr<ScanOptions> options,
                   std::shared_ptr<Fragment> fragment)
      : ScanTask{std::move(options), std::move(fragment)},
        // Parentheses on purpose: keeps overload resolution away from the vector's
        // initializer_list constructor so this is a plain move.
        record_batches_(std::move(record_batches)) {}

  Result<RecordBatchIterator> Execute() override;

 protected:
  std::vector<std::shared_ptr<RecordBatch>> record_batches_;
};
/// \brief Build a ScanTaskIterator from a vector of in-memory record batches and the
/// scan options to attach to the resulting task(s).
///
/// NOTE(review): presumably wraps the batches in an InMemoryScanTask -- confirm
/// against the implementation.
ARROW_DS_EXPORT Result<ScanTaskIterator> ScanTaskIteratorFromRecordBatch(
std::vector<std::shared_ptr<RecordBatch>> batches,
std::shared_ptr<ScanOptions> options);
} // namespace dataset
} // namespace arrow