blob: f2916da34b1c55e306a58a254315c85c436da7f6 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <limits>
#include <memory>
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include "arrow/c/abi.h"
#include "paimon/common/utils/threadsafe_queue.h"
#include "paimon/reader/batch_reader.h"
#include "paimon/reader/prefetch_file_batch_reader.h"
#include "paimon/result.h"
#include "paimon/status.h"
#include "paimon/utils/read_ahead_cache.h"
#include "paimon/utils/roaring_bitmap32.h"
struct ArrowSchema;
namespace paimon {
class ReaderBuilder;
class FileSystem;
class Executor;
class Predicate;
class Metrics;
class PrefetchFileBatchReaderImpl : public PrefetchFileBatchReader {
public:
static Result<std::unique_ptr<PrefetchFileBatchReaderImpl>> Create(
const std::string& data_file_path, const ReaderBuilder* reader_builder,
const std::shared_ptr<FileSystem>& fs, uint32_t prefetch_max_parallel_num,
int32_t batch_size, uint32_t prefetch_batch_count, bool enable_adaptive_prefetch_strategy,
const std::shared_ptr<Executor>& executor, bool initialize_read_ranges,
PrefetchCacheMode prefetch_cache_mode, const CacheConfig& cache_config,
const std::shared_ptr<MemoryPool>& pool);
~PrefetchFileBatchReaderImpl() override;
Result<FileBatchReader::ReadBatch> NextBatch() override {
return Status::Invalid(
"paimon inner reader PrefetchFileBatchReader should use NextBatchWithBitmap");
}
Result<FileBatchReader::ReadBatchWithBitmap> NextBatchWithBitmap() override;
std::shared_ptr<Metrics> GetReaderMetrics() const override;
Result<std::unique_ptr<::ArrowSchema>> GetFileSchema() const override;
Status SetReadSchema(::ArrowSchema* read_schema, const std::shared_ptr<Predicate>& predicate,
const std::optional<RoaringBitmap32>& selection_bitmap) override;
Status SeekToRow(uint64_t row_number) override;
Result<uint64_t> GetPreviousBatchFirstRowNumber() const override;
Result<uint64_t> GetNumberOfRows() const override;
uint64_t GetNextRowToRead() const override;
void Close() override;
Status SetReadRanges(const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges) override;
Result<std::vector<std::pair<uint64_t, uint64_t>>> GenReadRanges(
bool* need_prefetch) const override {
assert(false);
return Status::NotImplemented("gen read ranges not implemented");
}
bool SupportPreciseBitmapSelection() const override {
return readers_[0]->SupportPreciseBitmapSelection();
}
Status RefreshReadRanges();
inline PrefetchFileBatchReader* GetFirstReader() const {
return readers_[0].get();
}
inline bool NeedPrefetch() const {
return need_prefetch_;
}
private:
struct PrefetchBatch {
std::pair<uint64_t, uint64_t> read_range;
BatchReader::ReadBatchWithBitmap batch;
uint64_t previous_batch_first_row_num;
};
PrefetchFileBatchReaderImpl(
const std::vector<std::shared_ptr<PrefetchFileBatchReader>>& readers, int32_t batch_size,
uint32_t prefetch_queue_capacity, bool enable_adaptive_prefetch_strategy,
const std::shared_ptr<Executor>& executor, const std::shared_ptr<ReadAheadCache>& cache,
PrefetchCacheMode cache_mode);
Status CleanUp();
void Workloop();
void SetReadStatus(const Status& status);
Status GetReadStatus() const;
Result<bool> IsEofRange(const std::pair<uint64_t, uint64_t>& read_range) const;
Status DoReadBatch(size_t reader_idx);
void ReadBatch(size_t reader_idx);
size_t GetEnabledReaderSize() const;
static std::vector<std::pair<uint64_t, uint64_t>> FilterReadRanges(
const std::vector<std::pair<uint64_t, uint64_t>>& read_range,
const std::optional<RoaringBitmap32>& selection_bitmap);
static std::vector<std::vector<std::pair<uint64_t, uint64_t>>> DispatchReadRanges(
const std::vector<std::pair<uint64_t, uint64_t>>& read_ranges, size_t reader_count);
Result<std::pair<uint64_t, uint64_t>> EofRange() const;
std::optional<std::pair<uint64_t, uint64_t>> GetCurrentReadRange(size_t reader_idx) const;
Status EnsureReaderPosition(size_t reader_idx,
const std::pair<uint64_t, uint64_t>& read_range) const;
Status HandleReadResult(size_t reader_idx, const std::pair<uint64_t, uint64_t>& read_range,
FileBatchReader::ReadBatchWithBitmap&& read_batch_with_bitmap);
bool NeedInitCache() const;
private:
std::vector<std::shared_ptr<PrefetchFileBatchReader>> readers_;
// The meaning of readers_pos_ is: all data before this pos has been filtered out or effectively
// consumed, and the data after this pos may need to be read in the next round of reading.
std::vector<std::unique_ptr<std::atomic<uint64_t>>> readers_pos_;
std::vector<std::unique_ptr<std::atomic<uint64_t>>> seek_cnt_;
const int32_t batch_size_;
std::optional<RoaringBitmap32> selection_bitmap_;
std::shared_ptr<Predicate> predicate_;
std::deque<std::pair<uint64_t, uint64_t>> read_ranges_;
std::vector<std::vector<std::pair<uint64_t, uint64_t>>> read_ranges_in_group_;
std::vector<std::unique_ptr<ThreadsafeQueue<PrefetchBatch>>> prefetch_queues_;
std::vector<bool> reader_is_working_;
std::mutex working_mutex_;
std::condition_variable cv_;
std::shared_ptr<Executor> executor_;
std::shared_ptr<ReadAheadCache> cache_;
PrefetchCacheMode cache_mode_;
mutable std::shared_mutex rw_mutex_;
std::unique_ptr<std::thread> background_thread_;
Status read_status_;
std::atomic<bool> is_shutdown_ = false;
uint64_t previous_batch_first_row_num_ = std::numeric_limits<uint64_t>::max();
bool need_prefetch_ = false;
bool read_ranges_freshed_ = false;
const uint32_t prefetch_queue_capacity_;
const bool enable_adaptive_prefetch_strategy_;
int32_t parallel_num_;
};
} // namespace paimon