// blob: 79561a127ff8976db765bca2976a525a3f3f2d63
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
#include <chrono>
#include <cstdint>
#include <limits>
#include <memory>
#include <optional>
#include <stdexcept>
#include <string>
#include <string_view>
#include <unordered_map>
#include <utility>
#include <vector>
// Arrow types are only forward-declared here so that including this header
// does not pull in the heavyweight Arrow headers.
namespace arrow {
class RecordBatch;
}  // namespace arrow
namespace fluss {
// Forward declarations of the FFI-side types. This header only ever names
// them (e.g. through pointers), so their definitions are not needed here.
namespace ffi {
// Connection-level and table-level types.
struct Connection;
struct Admin;
struct Table;
// Write path.
struct AppendWriter;
struct WriteResult;
// Read path.
struct LogScanner;
struct UpsertWriter;
struct Lookuper;
// Inner payload types referenced by the wrapper classes in this header.
struct ScanResultInner;
struct GenericRowInner;
struct LookupResultInner;
}  // namespace ffi
/// Symbolic names for the error codes surfaced through the Fluss API.
///
/// Server API errors have error_code > 0 or == -1.
/// Client-side errors have error_code == CLIENT_ERROR (-2).
/// The values mirror the Rust core FlussError enum and are stable across protocol versions.
/// Because error_code travels as a raw int (not a closed enum), server codes that are newer
/// than this list still work — these names are a convenience, not an exhaustive catalogue.
struct ErrorCode {
    /// Client-side error (not from server API protocol). Check error_message for details.
    static constexpr int CLIENT_ERROR = -2;
    /// No error.
    static constexpr int NONE = 0;
    /// The server experienced an unexpected error when processing the request.
    static constexpr int UNKNOWN_SERVER_ERROR = -1;
    /// The server disconnected before a response was received.
    static constexpr int NETWORK_EXCEPTION = 1;
    /// The version of API is not supported.
    static constexpr int UNSUPPORTED_VERSION = 2;
    /// This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt.
    static constexpr int CORRUPT_MESSAGE = 3;
    /// The database does not exist.
    static constexpr int DATABASE_NOT_EXIST = 4;
    /// The database is not empty.
    static constexpr int DATABASE_NOT_EMPTY = 5;
    /// The database already exists.
    static constexpr int DATABASE_ALREADY_EXIST = 6;
    /// The table does not exist.
    static constexpr int TABLE_NOT_EXIST = 7;
    /// The table already exists.
    static constexpr int TABLE_ALREADY_EXIST = 8;
    /// The schema does not exist.
    static constexpr int SCHEMA_NOT_EXIST = 9;
    /// Exception occurred while storing data for log in server.
    static constexpr int LOG_STORAGE_EXCEPTION = 10;
    /// Exception occurred while storing data for kv in server.
    static constexpr int KV_STORAGE_EXCEPTION = 11;
    /// Not leader or follower.
    static constexpr int NOT_LEADER_OR_FOLLOWER = 12;
    /// The record is too large.
    static constexpr int RECORD_TOO_LARGE_EXCEPTION = 13;
    /// The record is corrupt.
    static constexpr int CORRUPT_RECORD_EXCEPTION = 14;
    /// The client has attempted to perform an operation on an invalid table.
    static constexpr int INVALID_TABLE_EXCEPTION = 15;
    /// The client has attempted to perform an operation on an invalid database.
    static constexpr int INVALID_DATABASE_EXCEPTION = 16;
    /// The replication factor is larger than the number of available tablet servers.
    static constexpr int INVALID_REPLICATION_FACTOR = 17;
    /// Produce request specified an invalid value for required acks.
    static constexpr int INVALID_REQUIRED_ACKS = 18;
    /// The log offset is out of range.
    static constexpr int LOG_OFFSET_OUT_OF_RANGE_EXCEPTION = 19;
    /// The table is not a primary key table.
    static constexpr int NON_PRIMARY_KEY_TABLE_EXCEPTION = 20;
    /// The table or bucket does not exist.
    static constexpr int UNKNOWN_TABLE_OR_BUCKET_EXCEPTION = 21;
    /// The update version is invalid.
    static constexpr int INVALID_UPDATE_VERSION_EXCEPTION = 22;
    /// The coordinator is invalid.
    static constexpr int INVALID_COORDINATOR_EXCEPTION = 23;
    /// The leader epoch is invalid.
    static constexpr int FENCED_LEADER_EPOCH_EXCEPTION = 24;
    /// The request timed out.
    static constexpr int REQUEST_TIME_OUT = 25;
    /// The general storage exception.
    static constexpr int STORAGE_EXCEPTION = 26;
    /// The server did not attempt to execute this operation.
    static constexpr int OPERATION_NOT_ATTEMPTED_EXCEPTION = 27;
    /// Records are written to the server already, but to fewer in-sync replicas than required.
    static constexpr int NOT_ENOUGH_REPLICAS_AFTER_APPEND_EXCEPTION = 28;
    /// Messages are rejected since there are fewer in-sync replicas than required.
    static constexpr int NOT_ENOUGH_REPLICAS_EXCEPTION = 29;
    /// Get file access security token exception.
    static constexpr int SECURITY_TOKEN_EXCEPTION = 30;
    /// The tablet server received an out of order sequence batch.
    static constexpr int OUT_OF_ORDER_SEQUENCE_EXCEPTION = 31;
    /// The tablet server received a duplicate sequence batch.
    static constexpr int DUPLICATE_SEQUENCE_EXCEPTION = 32;
    /// The tablet server could not locate the writer metadata.
    static constexpr int UNKNOWN_WRITER_ID_EXCEPTION = 33;
    /// The requested column projection is invalid.
    static constexpr int INVALID_COLUMN_PROJECTION = 34;
    /// The requested target column to write is invalid.
    static constexpr int INVALID_TARGET_COLUMN = 35;
    /// The partition does not exist.
    static constexpr int PARTITION_NOT_EXISTS = 36;
    /// The table is not partitioned.
    static constexpr int TABLE_NOT_PARTITIONED_EXCEPTION = 37;
    /// The timestamp is invalid.
    static constexpr int INVALID_TIMESTAMP_EXCEPTION = 38;
    /// The config is invalid.
    static constexpr int INVALID_CONFIG_EXCEPTION = 39;
    /// The lake storage is not configured.
    static constexpr int LAKE_STORAGE_NOT_CONFIGURED_EXCEPTION = 40;
    /// The kv snapshot does not exist.
    static constexpr int KV_SNAPSHOT_NOT_EXIST = 41;
    /// The partition already exists.
    static constexpr int PARTITION_ALREADY_EXISTS = 42;
    /// The partition spec is invalid.
    static constexpr int PARTITION_SPEC_INVALID_EXCEPTION = 43;
    /// There is no currently available leader for the given partition.
    static constexpr int LEADER_NOT_AVAILABLE_EXCEPTION = 44;
    /// Exceed the maximum number of partitions.
    static constexpr int PARTITION_MAX_NUM_EXCEPTION = 45;
    /// Authentication failed.
    static constexpr int AUTHENTICATE_EXCEPTION = 46;
    /// Security is disabled.
    static constexpr int SECURITY_DISABLED_EXCEPTION = 47;
    /// Authorization failed.
    static constexpr int AUTHORIZATION_EXCEPTION = 48;
    /// Exceed the maximum number of buckets.
    static constexpr int BUCKET_MAX_NUM_EXCEPTION = 49;
    /// The tiering epoch is invalid.
    static constexpr int FENCED_TIERING_EPOCH_EXCEPTION = 50;
    /// Authentication failed with retriable exception.
    static constexpr int RETRIABLE_AUTHENTICATE_EXCEPTION = 51;
    /// The server rack info is invalid.
    static constexpr int INVALID_SERVER_RACK_INFO_EXCEPTION = 52;
    /// The lake snapshot does not exist.
    static constexpr int LAKE_SNAPSHOT_NOT_EXIST = 53;
    /// The lake table already exists.
    static constexpr int LAKE_TABLE_ALREADY_EXIST = 54;
    /// The new ISR contains at least one ineligible replica.
    static constexpr int INELIGIBLE_REPLICA_EXCEPTION = 55;
    /// The alter table is invalid.
    static constexpr int INVALID_ALTER_TABLE_EXCEPTION = 56;
    /// Deletion operations are disabled on this table.
    static constexpr int DELETION_DISABLED_EXCEPTION = 57;

