| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #pragma once |
| |
| #include <chrono> |
| #include <cstdint> |
| #include <memory> |
| #include <string> |
| #include <unordered_map> |
| #include <vector> |
| |
| // Forward declare Arrow classes to avoid including heavy Arrow headers in header |
| namespace arrow { |
| class RecordBatch; |
| } |
| |
| namespace fluss { |
| |
| namespace ffi { |
| struct Connection; |
| struct Admin; |
| struct Table; |
| struct AppendWriter; |
| struct WriteResult; |
| struct LogScanner; |
| } // namespace ffi |
| |
| struct Date { |
| int32_t days_since_epoch{0}; |
| |
| static Date FromDays(int32_t days) { return {days}; } |
| static Date FromYMD(int year, int month, int day); |
| |
| int Year() const; |
| int Month() const; |
| int Day() const; |
| }; |
| |
| struct Time { |
| static constexpr int32_t kMillisPerSecond = 1000; |
| static constexpr int32_t kMillisPerMinute = 60 * kMillisPerSecond; |
| static constexpr int32_t kMillisPerHour = 60 * kMillisPerMinute; |
| |
| int32_t millis_since_midnight{0}; |
| |
| static Time FromMillis(int32_t ms) { return {ms}; } |
| static Time FromHMS(int hour, int minute, int second, int millis = 0) { |
| return {hour * kMillisPerHour + minute * kMillisPerMinute + second * kMillisPerSecond + |
| millis}; |
| } |
| |
| int Hour() const { return millis_since_midnight / kMillisPerHour; } |
| int Minute() const { return (millis_since_midnight % kMillisPerHour) / kMillisPerMinute; } |
| int Second() const { return (millis_since_midnight % kMillisPerMinute) / kMillisPerSecond; } |
| int Millis() const { return millis_since_midnight % kMillisPerSecond; } |
| }; |
| |
| struct Timestamp { |
| static constexpr int32_t kMaxNanoOfMillisecond = 999999; |
| static constexpr int64_t kNanosPerMilli = 1000000; |
| |
| int64_t epoch_millis{0}; |
| int32_t nano_of_millisecond{0}; |
| |
| static Timestamp FromMillis(int64_t ms) { return {ms, 0}; } |
| static Timestamp FromMillisNanos(int64_t ms, int32_t nanos) { |
| if (nanos < 0) nanos = 0; |
| if (nanos > kMaxNanoOfMillisecond) nanos = kMaxNanoOfMillisecond; |
| return {ms, nanos}; |
| } |
| static Timestamp FromTimePoint(std::chrono::system_clock::time_point tp) { |
| auto duration = tp.time_since_epoch(); |
| auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(duration).count(); |
| auto ms = ns / kNanosPerMilli; |
| auto nano_of_ms = static_cast<int32_t>(ns % kNanosPerMilli); |
| if (nano_of_ms < 0) { |
| nano_of_ms += kNanosPerMilli; |
| ms -= 1; |
| } |
| return {ms, nano_of_ms}; |
| } |
| }; |
| |
| enum class TypeId { |
| Boolean = 1, |
| TinyInt = 2, |
| SmallInt = 3, |
| Int = 4, |
| BigInt = 5, |
| Float = 6, |
| Double = 7, |
| String = 8, |
| Bytes = 9, |
| Date = 10, |
| Time = 11, |
| Timestamp = 12, |
| TimestampLtz = 13, |
| Decimal = 14, |
| }; |
| |
| class DataType { |
| public: |
| explicit DataType(TypeId id, int32_t p = 0, int32_t s = 0) |
| : id_(id), precision_(p), scale_(s) {} |
| |
| static DataType Boolean() { return DataType(TypeId::Boolean); } |
| static DataType TinyInt() { return DataType(TypeId::TinyInt); } |
| static DataType SmallInt() { return DataType(TypeId::SmallInt); } |
| static DataType Int() { return DataType(TypeId::Int); } |
| static DataType BigInt() { return DataType(TypeId::BigInt); } |
| static DataType Float() { return DataType(TypeId::Float); } |
| static DataType Double() { return DataType(TypeId::Double); } |
| static DataType String() { return DataType(TypeId::String); } |
| static DataType Bytes() { return DataType(TypeId::Bytes); } |
| static DataType Date() { return DataType(TypeId::Date); } |
| static DataType Time() { return DataType(TypeId::Time); } |
| static DataType Timestamp(int32_t precision = 6) { |
| return DataType(TypeId::Timestamp, precision, 0); |
| } |
| static DataType TimestampLtz(int32_t precision = 6) { |
| return DataType(TypeId::TimestampLtz, precision, 0); |
| } |
| static DataType Decimal(int32_t precision, int32_t scale) { |
| return DataType(TypeId::Decimal, precision, scale); |
| } |
| |
| TypeId id() const { return id_; } |
| int32_t precision() const { return precision_; } |
| int32_t scale() const { return scale_; } |
| |
| private: |
| TypeId id_; |
| int32_t precision_{0}; |
| int32_t scale_{0}; |
| }; |
| |
| enum class DatumType { |
| Null = 0, |
| Bool = 1, |
| Int32 = 2, |
| Int64 = 3, |
| Float32 = 4, |
| Float64 = 5, |
| String = 6, |
| Bytes = 7, |
| DecimalI64 = 8, |
| DecimalI128 = 9, |
| DecimalString = 10, |
| Date = 11, |
| Time = 12, |
| TimestampNtz = 13, |
| TimestampLtz = 14, |
| }; |
| |
| constexpr int64_t EARLIEST_OFFSET = -2; |
| constexpr int64_t LATEST_OFFSET = -1; |
| |
| enum class OffsetSpec { |
| Earliest = 0, |
| Latest = 1, |
| Timestamp = 2, |
| }; |
| |
| struct OffsetQuery { |
| OffsetSpec spec; |
| int64_t timestamp{0}; |
| |
| static OffsetQuery Earliest() { return {OffsetSpec::Earliest, 0}; } |
| static OffsetQuery Latest() { return {OffsetSpec::Latest, 0}; } |
| static OffsetQuery FromTimestamp(int64_t ts) { return {OffsetSpec::Timestamp, ts}; } |
| }; |
| |
| struct Result { |
| int32_t error_code{0}; |
| std::string error_message; |
| |
| bool Ok() const { return error_code == 0; } |
| }; |
| |
| struct TablePath { |
| std::string database_name; |
| std::string table_name; |
| |
| TablePath() = default; |
| TablePath(std::string db, std::string tbl) |
| : database_name(std::move(db)), table_name(std::move(tbl)) {} |
| |
| std::string ToString() const { return database_name + "." + table_name; } |
| }; |
| |
| struct Column { |
| std::string name; |
| DataType data_type; |
| std::string comment; |
| }; |
| |
| struct Schema { |
| std::vector<Column> columns; |
| std::vector<std::string> primary_keys; |
| |
| class Builder { |
| public: |
| Builder& AddColumn(std::string name, DataType type, std::string comment = "") { |
| columns_.push_back({std::move(name), std::move(type), std::move(comment)}); |
| return *this; |
| } |
| |
| Builder& SetPrimaryKeys(std::vector<std::string> keys) { |
| primary_keys_ = std::move(keys); |
| return *this; |
| } |
| |
| Schema Build() { return Schema{std::move(columns_), std::move(primary_keys_)}; } |
| |
| private: |
| std::vector<Column> columns_; |
| std::vector<std::string> primary_keys_; |
| }; |
| |
| static Builder NewBuilder() { return Builder(); } |
| }; |
| |
| struct TableDescriptor { |
| Schema schema; |
| std::vector<std::string> partition_keys; |
| int32_t bucket_count{0}; |
| std::vector<std::string> bucket_keys; |
| std::unordered_map<std::string, std::string> properties; |
| std::string comment; |
| |
| class Builder { |
| public: |
| Builder& SetSchema(Schema s) { |
| schema_ = std::move(s); |
| return *this; |
| } |
| |
| Builder& SetPartitionKeys(std::vector<std::string> keys) { |
| partition_keys_ = std::move(keys); |
| return *this; |
| } |
| |
| Builder& SetBucketCount(int32_t count) { |
| bucket_count_ = count; |
| return *this; |
| } |
| |
| Builder& SetBucketKeys(std::vector<std::string> keys) { |
| bucket_keys_ = std::move(keys); |
| return *this; |
| } |
| |
| Builder& SetProperty(std::string key, std::string value) { |
| properties_[std::move(key)] = std::move(value); |
| return *this; |
| } |
| |
| Builder& SetComment(std::string comment) { |
| comment_ = std::move(comment); |
| return *this; |
| } |
| |
| TableDescriptor Build() { |
| return TableDescriptor{std::move(schema_), std::move(partition_keys_), |
| bucket_count_, std::move(bucket_keys_), |
| std::move(properties_), std::move(comment_)}; |
| } |
| |
| private: |
| Schema schema_; |
| std::vector<std::string> partition_keys_; |
| int32_t bucket_count_{0}; |
| std::vector<std::string> bucket_keys_; |
| std::unordered_map<std::string, std::string> properties_; |
| std::string comment_; |
| }; |
| |
| static Builder NewBuilder() { return Builder(); } |
| }; |
| |
| struct TableInfo { |
| int64_t table_id; |
| int32_t schema_id; |
| TablePath table_path; |
| int64_t created_time; |
| int64_t modified_time; |
| std::vector<std::string> primary_keys; |
| std::vector<std::string> bucket_keys; |
| std::vector<std::string> partition_keys; |
| int32_t num_buckets; |
| bool has_primary_key; |
| bool is_partitioned; |
| std::unordered_map<std::string, std::string> properties; |
| std::string comment; |
| Schema schema; |
| }; |
| |
| struct Datum { |
| DatumType type{DatumType::Null}; |
| bool bool_val{false}; |
| int32_t i32_val{0}; |
| int64_t i64_val{0}; |
| float f32_val{0.0F}; |
| double f64_val{0.0}; |
| std::string string_val; |
| std::vector<uint8_t> bytes_val; |
| int32_t decimal_precision{0}; // Decimal: precision (total digits) |
| int32_t decimal_scale{0}; // Decimal: scale (digits after decimal point) |
| int64_t i128_hi{0}; // Decimal (i128): high 64 bits of unscaled value |
| int64_t i128_lo{0}; // Decimal (i128): low 64 bits of unscaled value |
| |
| static Datum Null() { return {}; } |
| static Datum Bool(bool v) { |
| Datum d; |
| d.type = DatumType::Bool; |
| d.bool_val = v; |
| return d; |
| } |
| static Datum Int32(int32_t v) { |
| Datum d; |
| d.type = DatumType::Int32; |
| d.i32_val = v; |
| return d; |
| } |
| static Datum Int64(int64_t v) { |
| Datum d; |
| d.type = DatumType::Int64; |
| d.i64_val = v; |
| return d; |
| } |
| static Datum Float32(float v) { |
| Datum d; |
| d.type = DatumType::Float32; |
| d.f32_val = v; |
| return d; |
| } |
| static Datum Float64(double v) { |
| Datum d; |
| d.type = DatumType::Float64; |
| d.f64_val = v; |
| return d; |
| } |
| static Datum String(std::string v) { |
| Datum d; |
| d.type = DatumType::String; |
| d.string_val = std::move(v); |
| return d; |
| } |
| static Datum Bytes(std::vector<uint8_t> v) { |
| Datum d; |
| d.type = DatumType::Bytes; |
| d.bytes_val = std::move(v); |
| return d; |
| } |
| static Datum Date(fluss::Date d) { |
| Datum dat; |
| dat.type = DatumType::Date; |
| dat.i32_val = d.days_since_epoch; |
| return dat; |
| } |
| static Datum Time(fluss::Time t) { |
| Datum dat; |
| dat.type = DatumType::Time; |
| dat.i32_val = t.millis_since_midnight; |
| return dat; |
| } |
| static Datum TimestampNtz(fluss::Timestamp ts) { |
| Datum dat; |
| dat.type = DatumType::TimestampNtz; |
| dat.i64_val = ts.epoch_millis; |
| dat.i32_val = ts.nano_of_millisecond; |
| return dat; |
| } |
| static Datum TimestampLtz(fluss::Timestamp ts) { |
| Datum dat; |
| dat.type = DatumType::TimestampLtz; |
| dat.i64_val = ts.epoch_millis; |
| dat.i32_val = ts.nano_of_millisecond; |
| return dat; |
| } |
| // Stores the decimal string as-is. Rust side will parse via BigDecimal, |
| // look up (p,s) from the schema, validate, and create the Decimal. |
| static Datum DecimalString(std::string str) { |
| Datum d; |
| d.type = DatumType::DecimalString; |
| d.string_val = std::move(str); |
| return d; |
| } |
| |
| fluss::Date GetDate() const { return {i32_val}; } |
| fluss::Time GetTime() const { return {i32_val}; } |
| fluss::Timestamp GetTimestamp() const { return {i64_val, i32_val}; } |
| |
| bool IsDecimal() const { |
| return type == DatumType::DecimalI64 || type == DatumType::DecimalI128 || |
| type == DatumType::DecimalString; |
| } |
| |
| std::string DecimalToString() const { |
| if (type == DatumType::DecimalI64) { |
| return FormatUnscaled64(i64_val, decimal_scale); |
| } else if (type == DatumType::DecimalI128) { |
| unsigned __int128 uval = |
| (static_cast<unsigned __int128>(static_cast<uint64_t>(i128_hi)) << 64) | |
| static_cast<unsigned __int128>(static_cast<uint64_t>(i128_lo)); |
| __int128 val = static_cast<__int128>(uval); |
| return FormatUnscaled128(val, decimal_scale); |
| } else if (type == DatumType::DecimalString) { |
| return string_val; |
| } |
| return ""; |
| } |
| |
| private: |
| static std::string FormatUnscaled64(int64_t unscaled, int32_t scale) { |
| bool negative = unscaled < 0; |
| uint64_t abs_val = |
| negative ? -static_cast<uint64_t>(unscaled) : static_cast<uint64_t>(unscaled); |
| std::string digits = std::to_string(abs_val); |
| if (scale <= 0) { |
| return (negative ? "-" : "") + digits; |
| } |
| while (static_cast<int32_t>(digits.size()) <= scale) { |
| digits = "0" + digits; |
| } |
| auto pos = digits.size() - static_cast<size_t>(scale); |
| return (negative ? "-" : "") + digits.substr(0, pos) + "." + digits.substr(pos); |
| } |
| |
| static std::string FormatUnscaled128(__int128 val, int32_t scale) { |
| bool negative = val < 0; |
| unsigned __int128 abs_val = |
| negative ? -static_cast<unsigned __int128>(val) : static_cast<unsigned __int128>(val); |
| std::string digits; |
| if (abs_val == 0) { |
| digits = "0"; |
| } else { |
| while (abs_val > 0) { |
| digits = static_cast<char>('0' + static_cast<int>(abs_val % 10)) + digits; |
| abs_val /= 10; |
| } |
| } |
| if (scale <= 0) { |
| return (negative ? "-" : "") + digits; |
| } |
| while (static_cast<int32_t>(digits.size()) <= scale) { |
| digits = "0" + digits; |
| } |
| auto pos = digits.size() - static_cast<size_t>(scale); |
| return (negative ? "-" : "") + digits.substr(0, pos) + "." + digits.substr(pos); |
| } |
| }; |
| |
| struct GenericRow { |
| std::vector<Datum> fields; |
| |
| void SetNull(size_t idx) { |
| EnsureSize(idx); |
| fields[idx] = Datum::Null(); |
| } |
| |
| void SetBool(size_t idx, bool v) { |
| EnsureSize(idx); |
| fields[idx] = Datum::Bool(v); |
| } |
| |
| void SetInt32(size_t idx, int32_t v) { |
| EnsureSize(idx); |
| fields[idx] = Datum::Int32(v); |
| } |
| |
| void SetInt64(size_t idx, int64_t v) { |
| EnsureSize(idx); |
| fields[idx] = Datum::Int64(v); |
| } |
| |
| void SetFloat32(size_t idx, float v) { |
| EnsureSize(idx); |
| fields[idx] = Datum::Float32(v); |
| } |
| |
| void SetFloat64(size_t idx, double v) { |
| EnsureSize(idx); |
| fields[idx] = Datum::Float64(v); |
| } |
| |
| void SetString(size_t idx, std::string v) { |
| EnsureSize(idx); |
| fields[idx] = Datum::String(std::move(v)); |
| } |
| |
| void SetBytes(size_t idx, std::vector<uint8_t> v) { |
| EnsureSize(idx); |
| fields[idx] = Datum::Bytes(std::move(v)); |
| } |
| |
| void SetDate(size_t idx, fluss::Date d) { |
| EnsureSize(idx); |
| fields[idx] = Datum::Date(d); |
| } |
| |
| void SetTime(size_t idx, fluss::Time t) { |
| EnsureSize(idx); |
| fields[idx] = Datum::Time(t); |
| } |
| |
| void SetTimestampNtz(size_t idx, fluss::Timestamp ts) { |
| EnsureSize(idx); |
| fields[idx] = Datum::TimestampNtz(ts); |
| } |
| |
| void SetTimestampLtz(size_t idx, fluss::Timestamp ts) { |
| EnsureSize(idx); |
| fields[idx] = Datum::TimestampLtz(ts); |
| } |
| |
| void SetDecimal(size_t idx, const std::string& value) { |
| EnsureSize(idx); |
| fields[idx] = Datum::DecimalString(value); |
| } |
| |
| private: |
| void EnsureSize(size_t idx) { |
| if (fields.size() <= idx) { |
| fields.resize(idx + 1); |
| } |
| } |
| }; |
| |
| struct ScanRecord { |
| int32_t bucket_id; |
| int64_t offset; |
| int64_t timestamp; |
| GenericRow row; |
| }; |
| |
| struct ScanRecords { |
| std::vector<ScanRecord> records; |
| |
| size_t Size() const { return records.size(); } |
| bool Empty() const { return records.empty(); } |
| const ScanRecord& operator[](size_t idx) const { return records[idx]; } |
| |
| auto begin() const { return records.begin(); } |
| auto end() const { return records.end(); } |
| }; |
| |
| class ArrowRecordBatch { |
| public: |
| std::shared_ptr<arrow::RecordBatch> GetArrowRecordBatch() const { return batch_; } |
| |
| bool Available() const; |
| |
| // Get number of rows in the batch |
| int64_t NumRows() const; |
| |
| // Get ScanBatch metadata |
| int64_t GetTableId() const; |
| int64_t GetPartitionId() const; |
| int32_t GetBucketId() const; |
| int64_t GetBaseOffset() const; |
| int64_t GetLastOffset() const; |
| |
| private: |
| friend class LogScanner; |
| explicit ArrowRecordBatch(std::shared_ptr<arrow::RecordBatch> batch, int64_t table_id, |
| int64_t partition_id, int32_t bucket_id, |
| int64_t base_offset) noexcept; |
| |
| std::shared_ptr<arrow::RecordBatch> batch_{nullptr}; |
| |
| int64_t table_id_; |
| int64_t partition_id_; |
| int32_t bucket_id_; |
| int64_t base_offset_; |
| }; |
| |
| struct ArrowRecordBatches { |
| std::vector<std::unique_ptr<ArrowRecordBatch>> batches; |
| |
| size_t Size() const { return batches.size(); } |
| bool Empty() const { return batches.empty(); } |
| const std::unique_ptr<ArrowRecordBatch>& operator[](size_t idx) const { return batches[idx]; } |
| |
| auto begin() const { return batches.begin(); } |
| auto end() const { return batches.end(); } |
| }; |
| |
| struct BucketOffset { |
| int64_t table_id; |
| int64_t partition_id; |
| int32_t bucket_id; |
| int64_t offset; |
| }; |
| |
| struct BucketSubscription { |
| int32_t bucket_id; |
| int64_t offset; |
| }; |
| |
| struct PartitionBucketSubscription { |
| int64_t partition_id; |
| int32_t bucket_id; |
| int64_t offset; |
| }; |
| |
| struct LakeSnapshot { |
| int64_t snapshot_id; |
| std::vector<BucketOffset> bucket_offsets; |
| }; |
| |
| struct PartitionInfo { |
| int64_t partition_id; |
| std::string partition_name; |
| }; |
| |
| class AppendWriter; |
| class WriteResult; |
| class LogScanner; |
| class Admin; |
| class Table; |
| class TableScan; |
| |
| class Connection { |
| public: |
| Connection() noexcept; |
| ~Connection() noexcept; |
| |
| Connection(const Connection&) = delete; |
| Connection& operator=(const Connection&) = delete; |
| Connection(Connection&& other) noexcept; |
| Connection& operator=(Connection&& other) noexcept; |
| |
| static Result Connect(const std::string& bootstrap_server, Connection& out); |
| |
| bool Available() const; |
| |
| Result GetAdmin(Admin& out); |
| Result GetTable(const TablePath& table_path, Table& out); |
| |
| private: |
| void Destroy() noexcept; |
| ffi::Connection* conn_{nullptr}; |
| }; |
| |
| class Admin { |
| public: |
| Admin() noexcept; |
| ~Admin() noexcept; |
| |
| Admin(const Admin&) = delete; |
| Admin& operator=(const Admin&) = delete; |
| Admin(Admin&& other) noexcept; |
| Admin& operator=(Admin&& other) noexcept; |
| |
| bool Available() const; |
| |
| Result CreateTable(const TablePath& table_path, const TableDescriptor& descriptor, |
| bool ignore_if_exists = false); |
| |
| Result DropTable(const TablePath& table_path, bool ignore_if_not_exists = false); |
| |
| Result GetTable(const TablePath& table_path, TableInfo& out); |
| |
| Result GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot& out); |
| |
| Result ListOffsets(const TablePath& table_path, const std::vector<int32_t>& bucket_ids, |
| const OffsetQuery& offset_query, std::unordered_map<int32_t, int64_t>& out); |
| |
| Result ListPartitionOffsets(const TablePath& table_path, const std::string& partition_name, |
| const std::vector<int32_t>& bucket_ids, |
| const OffsetQuery& offset_query, |
| std::unordered_map<int32_t, int64_t>& out); |
| |
| Result ListPartitionInfos(const TablePath& table_path, std::vector<PartitionInfo>& out); |
| |
| Result CreatePartition(const TablePath& table_path, |
| const std::unordered_map<std::string, std::string>& partition_spec, |
| bool ignore_if_exists = false); |
| |
| private: |
| Result DoListOffsets(const TablePath& table_path, const std::vector<int32_t>& bucket_ids, |
| const OffsetQuery& offset_query, std::unordered_map<int32_t, int64_t>& out, |
| const std::string* partition_name = nullptr); |
| |
| friend class Connection; |
| Admin(ffi::Admin* admin) noexcept; |
| |
| void Destroy() noexcept; |
| ffi::Admin* admin_{nullptr}; |
| }; |
| |
| class Table { |
| public: |
| Table() noexcept; |
| ~Table() noexcept; |
| |
| Table(const Table&) = delete; |
| Table& operator=(const Table&) = delete; |
| Table(Table&& other) noexcept; |
| Table& operator=(Table&& other) noexcept; |
| |
| bool Available() const; |
| |
| Result NewAppendWriter(AppendWriter& out); |
| TableScan NewScan(); |
| |
| TableInfo GetTableInfo() const; |
| TablePath GetTablePath() const; |
| bool HasPrimaryKey() const; |
| |
| private: |
| friend class Connection; |
| friend class TableScan; |
| Table(ffi::Table* table) noexcept; |
| |
| void Destroy() noexcept; |
| ffi::Table* table_{nullptr}; |
| }; |
| |
| class TableScan { |
| public: |
| TableScan(const TableScan&) = delete; |
| TableScan& operator=(const TableScan&) = delete; |
| TableScan(TableScan&&) noexcept = default; |
| TableScan& operator=(TableScan&&) noexcept = default; |
| |
| TableScan& Project(std::vector<size_t> column_indices); |
| |
| Result CreateLogScanner(LogScanner& out); |
| Result CreateRecordBatchScanner(LogScanner& out); |
| |
| private: |
| friend class Table; |
| explicit TableScan(ffi::Table* table) noexcept; |
| |
| ffi::Table* table_{nullptr}; |
| std::vector<size_t> projection_; |
| }; |
| |
| class WriteResult { |
| public: |
| WriteResult() noexcept; |
| ~WriteResult() noexcept; |
| |
| WriteResult(const WriteResult&) = delete; |
| WriteResult& operator=(const WriteResult&) = delete; |
| WriteResult(WriteResult&& other) noexcept; |
| WriteResult& operator=(WriteResult&& other) noexcept; |
| |
| bool Available() const; |
| |
| /// Wait for server acknowledgment of the write. |
| /// For fire-and-forget, simply let the WriteResult go out of scope. |
| Result Wait(); |
| |
| private: |
| friend class AppendWriter; |
| WriteResult(ffi::WriteResult* inner) noexcept; |
| |
| void Destroy() noexcept; |
| ffi::WriteResult* inner_{nullptr}; |
| }; |
| |
| class AppendWriter { |
| public: |
| AppendWriter() noexcept; |
| ~AppendWriter() noexcept; |
| |
| AppendWriter(const AppendWriter&) = delete; |
| AppendWriter& operator=(const AppendWriter&) = delete; |
| AppendWriter(AppendWriter&& other) noexcept; |
| AppendWriter& operator=(AppendWriter&& other) noexcept; |
| |
| bool Available() const; |
| |
| Result Append(const GenericRow& row); |
| Result Append(const GenericRow& row, WriteResult& out); |
| Result Flush(); |
| |
| private: |
| friend class Table; |
| AppendWriter(ffi::AppendWriter* writer) noexcept; |
| |
| void Destroy() noexcept; |
| ffi::AppendWriter* writer_{nullptr}; |
| }; |
| |
| class LogScanner { |
| public: |
| LogScanner() noexcept; |
| ~LogScanner() noexcept; |
| |
| LogScanner(const LogScanner&) = delete; |
| LogScanner& operator=(const LogScanner&) = delete; |
| LogScanner(LogScanner&& other) noexcept; |
| LogScanner& operator=(LogScanner&& other) noexcept; |
| |
| bool Available() const; |
| |
| Result Subscribe(int32_t bucket_id, int64_t start_offset); |
| Result Subscribe(const std::vector<BucketSubscription>& bucket_offsets); |
| Result SubscribePartitionBuckets(int64_t partition_id, int32_t bucket_id, int64_t start_offset); |
| Result SubscribePartitionBuckets(const std::vector<PartitionBucketSubscription>& subscriptions); |
| Result UnsubscribePartition(int64_t partition_id, int32_t bucket_id); |
| Result Poll(int64_t timeout_ms, ScanRecords& out); |
| Result PollRecordBatch(int64_t timeout_ms, ArrowRecordBatches& out); |
| |
| private: |
| friend class Table; |
| friend class TableScan; |
| LogScanner(ffi::LogScanner* scanner) noexcept; |
| |
| void Destroy() noexcept; |
| ffi::LogScanner* scanner_{nullptr}; |
| }; |
| |
| } // namespace fluss |