    /// Returns true if retrying the request may succeed. Mirrors Java's RetriableException
    /// hierarchy. Every code that is not listed below (including CLIENT_ERROR and NONE)
    /// yields false.
    static constexpr bool IsRetriable(int32_t code) {
        switch (code) {
            case NETWORK_EXCEPTION:
            case CORRUPT_MESSAGE:
            case SCHEMA_NOT_EXIST:
            case LOG_STORAGE_EXCEPTION:
            case KV_STORAGE_EXCEPTION:
            case NOT_LEADER_OR_FOLLOWER:
            case CORRUPT_RECORD_EXCEPTION:
            case UNKNOWN_TABLE_OR_BUCKET_EXCEPTION:
            case REQUEST_TIME_OUT:
            case STORAGE_EXCEPTION:
            case NOT_ENOUGH_REPLICAS_AFTER_APPEND_EXCEPTION:
            case NOT_ENOUGH_REPLICAS_EXCEPTION:
            case LEADER_NOT_AVAILABLE_EXCEPTION:
                return true;
            default:
                return false;
        }
    }
};
/// Calendar date stored as a single day counter (`days_since_epoch`).
struct Date {
    int32_t days_since_epoch{0};

    /// Wraps a raw day count; no validation or conversion is applied.
    static Date FromDays(int32_t days) {
        Date result;
        result.days_since_epoch = days;
        return result;
    }
    /// Builds a Date from year/month/day components (defined out of line).
    static Date FromYMD(int year, int month, int day);

    // Calendar component accessors (defined out of line).
    int Year() const;
    int Month() const;
    int Day() const;
};
/// Time of day stored as a millisecond counter (`millis_since_midnight`).
struct Time {
    static constexpr int32_t kMillisPerSecond = 1000;
    static constexpr int32_t kMillisPerMinute = 60 * kMillisPerSecond;
    static constexpr int32_t kMillisPerHour = 60 * kMillisPerMinute;

    int32_t millis_since_midnight{0};

    /// Wraps a raw millisecond count; the value is stored unchanged.
    static Time FromMillis(int32_t ms) {
        Time result;
        result.millis_since_midnight = ms;
        return result;
    }

    /// Combines hour/minute/second/millis into one millisecond count.
    /// The components are not range-checked.
    static Time FromHMS(int hour, int minute, int second, int millis = 0) {
        const int from_hours = hour * kMillisPerHour;
        const int from_minutes = minute * kMillisPerMinute;
        const int from_seconds = second * kMillisPerSecond;
        Time result;
        result.millis_since_midnight = from_hours + from_minutes + from_seconds + millis;
        return result;
    }

    /// Whole hours contained in the stored value.
    int Hour() const { return millis_since_midnight / kMillisPerHour; }

    /// Whole minutes remaining after the hours are removed.
    int Minute() const {
        const int32_t within_hour = millis_since_midnight % kMillisPerHour;
        return within_hour / kMillisPerMinute;
    }

    /// Whole seconds remaining after the minutes are removed.
    int Second() const {
        const int32_t within_minute = millis_since_midnight % kMillisPerMinute;
        return within_minute / kMillisPerSecond;
    }

    /// Milliseconds remaining after the seconds are removed.
    int Millis() const { return millis_since_midnight % kMillisPerSecond; }
};
/// Point in time stored as epoch milliseconds plus a sub-millisecond
/// nanosecond part (`nano_of_millisecond`).
struct Timestamp {
    static constexpr int32_t kMaxNanoOfMillisecond = 999999;
    static constexpr int64_t kNanosPerMilli = 1000000;

    int64_t epoch_millis{0};
    int32_t nano_of_millisecond{0};

    /// Millisecond-precision timestamp; the nanosecond part is zero.
    static Timestamp FromMillis(int64_t ms) {
        Timestamp result;
        result.epoch_millis = ms;
        return result;
    }

    /// Builds a timestamp from milliseconds and a nanosecond part.
    /// `nanos` is clamped into [0, kMaxNanoOfMillisecond] instead of being rejected.
    static Timestamp FromMillisNanos(int64_t ms, int32_t nanos) {
        const int32_t clamped =
            nanos < 0 ? 0 : (nanos > kMaxNanoOfMillisecond ? kMaxNanoOfMillisecond : nanos);
        return {ms, clamped};
    }

    /// Converts a system_clock time point at nanosecond resolution.
    /// For instants before the epoch the division is floored, so the
    /// nanosecond part is always non-negative.
    static Timestamp FromTimePoint(std::chrono::system_clock::time_point tp) {
        const auto total_ns =
            std::chrono::duration_cast<std::chrono::nanoseconds>(tp.time_since_epoch()).count();
        auto whole_ms = total_ns / kNanosPerMilli;
        auto remainder = total_ns % kNanosPerMilli;
        if (remainder < 0) {
            // C++ division truncates toward zero; shift to floor semantics.
            remainder += kNanosPerMilli;
            whole_ms -= 1;
        }
        return {whole_ms, static_cast<int32_t>(remainder)};
    }
};
/// Kind of change attached to a scan record (see ScanRecord::change_type).
/// The numeric values are spelled out explicitly and must stay as they are.
enum class ChangeType {
    AppendOnly = 0,
    Insert = 1,
    UpdateBefore = 2,
    UpdateAfter = 3,
    Delete = 4,
};
/// Identifier of a column data type. The numeric values are spelled out
/// explicitly and must stay as they are.
enum class TypeId {
    Unknown = 0,
    // Fixed-width primitives.
    Boolean = 1,
    TinyInt = 2,
    SmallInt = 3,
    Int = 4,
    BigInt = 5,
    Float = 6,
    Double = 7,
    // Variable-length values.
    String = 8,
    Bytes = 9,
    // Temporal values.
    Date = 10,
    Time = 11,
    Timestamp = 12,
    TimestampLtz = 13,
    // Parameterised types (see DataType for precision/scale/length).
    Decimal = 14,
    Char = 15,
    Binary = 16,
};
class DataType {
public:
explicit DataType(TypeId id, int32_t p = 0, int32_t s = 0)
: id_(id), precision_(p), scale_(s) {}
static DataType Boolean() { return DataType(TypeId::Boolean); }
static DataType TinyInt() { return DataType(TypeId::TinyInt); }
static DataType SmallInt() { return DataType(TypeId::SmallInt); }
static DataType Int() { return DataType(TypeId::Int); }
static DataType BigInt() { return DataType(TypeId::BigInt); }
static DataType Float() { return DataType(TypeId::Float); }
static DataType Double() { return DataType(TypeId::Double); }
static DataType String() { return DataType(TypeId::String); }
static DataType Bytes() { return DataType(TypeId::Bytes); }
static DataType Date() { return DataType(TypeId::Date); }
static DataType Time() { return DataType(TypeId::Time); }
static DataType Timestamp(int32_t precision = 6) {
return DataType(TypeId::Timestamp, precision, 0);
}
static DataType TimestampLtz(int32_t precision = 6) {
return DataType(TypeId::TimestampLtz, precision, 0);
}
static DataType Decimal(int32_t precision, int32_t scale) {
return DataType(TypeId::Decimal, precision, scale);
}
static DataType Char(int32_t length) { return DataType(TypeId::Char, length, 0); }
static DataType Binary(int32_t length) { return DataType(TypeId::Binary, length, 0); }
TypeId id() const { return id_; }
int32_t precision() const { return precision_; }
int32_t scale() const { return scale_; }
private:
TypeId id_;
int32_t precision_{0};
int32_t scale_{0};
};
/// Sentinel offset value (-2).
/// NOTE(review): presumably passed as a subscription offset to request the
/// earliest available record — confirm against the LogScanner API.
///
/// Declared `inline` so every translation unit that includes this header
/// shares one definition instead of getting its own internal-linkage copy.
inline constexpr int64_t EARLIEST_OFFSET = -2;
/// Discriminator for OffsetSpec: which kind of offset is being described.
enum class OffsetType {
    Earliest = 0,
    Latest = 1,
    Timestamp = 2,
};
struct OffsetSpec {
OffsetType type;
int64_t timestamp{0};
static OffsetSpec Earliest() { return {OffsetType::Earliest, 0}; }
static OffsetSpec Latest() { return {OffsetType::Latest, 0}; }
static OffsetSpec Timestamp(int64_t ts) { return {OffsetType::Timestamp, ts}; }
};
/// Outcome of an API call: an error code plus a human-readable message.
/// A default-constructed Result has error_code 0, i.e. success.
struct Result {
    int32_t error_code{0};
    std::string error_message;

    /// True when no error was reported (error_code is 0).
    bool Ok() const { return !(error_code != 0); }

    /// Returns true if retrying the request may succeed. Client-side errors always return false.
    bool IsRetriable() const {
        const bool retriable = ErrorCode::IsRetriable(error_code);
        return retriable;
    }
};
/// Fully qualified table name: database name plus table name.
struct TablePath {
    std::string database_name;
    std::string table_name;

    TablePath() = default;
    TablePath(std::string db, std::string tbl)
        : database_name(std::move(db)), table_name(std::move(tbl)) {}

    /// Renders the path as "<database_name>.<table_name>".
    std::string ToString() const {
        std::string qualified;
        qualified.reserve(database_name.size() + 1 + table_name.size());
        qualified += database_name;
        qualified += ".";
        qualified += table_name;
        return qualified;
    }
};
/// One column of a Schema: its name, data type and free-form comment.
struct Column {
    std::string name;
    DataType data_type;
    std::string comment;
};
/// Table schema: the ordered column list plus the primary-key column names.
struct Schema {
    std::vector<Column> columns;
    std::vector<std::string> primary_keys;

    /// Fluent builder for Schema. Build() moves the accumulated state out,
    /// so a builder should not be reused after calling it.
    class Builder {
    public:
        /// Appends a column; columns keep the order in which they are added.
        Builder& AddColumn(std::string name, DataType type, std::string comment = "") {
            columns_.push_back(Column{std::move(name), std::move(type), std::move(comment)});
            return *this;
        }

        /// Replaces the primary-key list with `keys`.
        Builder& SetPrimaryKeys(std::vector<std::string> keys) {
            primary_keys_ = std::move(keys);
            return *this;
        }

        /// Produces the Schema from the collected columns and keys.
        Schema Build() {
            return Schema{std::move(columns_), std::move(primary_keys_)};
        }

    private:
        std::vector<Column> columns_;
        std::vector<std::string> primary_keys_;
    };

    static Builder NewBuilder() { return Builder{}; }
};
/// Everything needed to describe a table: schema, partition/bucket layout,
/// properties and comment.
struct TableDescriptor {
    Schema schema;
    std::vector<std::string> partition_keys;
    int32_t bucket_count{0};
    std::vector<std::string> bucket_keys;
    std::unordered_map<std::string, std::string> properties;
    std::unordered_map<std::string, std::string> custom_properties;
    std::string comment;

    /// Fluent builder for TableDescriptor. Build() moves the accumulated
    /// state out, so a builder should not be reused after calling it.
    class Builder {
    public:
        Builder& SetSchema(Schema s) {
            schema_ = std::move(s);
            return *this;
        }

        Builder& SetPartitionKeys(std::vector<std::string> keys) {
            partition_keys_ = std::move(keys);
            return *this;
        }

        Builder& SetBucketCount(int32_t count) {
            bucket_count_ = count;
            return *this;
        }

        Builder& SetBucketKeys(std::vector<std::string> keys) {
            bucket_keys_ = std::move(keys);
            return *this;
        }

        /// Sets (or overwrites) one entry of `properties`.
        Builder& SetProperty(std::string key, std::string value) {
            properties_.insert_or_assign(std::move(key), std::move(value));
            return *this;
        }

        /// Sets (or overwrites) one entry of `custom_properties`.
        Builder& SetCustomProperty(std::string key, std::string value) {
            custom_properties_.insert_or_assign(std::move(key), std::move(value));
            return *this;
        }

        /// Shorthand for SetProperty("table.log.format", format).
        Builder& SetLogFormat(std::string format) {
            SetProperty("table.log.format", std::move(format));
            return *this;
        }

        /// Shorthand for SetProperty("table.kv.format", format).
        Builder& SetKvFormat(std::string format) {
            SetProperty("table.kv.format", std::move(format));
            return *this;
        }

        Builder& SetComment(std::string comment) {
            comment_ = std::move(comment);
            return *this;
        }

        /// Produces the descriptor from everything collected so far.
        TableDescriptor Build() {
            return TableDescriptor{
                std::move(schema_),     std::move(partition_keys_),    bucket_count_,
                std::move(bucket_keys_), std::move(properties_),       std::move(custom_properties_),
                std::move(comment_)};
        }

    private:
        Schema schema_;
        std::vector<std::string> partition_keys_;
        int32_t bucket_count_{0};
        std::vector<std::string> bucket_keys_;
        std::unordered_map<std::string, std::string> properties_;
        std::unordered_map<std::string, std::string> custom_properties_;
        std::string comment_;
    };

    static Builder NewBuilder() { return Builder{}; }
};
/// Metadata describing an existing table.
///
/// All scalar members carry zero/false defaults so that a default-constructed
/// TableInfo never exposes indeterminate values (consistent with DatabaseInfo
/// and Result in this header). The struct remains an aggregate.
struct TableInfo {
    int64_t table_id{0};
    int32_t schema_id{0};
    TablePath table_path;
    int64_t created_time{0};
    int64_t modified_time{0};
    std::vector<std::string> primary_keys;
    std::vector<std::string> bucket_keys;
    std::vector<std::string> partition_keys;
    int32_t num_buckets{0};
    bool has_primary_key{false};
    bool is_partitioned{false};
    std::unordered_map<std::string, std::string> properties;
    std::unordered_map<std::string, std::string> custom_properties;
    std::string comment;
    Schema schema;
};
namespace detail {
/// Position and type of one column, as stored in a ColumnMap.
struct ColumnInfo {
    size_t index;
    TypeId type_id;
};

/// Lookup table from column name to its ColumnInfo.
using ColumnMap = std::unordered_map<std::string, ColumnInfo>;
/// Looks up `name` in `map` and returns the column's index.
/// @throws std::runtime_error if the map has no entry for `name`.
inline size_t ResolveColumn(const ColumnMap& map, const std::string& name) {
    const auto entry = map.find(name);
    if (entry != map.end()) {
        return entry->second.index;
    }
    throw std::runtime_error("Unknown column '" + name + "'");
}
/// CRTP mixin that layers name-based getters on top of index-based ones.
///
/// Derived must provide `size_t Resolve(const std::string&) const` plus every
/// index-based getter used below (IsNull(idx), GetBool(idx), ...). Each
/// name-based getter resolves the name to an index and forwards to the
/// index-based overload of the same name.
template <typename Derived>
struct NamedGetters {
    bool IsNull(const std::string& n) const {
        const Derived& row = Self();
        return row.IsNull(row.Resolve(n));
    }
    bool GetBool(const std::string& n) const {
        const Derived& row = Self();
        return row.GetBool(row.Resolve(n));
    }
    int32_t GetInt32(const std::string& n) const {
        const Derived& row = Self();
        return row.GetInt32(row.Resolve(n));
    }
    int64_t GetInt64(const std::string& n) const {
        const Derived& row = Self();
        return row.GetInt64(row.Resolve(n));
    }
    float GetFloat32(const std::string& n) const {
        const Derived& row = Self();
        return row.GetFloat32(row.Resolve(n));
    }
    double GetFloat64(const std::string& n) const {
        const Derived& row = Self();
        return row.GetFloat64(row.Resolve(n));
    }
    std::string_view GetString(const std::string& n) const {
        const Derived& row = Self();
        return row.GetString(row.Resolve(n));
    }
    std::pair<const uint8_t*, size_t> GetBytes(const std::string& n) const {
        const Derived& row = Self();
        return row.GetBytes(row.Resolve(n));
    }
    fluss::Date GetDate(const std::string& n) const {
        const Derived& row = Self();
        return row.GetDate(row.Resolve(n));
    }
    fluss::Time GetTime(const std::string& n) const {
        const Derived& row = Self();
        return row.GetTime(row.Resolve(n));
    }
    fluss::Timestamp GetTimestamp(const std::string& n) const {
        const Derived& row = Self();
        return row.GetTimestamp(row.Resolve(n));
    }
    std::string GetDecimalString(const std::string& n) const {
        const Derived& row = Self();
        return row.GetDecimalString(row.Resolve(n));
    }

private:
    /// Downcast to the concrete row type (the CRTP "self" reference).
    const Derived& Self() const { return static_cast<const Derived&>(*this); }
};
/// Pairs a raw FFI scan result pointer with the column map used for
/// name-based access. Non-copyable; the destructor is defined out of line.
/// NOTE(review): presumably the destructor releases `raw` — confirm in the
/// implementation file.
struct ScanData {
    ffi::ScanResultInner* raw;
    ColumnMap columns;

    ScanData(ffi::ScanResultInner* r, ColumnMap cols) : raw(r), columns(std::move(cols)) {}
    ~ScanData();

    ScanData(const ScanData&) = delete;
    ScanData& operator=(const ScanData&) = delete;
};
} // namespace detail
/// Mutable, move-only row whose fields are filled through typed setters.
///
/// Index-based setters are always available. Name-based `Set()` overloads
/// need a column map and throw std::runtime_error when the row has none
/// (use Table::NewRow() to obtain a schema-aware row) or when the column
/// name is unknown.
class GenericRow {
public:
    GenericRow();
    explicit GenericRow(size_t field_count);
    ~GenericRow() noexcept;
    GenericRow(const GenericRow&) = delete;
    GenericRow& operator=(const GenericRow&) = delete;
    GenericRow(GenericRow&& other) noexcept;
    GenericRow& operator=(GenericRow&& other) noexcept;

    bool Available() const;
    void Reset();

    // ── Index-based setters ──────────────────────────────────────────
    void SetNull(size_t idx);
    void SetBool(size_t idx, bool v);
    void SetInt32(size_t idx, int32_t v);
    void SetInt64(size_t idx, int64_t v);
    void SetFloat32(size_t idx, float v);
    void SetFloat64(size_t idx, double v);
    void SetString(size_t idx, std::string v);
    void SetBytes(size_t idx, std::vector<uint8_t> v);
    void SetDate(size_t idx, fluss::Date d);
    void SetTime(size_t idx, fluss::Time t);
    void SetTimestampNtz(size_t idx, fluss::Timestamp ts);
    void SetTimestampLtz(size_t idx, fluss::Timestamp ts);
    void SetDecimal(size_t idx, const std::string& value);

    // ── Name-based setters (require schema — see Table::NewRow()) ───
    void Set(const std::string& name, std::nullptr_t) { SetNull(Resolve(name)); }
    void Set(const std::string& name, bool v) { SetBool(Resolve(name), v); }
    void Set(const std::string& name, int32_t v) { SetInt32(Resolve(name), v); }
    void Set(const std::string& name, int64_t v) { SetInt64(Resolve(name), v); }
    void Set(const std::string& name, float v) { SetFloat32(Resolve(name), v); }
    void Set(const std::string& name, double v) { SetFloat64(Resolve(name), v); }

    /// Sets a String or Decimal column from a C string.
    ///
    /// This overload exists so that a string literal does not silently
    /// convert to `bool` and pick the wrong overload.
    /// @throws std::runtime_error if the column is neither String nor
    ///         Decimal, or if `v` is a null pointer (constructing a
    ///         std::string from a null pointer is undefined behavior, so it
    ///         is rejected explicitly; pass `nullptr` itself to set NULL).
    void Set(const std::string& name, const char* v) {
        const auto& column = ResolveColumn(name);
        const bool is_decimal = column.type_id == TypeId::Decimal;
        const bool is_string = column.type_id == TypeId::String;
        if (!is_decimal && !is_string) {
            throw std::runtime_error("GenericRow::Set: column '" + name +
                                     "' is not a string or decimal column");
        }
        if (v == nullptr) {
            throw std::runtime_error("GenericRow::Set: null C string passed for column '" +
                                     name + "'");
        }
        if (is_decimal) {
            SetDecimal(column.index, v);
        } else {
            SetString(column.index, v);
        }
    }

    /// Sets a String or Decimal column from a std::string.
    /// @throws std::runtime_error if the column is neither String nor Decimal.
    void Set(const std::string& name, std::string v) {
        const auto& column = ResolveColumn(name);
        if (column.type_id == TypeId::Decimal) {
            SetDecimal(column.index, v);
        } else if (column.type_id == TypeId::String) {
            SetString(column.index, std::move(v));
        } else {
            throw std::runtime_error("GenericRow::Set: column '" + name +
                                     "' is not a string or decimal column");
        }
    }

    void Set(const std::string& name, std::vector<uint8_t> v) {
        SetBytes(Resolve(name), std::move(v));
    }
    void Set(const std::string& name, fluss::Date d) { SetDate(Resolve(name), d); }
    void Set(const std::string& name, fluss::Time t) { SetTime(Resolve(name), t); }

    /// Sets a timestamp column, choosing the LTZ or NTZ setter from the
    /// column's declared type.
    /// @throws std::runtime_error if the column is not a timestamp column.
    void Set(const std::string& name, fluss::Timestamp ts) {
        const auto& column = ResolveColumn(name);
        if (column.type_id == TypeId::TimestampLtz) {
            SetTimestampLtz(column.index, ts);
        } else if (column.type_id == TypeId::Timestamp) {
            SetTimestampNtz(column.index, ts);
        } else {
            throw std::runtime_error("GenericRow::Set: column '" + name +
                                     "' is not a timestamp column");
        }
    }

private:
    friend class Table;
    friend class AppendWriter;
    friend class UpsertWriter;
    friend class Lookuper;

    using ColumnInfo = detail::ColumnInfo;
    using ColumnMap = detail::ColumnMap;

    /// Resolves a column name to its index (see ResolveColumn for errors).
    size_t Resolve(const std::string& name) const { return ResolveColumn(name).index; }

    /// Resolves a column name to its ColumnInfo.
    /// @throws std::runtime_error if the row has no column map or the name
    ///         is not present in it.
    const ColumnInfo& ResolveColumn(const std::string& name) const {
        if (!column_map_) {
            throw std::runtime_error(
                "GenericRow: name-based Set() requires a schema. "
                "Use Table::NewRow() to create a schema-aware row.");
        }
        auto it = column_map_->find(name);
        if (it == column_map_->end()) {
            throw std::runtime_error("GenericRow: unknown column '" + name + "'");
        }
        return it->second;
    }

    void Destroy() noexcept;

    ffi::GenericRowInner* inner_{nullptr};
    std::shared_ptr<ColumnMap> column_map_;
};
/// Read-only view of one row inside a scan result. String and bytes getters
/// return views into the underlying data rather than copies.
///
/// RowView shares ownership of the underlying scan data via reference counting,
/// so it can safely outlive the ScanRecords that produced it.
class RowView : public detail::NamedGetters<RowView> {
    friend struct detail::NamedGetters<RowView>;

public:
    RowView(std::shared_ptr<const detail::ScanData> data, size_t bucket_idx, size_t rec_idx)
        : data_(std::move(data)), bucket_idx_(bucket_idx), rec_idx_(rec_idx) {}

    // ── Index-based getters ──────────────────────────────────────────
    size_t FieldCount() const;
    TypeId GetType(size_t idx) const;
    bool IsNull(size_t idx) const;
    bool GetBool(size_t idx) const;
    int32_t GetInt32(size_t idx) const;
    int64_t GetInt64(size_t idx) const;
    float GetFloat32(size_t idx) const;
    double GetFloat64(size_t idx) const;
    std::string_view GetString(size_t idx) const;
    std::pair<const uint8_t*, size_t> GetBytes(size_t idx) const;
    fluss::Date GetDate(size_t idx) const;
    fluss::Time GetTime(size_t idx) const;
    fluss::Timestamp GetTimestamp(size_t idx) const;
    bool IsDecimal(size_t idx) const;
    std::string GetDecimalString(size_t idx) const;

    // ── Name-based getters ───────────────────────────────────────────
    // Re-exported from the mixin so they overload with (rather than get
    // hidden by) the index-based getters declared above.
    using detail::NamedGetters<RowView>::IsNull;
    using detail::NamedGetters<RowView>::GetBool;
    using detail::NamedGetters<RowView>::GetInt32;
    using detail::NamedGetters<RowView>::GetInt64;
    using detail::NamedGetters<RowView>::GetFloat32;
    using detail::NamedGetters<RowView>::GetFloat64;
    using detail::NamedGetters<RowView>::GetString;
    using detail::NamedGetters<RowView>::GetBytes;
    using detail::NamedGetters<RowView>::GetDate;
    using detail::NamedGetters<RowView>::GetTime;
    using detail::NamedGetters<RowView>::GetTimestamp;
    using detail::NamedGetters<RowView>::GetDecimalString;

private:
    /// Maps a column name to its index using the scan data's column map.
    /// @throws std::runtime_error if the view has no scan data or the name
    ///         is unknown.
    size_t Resolve(const std::string& name) const {
        if (data_) {
            return detail::ResolveColumn(data_->columns, name);
        }
        throw std::runtime_error("RowView: name-based access not available");
    }

    std::shared_ptr<const detail::ScanData> data_;
    size_t bucket_idx_;
    size_t rec_idx_;
};
/// Identifies a specific bucket, optionally within a partition.
///
/// The ids default to 0 and `partition_id` to "no partition", so a
/// default-constructed TableBucket has a well-defined value and can be
/// compared safely. The struct remains an aggregate.
struct TableBucket {
    int64_t table_id{0};
    int32_t bucket_id{0};
    std::optional<int64_t> partition_id;

    /// Two buckets are equal when table id, bucket id and partition id
    /// (including its presence) all match.
    bool operator==(const TableBucket& other) const {
        return table_id == other.table_id && bucket_id == other.bucket_id &&
               partition_id == other.partition_id;
    }
    bool operator!=(const TableBucket& other) const { return !(*this == other); }
};
/// One record produced by a scan: its metadata plus a RowView for reading
/// the field values.
///
/// ScanRecord is a value type that can be freely copied, stored, and
/// accumulated across multiple Poll() calls.
struct ScanRecord {
    int64_t offset;
    int64_t timestamp;
    ChangeType change_type;
    RowView row;
};
/// The scan records that belong to one bucket.
///
/// BucketRecords is a value type — it shares ownership of the underlying scan data
/// via reference counting, so it can safely outlive the ScanRecords that produced it.
class BucketRecords {
public:
    BucketRecords(std::shared_ptr<const detail::ScanData> data, TableBucket bucket,
                  size_t bucket_idx, size_t count)
        : data_(std::move(data)),
          bucket_(std::move(bucket)),
          bucket_idx_(bucket_idx),
          count_(count) {}

    /// The bucket these records belong to.
    const TableBucket& Bucket() const { return bucket_; }

    /// Number of records in this bucket.
    size_t Size() const { return count_; }

    /// True when this bucket holds no records.
    bool Empty() const { return Size() == 0; }

    /// Access a record by its position within this bucket (0-based).
    ScanRecord operator[](size_t idx) const;

    /// Minimal forward iterator, sufficient for range-based for loops.
    /// It keeps a pointer to the owning BucketRecords, so it must not
    /// outlive that object.
    class Iterator {
    public:
        ScanRecord operator*() const;

        Iterator& operator++() {
            idx_ += 1;
            return *this;
        }

        /// Compares positions only; both iterators are expected to come
        /// from the same BucketRecords.
        bool operator!=(const Iterator& other) const { return !(idx_ == other.idx_); }

    private:
        friend class BucketRecords;
        Iterator(const BucketRecords* owner, size_t idx) : owner_(owner), idx_(idx) {}

        const BucketRecords* owner_;
        size_t idx_;
    };

    Iterator begin() const { return Iterator{this, 0}; }
    Iterator end() const { return Iterator{this, count_}; }

private:
    std::shared_ptr<const detail::ScanData> data_;
    TableBucket bucket_;
    size_t bucket_idx_;
    size_t count_;
};
/// Move-only container holding the records of one scan result, grouped by
/// bucket. Supports per-bucket access and flat iteration over all records.
class ScanRecords {
public:
    ScanRecords() noexcept = default;
    ~ScanRecords() noexcept = default;

    ScanRecords(const ScanRecords&) = delete;
    ScanRecords& operator=(const ScanRecords&) = delete;
    ScanRecords(ScanRecords&&) noexcept = default;
    ScanRecords& operator=(ScanRecords&&) noexcept = default;

    /// Total number of records across all buckets.
    size_t Count() const;
    bool IsEmpty() const;

    /// Number of distinct buckets with records.
    size_t BucketCount() const;

    /// List of distinct buckets that have records.
    std::vector<TableBucket> Buckets() const;

    /// Get records for a specific bucket.
    ///
    /// Returns an empty BucketRecords if the bucket is not present (matches Rust/Java).
    /// Note: O(B) linear scan. For iteration over all buckets, prefer BucketAt(idx).
    BucketRecords Records(const TableBucket& bucket) const;

    /// Get records by bucket index (0-based). O(1).
    ///
    /// Throws std::out_of_range if idx >= BucketCount().
    BucketRecords BucketAt(size_t idx) const;

    /// Flat iterator over all records across all buckets (matches Java Iterable<ScanRecord>).
    /// It keeps a pointer to the owning ScanRecords, so it must not outlive
    /// that object.
    class Iterator {
    public:
        ScanRecord operator*() const;
        Iterator& operator++();

        /// Iterators differ when either the bucket position or the record
        /// position within the bucket differs.
        bool operator!=(const Iterator& other) const {
            const bool same_position =
                bucket_idx_ == other.bucket_idx_ && rec_idx_ == other.rec_idx_;
            return !same_position;
        }

    private:
        friend class ScanRecords;
        Iterator(const ScanRecords* owner, size_t bucket_idx, size_t rec_idx)
            : owner_(owner), bucket_idx_(bucket_idx), rec_idx_(rec_idx) {}

        const ScanRecords* owner_;
        size_t bucket_idx_;
        size_t rec_idx_;
    };

    Iterator begin() const;
    /// The end position is (BucketCount(), 0).
    Iterator end() const { return Iterator{this, BucketCount(), 0}; }

private:
    friend class LogScanner;

    ScanRecord RecordAt(size_t bucket, size_t rec_idx) const;

    std::shared_ptr<const detail::ScanData> data_;
};
/// An Arrow record batch together with the scan metadata (table, partition,
/// bucket and base offset) it was produced with. Instances are created by
/// LogScanner only; the constructor is private.
class ArrowRecordBatch {
public:
    /// Returns the wrapped Arrow batch (shared ownership).
    std::shared_ptr<arrow::RecordBatch> GetArrowRecordBatch() const { return batch_; }

    bool Available() const;

    // Get number of rows in the batch
    int64_t NumRows() const;

    // Get ScanBatch metadata
    int64_t GetTableId() const;
    int64_t GetPartitionId() const;
    int32_t GetBucketId() const;
    int64_t GetBaseOffset() const;
    int64_t GetLastOffset() const;

private:
    friend class LogScanner;

    explicit ArrowRecordBatch(std::shared_ptr<arrow::RecordBatch> batch, int64_t table_id,
                              int64_t partition_id, int32_t bucket_id,
                              int64_t base_offset) noexcept;

    std::shared_ptr<arrow::RecordBatch> batch_{nullptr};
    int64_t table_id_;
    int64_t partition_id_;
    int32_t bucket_id_;
    int64_t base_offset_;
};
/// Owning list of ArrowRecordBatch objects with container-style helpers.
struct ArrowRecordBatches {
    std::vector<std::unique_ptr<ArrowRecordBatch>> batches;

    /// Number of batches held.
    size_t Size() const { return batches.size(); }

    /// True when no batch is held.
    bool Empty() const { return batches.empty(); }

    /// Unchecked access by position; `idx` must be < Size().
    const std::unique_ptr<ArrowRecordBatch>& operator[](size_t idx) const {
        return batches[idx];
    }

    // Iteration support for range-based for loops.
    auto begin() const { return batches.begin(); }
    auto end() const { return batches.end(); }
};
/// Offset of one bucket, identified by table, partition and bucket id.
///
/// Members default to 0 purely so that a default-constructed value is
/// deterministic; 0 carries no special meaning here. The struct remains an
/// aggregate, so `BucketOffset{t, p, b, o}` keeps working.
struct BucketOffset {
    int64_t table_id{0};
    int64_t partition_id{0};
    int32_t bucket_id{0};
    int64_t offset{0};
};
/// A bucket id paired with an offset.
///
/// Members default to 0 so that a default-constructed value is
/// deterministic. The struct remains an aggregate.
struct BucketSubscription {
    int32_t bucket_id{0};
    int64_t offset{0};
};
/// A (partition id, bucket id) pair together with an offset.
///
/// Members default to 0 so that a default-constructed value is
/// deterministic. The struct remains an aggregate.
struct PartitionBucketSubscription {
    int64_t partition_id{0};
    int32_t bucket_id{0};
    int64_t offset{0};
};
/// A lake snapshot id together with per-bucket offsets.
///
/// `snapshot_id` defaults to 0 so that a default-constructed value is
/// deterministic. The struct remains an aggregate.
struct LakeSnapshot {
    int64_t snapshot_id{0};
    std::vector<BucketOffset> bucket_offsets;
};
/// Id and name of one partition.
///
/// `partition_id` defaults to 0 so that a default-constructed value is
/// deterministic. The struct remains an aggregate.
struct PartitionInfo {
    int64_t partition_id{0};
    std::string partition_name;
};
/// Description of one server node: id, network address, type and uid.
///
/// `id` and `port` default to 0 so that a default-constructed value is
/// deterministic. The struct remains an aggregate.
struct ServerNode {
    int32_t id{0};
    std::string host;
    uint32_t port{0};
    std::string server_type;
    std::string uid;
};
/// Optional descriptor passed when creating a database.
/// Leaving `comment` and `properties` empty requests the defaults.
struct DatabaseDescriptor {
    std::string comment;
    std::unordered_map<std::string, std::string> properties;
};
/// Database metadata as returned by GetDatabaseInfo.
/// The two time fields default to 0.
struct DatabaseInfo {
    std::string database_name;
    std::string comment;
    std::unordered_map<std::string, std::string> properties;
    int64_t created_time{0};
    int64_t modified_time{0};
};
/// Read-only, move-only result of a lookup operation. Field values are read
/// through index-based getters or, via the NamedGetters mixin, by column name.
class LookupResult : public detail::NamedGetters<LookupResult> {
    friend struct detail::NamedGetters<LookupResult>;

public:
    LookupResult() noexcept;
    ~LookupResult() noexcept;
    LookupResult(const LookupResult&) = delete;
    LookupResult& operator=(const LookupResult&) = delete;
    LookupResult(LookupResult&& other) noexcept;
    LookupResult& operator=(LookupResult&& other) noexcept;

    bool Found() const;
    size_t FieldCount() const;

    // ── Index-based getters ──────────────────────────────────────────
    TypeId GetType(size_t idx) const;
    bool IsNull(size_t idx) const;
    bool GetBool(size_t idx) const;
    int32_t GetInt32(size_t idx) const;
    int64_t GetInt64(size_t idx) const;
    float GetFloat32(size_t idx) const;
    double GetFloat64(size_t idx) const;
    std::string_view GetString(size_t idx) const;
    std::pair<const uint8_t*, size_t> GetBytes(size_t idx) const;
    fluss::Date GetDate(size_t idx) const;
    fluss::Time GetTime(size_t idx) const;
    fluss::Timestamp GetTimestamp(size_t idx) const;
    bool IsDecimal(size_t idx) const;
    std::string GetDecimalString(size_t idx) const;

    // ── Name-based getters ───────────────────────────────────────────
    // Re-exported from the mixin so they overload with (rather than get
    // hidden by) the index-based getters declared above.
    using detail::NamedGetters<LookupResult>::IsNull;
    using detail::NamedGetters<LookupResult>::GetBool;
    using detail::NamedGetters<LookupResult>::GetInt32;
    using detail::NamedGetters<LookupResult>::GetInt64;
    using detail::NamedGetters<LookupResult>::GetFloat32;
    using detail::NamedGetters<LookupResult>::GetFloat64;
    using detail::NamedGetters<LookupResult>::GetString;
    using detail::NamedGetters<LookupResult>::GetBytes;
    using detail::NamedGetters<LookupResult>::GetDate;
    using detail::NamedGetters<LookupResult>::GetTime;
    using detail::NamedGetters<LookupResult>::GetTimestamp;
    using detail::NamedGetters<LookupResult>::GetDecimalString;

private:
    friend class Lookuper;

    /// Maps a column name to its index. The column map is built on first
    /// use (hence the `mutable` member) and reused afterwards.
    size_t Resolve(const std::string& name) const {
        if (column_map_ == nullptr) {
            BuildColumnMap();
        }
        const detail::ColumnMap& columns = *column_map_;
        return detail::ResolveColumn(columns, name);
    }

    void Destroy() noexcept;
    void BuildColumnMap() const;

    ffi::LookupResultInner* inner_{nullptr};
    mutable std::shared_ptr<detail::ColumnMap> column_map_;
};
// Forward declarations of the client classes used further down in this header.
// Writers, lookups and their results.
class AppendWriter;
class UpsertWriter;
class Lookuper;
class WriteResult;
// Scanning and administration.
class LogScanner;
class Admin;
// Table handle and its per-operation builders.
class Table;
class TableAppend;
class TableUpsert;
class TableLookup;
class TableScan;
/// Client configuration. Every field has a default, so a default-constructed
/// Configuration is usable as-is; override only what differs.
struct Configuration {
    // ── Connection ───────────────────────────────────────────────────
    // Coordinator server address
    std::string bootstrap_servers{"127.0.0.1:9123"};

    // ── Writer ───────────────────────────────────────────────────────
    // Max request size in bytes (10 MB)
    int32_t writer_request_max_size{10 * 1024 * 1024};
    // Writer acknowledgment mode: "all", "0", "1", or "-1"
    std::string writer_acks{"all"};
    // Max number of writer retries
    int32_t writer_retries{std::numeric_limits<int32_t>::max()};
    // Writer batch size in bytes (2 MB)
    int32_t writer_batch_size{2 * 1024 * 1024};
    // Bucket assigner for tables without bucket keys: "sticky" or "round_robin"
    std::string writer_bucket_no_key_assigner{"sticky"};

    // ── Scanner / remote log ─────────────────────────────────────────
    // Number of remote log batches to prefetch during scanning
    size_t scanner_remote_log_prefetch_num{4};
    // Number of threads for downloading remote log data
    size_t remote_file_download_thread_num{3};
    // Remote log read concurrency within one file (streaming read path)
    size_t scanner_remote_log_read_concurrency{4};
    // Maximum number of records returned in a single call to Poll() for LogScanner
    size_t scanner_log_max_poll_records{500};
    // Maximum bytes per fetch response for LogScanner (16 MB)
    int32_t scanner_log_fetch_max_bytes{16 * 1024 * 1024};
    // Minimum bytes to accumulate before server returns a fetch response
    int32_t scanner_log_fetch_min_bytes{1};
    // Maximum time (ms) the server may wait to satisfy min bytes
    int32_t scanner_log_fetch_wait_max_time_ms{500};
    // Maximum bytes per fetch response per bucket for LogScanner (1 MB)
    int32_t scanner_log_fetch_max_bytes_for_bucket{1024 * 1024};

    // Writer batch timeout in milliseconds.
    // NOTE(review): presumably the longest a batch may wait before being
    // sent — confirm against the writer implementation.
    int64_t writer_batch_timeout_ms{100};

    // ── Transport / security ─────────────────────────────────────────
    // Connect timeout in milliseconds for TCP transport connect
    uint64_t connect_timeout_ms{120000};
    // Security protocol: "PLAINTEXT" (default, no auth) or "sasl" (SASL auth)
    std::string security_protocol{"PLAINTEXT"};
    // SASL mechanism (only "PLAIN" is supported)
    std::string security_sasl_mechanism{"PLAIN"};
    // SASL username (required when security_protocol is "sasl")
    std::string security_sasl_username;
    // SASL password (required when security_protocol is "sasl")
    std::string security_sasl_password;
};
// Handle wrapping a raw ffi::Connection pointer.
// Move-only: copy construction/assignment are deleted, moves are noexcept.
class Connection {
public:
Connection() noexcept;
~Connection() noexcept;
Connection(const Connection&) = delete;
Connection& operator=(const Connection&) = delete;
Connection(Connection&& other) noexcept;
Connection& operator=(Connection&& other) noexcept;
// Factory: builds a connection from `config`, delivers it through `out`, and
// reports the outcome in the returned Result.
static Result Create(const Configuration& config, Connection& out);
// NOTE(review): presumably true when conn_ refers to a live connection — confirm.
bool Available() const;
// Hands out an Admin through `out`; status is in the returned Result.
Result GetAdmin(Admin& out);
// Hands out the Table identified by `table_path` through `out`.
Result GetTable(const TablePath& table_path, Table& out);
private:
// NOTE(review): presumably releases conn_; defined out of line.
void Destroy() noexcept;
// Raw FFI handle; nullptr by default.
ffi::Connection* conn_{nullptr};
};
/// Administrative handle wrapping a raw ffi::Admin pointer.
///
/// Move-only (copies are deleted, moves are noexcept). Every operation reports
/// its status through the returned Result; operations that produce data write it
/// into a caller-supplied `out` reference. Instances are handed out via
/// Connection::GetAdmin (Connection is a friend; the pointer constructor is private).
class Admin {
public:
    Admin() noexcept;
    ~Admin() noexcept;
    Admin(const Admin&) = delete;
    Admin& operator=(const Admin&) = delete;
    Admin(Admin&& other) noexcept;
    Admin& operator=(Admin&& other) noexcept;

    /// NOTE(review): presumably true when admin_ refers to a live handle — confirm.
    bool Available() const;

    // ── Tables ───────────────────────────────────────────────────────
    Result CreateTable(const TablePath& table_path, const TableDescriptor& descriptor,
                       bool ignore_if_exists = false);
    Result DropTable(const TablePath& table_path, bool ignore_if_not_exists = false);
    Result GetTableInfo(const TablePath& table_path, TableInfo& out);
    Result GetLatestLakeSnapshot(const TablePath& table_path, LakeSnapshot& out);

    // ── Offsets ──────────────────────────────────────────────────────
    /// Fills `out` with an int64 value per int32 key for the requested `bucket_ids`.
    /// NOTE(review): presumably bucket id -> offset — confirm.
    Result ListOffsets(const TablePath& table_path, const std::vector<int32_t>& bucket_ids,
                       const OffsetSpec& offset_spec, std::unordered_map<int32_t, int64_t>& out);
    /// Same shape as ListOffsets, with an additional `partition_name`.
    Result ListPartitionOffsets(const TablePath& table_path, const std::string& partition_name,
                                const std::vector<int32_t>& bucket_ids,
                                const OffsetSpec& offset_spec,
                                std::unordered_map<int32_t, int64_t>& out);

    // ── Partitions ───────────────────────────────────────────────────
    Result ListPartitionInfos(const TablePath& table_path, std::vector<PartitionInfo>& out);
    /// Overload taking a `partition_spec` (string -> string map).
    Result ListPartitionInfos(const TablePath& table_path,
                              const std::unordered_map<std::string, std::string>& partition_spec,
                              std::vector<PartitionInfo>& out);
    Result CreatePartition(const TablePath& table_path,
                           const std::unordered_map<std::string, std::string>& partition_spec,
                           bool ignore_if_exists = false);
    Result DropPartition(const TablePath& table_path,
                         const std::unordered_map<std::string, std::string>& partition_spec,
                         bool ignore_if_not_exists = false);

    // ── Databases ────────────────────────────────────────────────────
    Result CreateDatabase(const std::string& database_name, const DatabaseDescriptor& descriptor,
                          bool ignore_if_exists = false);
    /// Note that `cascade` defaults to true.
    /// NOTE(review): presumably cascade also drops the database's tables — confirm.
    Result DropDatabase(const std::string& database_name, bool ignore_if_not_exists = false,
                        bool cascade = true);
    Result ListDatabases(std::vector<std::string>& out);
    Result DatabaseExists(const std::string& database_name, bool& out);
    Result GetDatabaseInfo(const std::string& database_name, DatabaseInfo& out);
    Result ListTables(const std::string& database_name, std::vector<std::string>& out);
    Result TableExists(const TablePath& table_path, bool& out);

    // ── Cluster ──────────────────────────────────────────────────────
    Result GetServerNodes(std::vector<ServerNode>& out);

private:
    /// Offset-listing helper with an optional partition name (nullptr by default).
    /// NOTE(review): presumably the shared backend of ListOffsets and
    /// ListPartitionOffsets — confirm in the .cpp.
    Result DoListOffsets(const TablePath& table_path, const std::vector<int32_t>& bucket_ids,
                         const OffsetSpec& offset_spec, std::unordered_map<int32_t, int64_t>& out,
                         const std::string* partition_name = nullptr);

    friend class Connection;

    /// Wraps an existing FFI handle. `explicit` so a raw ffi::Admin* can never
    /// convert implicitly into an Admin, matching the Table* builder constructors.
    explicit Admin(ffi::Admin* admin) noexcept;

    /// NOTE(review): presumably releases admin_; defined out of line.
    void Destroy() noexcept;

    /// Raw FFI handle; nullptr by default.
    ffi::Admin* admin_{nullptr};
};
/// Handle wrapping a raw ffi::Table pointer.
///
/// Move-only (copies are deleted, moves are noexcept). Serves as the entry point
/// for the TableAppend / TableUpsert / TableLookup / TableScan builders, all of
/// which are friends. Instances are handed out via Connection::GetTable.
class Table {
public:
    Table() noexcept;
    ~Table() noexcept;
    Table(const Table&) = delete;
    Table& operator=(const Table&) = delete;
    Table(Table&& other) noexcept;
    Table& operator=(Table&& other) noexcept;

    /// NOTE(review): presumably true when table_ refers to a live handle — confirm.
    bool Available() const;

    /// Returns a GenericRow by value.
    /// NOTE(review): presumably pre-wired with this table's column map — confirm.
    GenericRow NewRow() const;

    // Builder factories, each returned by value.
    TableAppend NewAppend();
    TableUpsert NewUpsert();
    TableLookup NewLookup();
    TableScan NewScan();

    // Metadata accessors, each returning by value.
    TableInfo GetTableInfo() const;
    TablePath GetTablePath() const;
    bool HasPrimaryKey() const;

private:
    friend class Connection;
    friend class TableAppend;
    friend class TableUpsert;
    friend class TableLookup;
    friend class TableScan;

    /// Wraps an existing FFI handle. `explicit` so a raw ffi::Table* can never
    /// convert implicitly into a Table, matching the builder constructors.
    explicit Table(ffi::Table* table) noexcept;

    /// NOTE(review): presumably releases table_; defined out of line.
    void Destroy() noexcept;

    /// Accessor for column_map_. Const, with a mutable member behind it.
    /// NOTE(review): presumably builds the map lazily on first call — confirm.
    const std::shared_ptr<GenericRow::ColumnMap>& GetColumnMap() const;

    /// Raw FFI handle; nullptr by default.
    ffi::Table* table_{nullptr};
    /// Shared column map; mutable so const members can populate it.
    mutable std::shared_ptr<GenericRow::ColumnMap> column_map_;
};
// Builder for append writers, created by Table (a friend; the constructor is private).
// Move-only with defaulted moves and no user-declared destructor, so the
// ffi::Table pointer it stores is never released by this class.
// NOTE(review): presumably must not outlive the Table it was created from — confirm.
class TableAppend {
public:
TableAppend(const TableAppend&) = delete;
TableAppend& operator=(const TableAppend&) = delete;
TableAppend(TableAppend&&) noexcept = default;
TableAppend& operator=(TableAppend&&) noexcept = default;
// Delivers an AppendWriter through `out`; status is in the returned Result.
Result CreateWriter(AppendWriter& out);
private:
friend class Table;
explicit TableAppend(ffi::Table* table) noexcept;
// Table pointer received at construction; nullptr by default.
ffi::Table* table_{nullptr};
};
// Builder for upsert writers, created by Table (a friend; the constructor is private).
// Move-only with defaulted moves and no user-declared destructor, so the
// ffi::Table pointer it stores is never released by this class.
// NOTE(review): presumably must not outlive the Table it was created from — confirm.
class TableUpsert {
public:
TableUpsert(const TableUpsert&) = delete;
TableUpsert& operator=(const TableUpsert&) = delete;
TableUpsert(TableUpsert&&) noexcept = default;
TableUpsert& operator=(TableUpsert&&) noexcept = default;
// Partial-update column selection, by index or by name. Both return *this's
// type by reference, so calls can be chained before CreateWriter().
// NOTE(review): interaction when both variants are called is not visible here.
TableUpsert& PartialUpdateByIndex(std::vector<size_t> column_indices);
TableUpsert& PartialUpdateByName(std::vector<std::string> column_names);
// Delivers an UpsertWriter through `out`; status is in the returned Result.
Result CreateWriter(UpsertWriter& out);
private:
friend class Table;
explicit TableUpsert(ffi::Table* table) noexcept;
// NOTE(review): presumably converts column_names_ into column indices — confirm.
std::vector<size_t> ResolveNameProjection() const;
// Table pointer received at construction; nullptr by default.
ffi::Table* table_{nullptr};
// Column selections; empty by default.
std::vector<size_t> column_indices_;
std::vector<std::string> column_names_;
};
// Builder for Lookuper instances, created by Table (a friend; the constructor is private).
// Move-only with defaulted moves and no user-declared destructor, so the
// ffi::Table pointer it stores is never released by this class.
// NOTE(review): presumably must not outlive the Table it was created from — confirm.
class TableLookup {
public:
TableLookup(const TableLookup&) = delete;
TableLookup& operator=(const TableLookup&) = delete;
TableLookup(TableLookup&&) noexcept = default;
TableLookup& operator=(TableLookup&&) noexcept = default;
// Delivers a Lookuper through `out`; status is in the returned Result.
Result CreateLookuper(Lookuper& out);
private:
friend class Table;
explicit TableLookup(ffi::Table* table) noexcept;
// Table pointer received at construction; nullptr by default.
ffi::Table* table_{nullptr};
};
// Builder for log scanners, created by Table (a friend; the constructor is private).
// Move-only with defaulted moves and no user-declared destructor, so the
// ffi::Table pointer it stores is never released by this class.
// NOTE(review): presumably must not outlive the Table it was created from — confirm.
class TableScan {
public:
TableScan(const TableScan&) = delete;
TableScan& operator=(const TableScan&) = delete;
TableScan(TableScan&&) noexcept = default;
TableScan& operator=(TableScan&&) noexcept = default;
// Column projection, by index or by name. Both return a TableScan&, so calls
// can be chained before creating a scanner.
// NOTE(review): interaction when both variants are called is not visible here.
TableScan& ProjectByIndex(std::vector<size_t> column_indices);
TableScan& ProjectByName(std::vector<std::string> column_names);
// Two scanner flavours, both delivered as a LogScanner through `out`.
Result CreateLogScanner(LogScanner& out);
Result CreateRecordBatchLogScanner(LogScanner& out);
private:
friend class Table;
explicit TableScan(ffi::Table* table) noexcept;
// NOTE(review): presumably converts name_projection_ into column indices — confirm.
std::vector<size_t> ResolveNameProjection() const;
// Scanner-creation helper parameterized by `is_record_batch`.
// NOTE(review): presumably shared by both public Create* methods — confirm.
Result DoCreateScanner(LogScanner& out, bool is_record_batch);
// Table pointer received at construction; nullptr by default.
ffi::Table* table_{nullptr};
// Projections; empty by default.
std::vector<size_t> projection_;
std::vector<std::string> name_projection_;
};
/// Handle for the outcome of a single write, wrapping a raw ffi::WriteResult pointer.
///
/// Move-only (copies are deleted, moves are noexcept). Populated by AppendWriter
/// and UpsertWriter, which are friends.
class WriteResult {
public:
    WriteResult() noexcept;
    ~WriteResult() noexcept;
    WriteResult(const WriteResult&) = delete;
    WriteResult& operator=(const WriteResult&) = delete;
    WriteResult(WriteResult&& other) noexcept;
    WriteResult& operator=(WriteResult&& other) noexcept;

    /// NOTE(review): presumably true when inner_ refers to a live handle — confirm.
    bool Available() const;

    /// Wait for server acknowledgment of the write.
    /// For fire-and-forget, simply let the WriteResult go out of scope.
    Result Wait();

private:
    friend class AppendWriter;
    friend class UpsertWriter;

    /// Wraps an existing FFI handle. `explicit` so a raw ffi::WriteResult* can
    /// never convert implicitly into a WriteResult.
    explicit WriteResult(ffi::WriteResult* inner) noexcept;

    /// NOTE(review): presumably releases inner_; defined out of line.
    void Destroy() noexcept;

    /// Raw FFI handle; nullptr by default.
    ffi::WriteResult* inner_{nullptr};
};
/// Writer handle for appending rows, wrapping a raw ffi::AppendWriter pointer.
///
/// Move-only (copies are deleted, moves are noexcept). Delivered through
/// TableAppend::CreateWriter; Table and TableAppend are friends.
class AppendWriter {
public:
    AppendWriter() noexcept;
    ~AppendWriter() noexcept;
    AppendWriter(const AppendWriter&) = delete;
    AppendWriter& operator=(const AppendWriter&) = delete;
    AppendWriter(AppendWriter&& other) noexcept;
    AppendWriter& operator=(AppendWriter&& other) noexcept;

    /// NOTE(review): presumably true when writer_ refers to a live handle — confirm.
    bool Available() const;

    /// Appends one row. The two-argument overload additionally hands back a
    /// WriteResult through `out`.
    Result Append(const GenericRow& row);
    Result Append(const GenericRow& row, WriteResult& out);

    /// Appends an Arrow record batch. The two-argument overload additionally
    /// hands back a WriteResult through `out`.
    Result AppendArrowBatch(const std::shared_ptr<arrow::RecordBatch>& batch);
    Result AppendArrowBatch(const std::shared_ptr<arrow::RecordBatch>& batch, WriteResult& out);

    /// NOTE(review): presumably blocks until buffered writes are sent — confirm.
    Result Flush();

private:
    friend class Table;
    friend class TableAppend;

    /// Wraps an existing FFI handle. `explicit` so a raw ffi::AppendWriter* can
    /// never convert implicitly into an AppendWriter.
    explicit AppendWriter(ffi::AppendWriter* writer) noexcept;

    /// NOTE(review): presumably releases writer_; defined out of line.
    void Destroy() noexcept;

    /// Raw FFI handle; nullptr by default.
    ffi::AppendWriter* writer_{nullptr};
};
/// Writer handle for upserts and deletes, wrapping a raw ffi::UpsertWriter pointer.
///
/// Move-only (copies are deleted, moves are noexcept). Delivered through
/// TableUpsert::CreateWriter; Table and TableUpsert are friends.
class UpsertWriter {
public:
    UpsertWriter() noexcept;
    ~UpsertWriter() noexcept;
    UpsertWriter(const UpsertWriter&) = delete;
    UpsertWriter& operator=(const UpsertWriter&) = delete;
    UpsertWriter(UpsertWriter&& other) noexcept;
    UpsertWriter& operator=(UpsertWriter&& other) noexcept;

    /// NOTE(review): presumably true when writer_ refers to a live handle — confirm.
    bool Available() const;

    /// Upserts one row. The two-argument overload additionally hands back a
    /// WriteResult through `out`.
    Result Upsert(const GenericRow& row);
    Result Upsert(const GenericRow& row, WriteResult& out);

    /// Deletes by row. The two-argument overload additionally hands back a
    /// WriteResult through `out`.
    /// NOTE(review): which fields of `row` must be set is not visible here.
    Result Delete(const GenericRow& row);
    Result Delete(const GenericRow& row, WriteResult& out);

    /// NOTE(review): presumably blocks until buffered writes are sent — confirm.
    Result Flush();

private:
    friend class Table;
    friend class TableUpsert;

    /// Wraps an existing FFI handle. `explicit` so a raw ffi::UpsertWriter* can
    /// never convert implicitly into an UpsertWriter.
    explicit UpsertWriter(ffi::UpsertWriter* writer) noexcept;

    /// NOTE(review): presumably releases writer_; defined out of line.
    void Destroy() noexcept;

    /// Raw FFI handle; nullptr by default.
    ffi::UpsertWriter* writer_{nullptr};
};
/// Lookup handle wrapping a raw ffi::Lookuper pointer.
///
/// Move-only (copies are deleted, moves are noexcept). Delivered through
/// TableLookup::CreateLookuper; Table and TableLookup are friends.
class Lookuper {
public:
    Lookuper() noexcept;
    ~Lookuper() noexcept;
    Lookuper(const Lookuper&) = delete;
    Lookuper& operator=(const Lookuper&) = delete;
    Lookuper(Lookuper&& other) noexcept;
    Lookuper& operator=(Lookuper&& other) noexcept;

    /// NOTE(review): presumably true when lookuper_ refers to a live handle — confirm.
    bool Available() const;

    /// Looks up using `pk_row` as the key row and writes the outcome into `out`;
    /// status is in the returned Result.
    Result Lookup(const GenericRow& pk_row, LookupResult& out);

private:
    friend class Table;
    friend class TableLookup;

    /// Wraps an existing FFI handle. `explicit` so a raw ffi::Lookuper* can
    /// never convert implicitly into a Lookuper.
    explicit Lookuper(ffi::Lookuper* lookuper) noexcept;

    /// NOTE(review): presumably releases lookuper_; defined out of line.
    void Destroy() noexcept;

    /// Raw FFI handle; nullptr by default.
    ffi::Lookuper* lookuper_{nullptr};
};
/// Log scanner handle wrapping a raw ffi::LogScanner pointer.
///
/// Move-only (copies are deleted, moves are noexcept). Delivered through
/// TableScan::CreateLogScanner / CreateRecordBatchLogScanner; Table and
/// TableScan are friends.
class LogScanner {
public:
    LogScanner() noexcept;
    ~LogScanner() noexcept;
    LogScanner(const LogScanner&) = delete;
    LogScanner& operator=(const LogScanner&) = delete;
    LogScanner(LogScanner&& other) noexcept;
    LogScanner& operator=(LogScanner&& other) noexcept;

    /// NOTE(review): presumably true when scanner_ refers to a live handle — confirm.
    bool Available() const;

    // ── Subscriptions ────────────────────────────────────────────────
    /// Subscribes one bucket from `start_offset`, or a batch of buckets at once.
    Result Subscribe(int32_t bucket_id, int64_t start_offset);
    Result Subscribe(const std::vector<BucketSubscription>& bucket_offsets);
    /// Partition-qualified variants of Subscribe.
    Result SubscribePartitionBuckets(int64_t partition_id, int32_t bucket_id, int64_t start_offset);
    Result SubscribePartitionBuckets(const std::vector<PartitionBucketSubscription>& subscriptions);
    Result Unsubscribe(int32_t bucket_id);
    Result UnsubscribePartition(int64_t partition_id, int32_t bucket_id);

    // ── Polling ──────────────────────────────────────────────────────
    /// Polls with a timeout given in milliseconds and writes records into `out`.
    Result Poll(int64_t timeout_ms, ScanRecords& out);
    /// Polls with a timeout given in milliseconds and writes Arrow batches into `out`.
    /// NOTE(review): presumably only valid on a scanner created through
    /// CreateRecordBatchLogScanner — confirm.
    Result PollRecordBatch(int64_t timeout_ms, ArrowRecordBatches& out);

private:
    friend class Table;
    friend class TableScan;

    /// Wraps an existing FFI handle. `explicit` so a raw ffi::LogScanner* can
    /// never convert implicitly into a LogScanner.
    explicit LogScanner(ffi::LogScanner* scanner) noexcept;

    /// NOTE(review): presumably releases scanner_; defined out of line.
    void Destroy() noexcept;

    /// Raw FFI handle; nullptr by default.
    ffi::LogScanner* scanner_{nullptr};
};
} // namespace fluss