blob: fd3c980ba1b3f86f4a41b6b0384633d3b00fdcbe [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use crate::*;
use pyo3::types::PyDict;
/// Configuration for Fluss client
///
/// Python-visible class (`#[pyclass]`) that wraps the core
/// `fcore::config::Config` value. The type is `Clone`, so copying the wrapper
/// clones the wrapped configuration.
#[pyclass]
#[derive(Clone)]
pub struct Config {
// Wrapped core configuration; the Python-facing accessors of this class read
// and write fields of this value.
inner: fcore::config::Config,
}
#[pymethods]
impl Config {
    /// Create a new Config with optional properties from a dictionary.
    ///
    /// Construction starts from `fcore::config::Config::default()` and then
    /// applies every entry of `properties` in the dictionary's iteration order.
    /// Both the key and the value of each entry must be extractable as a Rust
    /// `String`; numeric options are therefore passed as text and parsed here.
    ///
    /// Recognized keys and the type their value is parsed into:
    ///
    /// - `bootstrap.servers` (string)
    /// - `connect-timeout` (`u64`, stored in `connect_timeout_ms`)
    /// - `writer.request-max-size` (`i32`)
    /// - `writer.acks` (string)
    /// - `writer.retries` (`i32`)
    /// - `writer.batch-size` (`i32`)
    /// - `writer.batch-timeout-ms` (`i64`)
    /// - `writer.bucket.no-key-assigner` (`fcore::config::NoKeyAssigner`)
    /// - `scanner.remote-log.prefetch-num` (`usize`)
    /// - `scanner.remote-log.read-concurrency` (`usize`)
    /// - `scanner.log.max-poll-records` (`usize`)
    /// - `scanner.log.fetch.max-bytes` (`i32`)
    /// - `scanner.log.fetch.min-bytes` (`i32`)
    /// - `scanner.log.fetch.wait-max-time-ms` (`i32`)
    /// - `scanner.log.fetch.max-bytes-for-bucket` (`i32`)
    /// - `remote-file.download-thread-num` (`usize`)
    /// - `security.protocol` (string)
    /// - `security.sasl.mechanism` (string)
    /// - `security.sasl.username` (string)
    /// - `security.sasl.password` (string)
    ///
    /// # Errors
    ///
    /// - Propagates the extraction error when a key or value cannot be
    ///   converted to a `String`.
    /// - Raises `FlussError` when a value fails to parse into the type listed
    ///   above, or when the key is not one of the recognized names.
    #[new]
    #[pyo3(signature = (properties = None))]
    fn new(properties: Option<&Bound<'_, PyDict>>) -> PyResult<Self> {
        let mut config = fcore::config::Config::default();
        if let Some(props) = properties {
            for (raw_key, raw_value) in props.iter() {
                // Extract the key before the value so that a malformed key is
                // reported first.
                let key: String = raw_key.extract()?;
                let value: String = raw_value.extract()?;
                apply_property(&mut config, &key, value)?;
            }
        }
        Ok(Self { inner: config })
    }

    // ---------------------------------------------------------------- connection

    /// Get the bootstrap servers
    #[getter]
    fn bootstrap_servers(&self) -> String {
        self.inner.bootstrap_servers.clone()
    }

    /// Set the bootstrap servers
    #[setter]
    fn set_bootstrap_servers(&mut self, server: String) {
        self.inner.bootstrap_servers = server;
    }

    /// Get the connect timeout in milliseconds
    #[getter]
    fn connect_timeout_ms(&self) -> u64 {
        self.inner.connect_timeout_ms
    }

    /// Set the connect timeout in milliseconds
    #[setter]
    fn set_connect_timeout_ms(&mut self, timeout: u64) {
        self.inner.connect_timeout_ms = timeout;
    }

    // -------------------------------------------------------------------- writer

    /// Get the writer request max size
    #[getter]
    fn writer_request_max_size(&self) -> i32 {
        self.inner.writer_request_max_size
    }

    /// Set the writer request max size
    #[setter]
    fn set_writer_request_max_size(&mut self, size: i32) {
        self.inner.writer_request_max_size = size;
    }

    /// Get the writer acks
    #[getter]
    fn writer_acks(&self) -> String {
        self.inner.writer_acks.clone()
    }

    /// Set the writer acks
    #[setter]
    fn set_writer_acks(&mut self, acks: String) {
        self.inner.writer_acks = acks;
    }

    /// Get the writer retries
    #[getter]
    fn writer_retries(&self) -> i32 {
        self.inner.writer_retries
    }

    /// Set the writer retries
    #[setter]
    fn set_writer_retries(&mut self, retries: i32) {
        self.inner.writer_retries = retries;
    }

    /// Get the writer batch size
    #[getter]
    fn writer_batch_size(&self) -> i32 {
        self.inner.writer_batch_size
    }

    /// Set the writer batch size
    #[setter]
    fn set_writer_batch_size(&mut self, size: i32) {
        self.inner.writer_batch_size = size;
    }

    /// Get the writer batch timeout in milliseconds
    #[getter]
    fn writer_batch_timeout_ms(&self) -> i64 {
        self.inner.writer_batch_timeout_ms
    }

    /// Set the writer batch timeout in milliseconds
    #[setter]
    fn set_writer_batch_timeout_ms(&mut self, timeout: i64) {
        self.inner.writer_batch_timeout_ms = timeout;
    }

    /// Get the bucket assignment strategy for tables without bucket keys
    ///
    /// The strategy is returned in its string form.
    #[getter]
    fn writer_bucket_no_key_assigner(&self) -> String {
        self.inner.writer_bucket_no_key_assigner.to_string()
    }

    /// Set the bucket assignment strategy for tables without bucket keys
    ///
    /// Raises `FlussError` when `value` does not parse as a
    /// `fcore::config::NoKeyAssigner`; the stored strategy is left unchanged
    /// in that case.
    #[setter]
    fn set_writer_bucket_no_key_assigner(&mut self, value: String) -> PyResult<()> {
        self.inner.writer_bucket_no_key_assigner =
            parse_property("writer.bucket.no-key-assigner", &value)?;
        Ok(())
    }

    // ------------------------------------------------------------------- scanner

    /// Get the scanner remote log prefetch num
    #[getter]
    fn scanner_remote_log_prefetch_num(&self) -> usize {
        self.inner.scanner_remote_log_prefetch_num
    }

    /// Set the scanner remote log prefetch num
    #[setter]
    fn set_scanner_remote_log_prefetch_num(&mut self, num: usize) {
        self.inner.scanner_remote_log_prefetch_num = num;
    }

    /// Get the scanner remote log read concurrency
    #[getter]
    fn scanner_remote_log_read_concurrency(&self) -> usize {
        self.inner.scanner_remote_log_read_concurrency
    }

    /// Set the scanner remote log read concurrency
    #[setter]
    fn set_scanner_remote_log_read_concurrency(&mut self, num: usize) {
        self.inner.scanner_remote_log_read_concurrency = num;
    }

    /// Get the scanner log max poll records
    #[getter]
    fn scanner_log_max_poll_records(&self) -> usize {
        self.inner.scanner_log_max_poll_records
    }

    /// Set the scanner log max poll records
    #[setter]
    fn set_scanner_log_max_poll_records(&mut self, num: usize) {
        self.inner.scanner_log_max_poll_records = num;
    }

    /// Get the maximum bytes per fetch response for LogScanner
    #[getter]
    fn scanner_log_fetch_max_bytes(&self) -> i32 {
        self.inner.scanner_log_fetch_max_bytes
    }

    /// Set the maximum bytes per fetch response for LogScanner
    #[setter]
    fn set_scanner_log_fetch_max_bytes(&mut self, bytes: i32) {
        self.inner.scanner_log_fetch_max_bytes = bytes;
    }

    /// Get the minimum bytes to accumulate before returning a fetch response
    #[getter]
    fn scanner_log_fetch_min_bytes(&self) -> i32 {
        self.inner.scanner_log_fetch_min_bytes
    }

    /// Set the minimum bytes to accumulate before returning a fetch response
    #[setter]
    fn set_scanner_log_fetch_min_bytes(&mut self, bytes: i32) {
        self.inner.scanner_log_fetch_min_bytes = bytes;
    }

    /// Get the maximum time (ms) the server may wait to satisfy min-bytes
    #[getter]
    fn scanner_log_fetch_wait_max_time_ms(&self) -> i32 {
        self.inner.scanner_log_fetch_wait_max_time_ms
    }

    /// Set the maximum time (ms) the server may wait to satisfy min-bytes
    #[setter]
    fn set_scanner_log_fetch_wait_max_time_ms(&mut self, ms: i32) {
        self.inner.scanner_log_fetch_wait_max_time_ms = ms;
    }

    /// Get the maximum bytes per fetch response per bucket for LogScanner
    #[getter]
    fn scanner_log_fetch_max_bytes_for_bucket(&self) -> i32 {
        self.inner.scanner_log_fetch_max_bytes_for_bucket
    }

    /// Set the maximum bytes per fetch response per bucket for LogScanner
    #[setter]
    fn set_scanner_log_fetch_max_bytes_for_bucket(&mut self, bytes: i32) {
        self.inner.scanner_log_fetch_max_bytes_for_bucket = bytes;
    }

    // --------------------------------------------------------------- remote file

    /// Get the remote file download thread num
    #[getter]
    fn remote_file_download_thread_num(&self) -> usize {
        self.inner.remote_file_download_thread_num
    }

    /// Set the remote file download thread num
    #[setter]
    fn set_remote_file_download_thread_num(&mut self, num: usize) {
        self.inner.remote_file_download_thread_num = num;
    }

    // ------------------------------------------------------------------ security

    /// Get the security protocol
    #[getter]
    fn security_protocol(&self) -> String {
        self.inner.security_protocol.clone()
    }

    /// Set the security protocol
    #[setter]
    fn set_security_protocol(&mut self, protocol: String) {
        self.inner.security_protocol = protocol;
    }

    /// Get the SASL mechanism
    #[getter]
    fn security_sasl_mechanism(&self) -> String {
        self.inner.security_sasl_mechanism.clone()
    }

    /// Set the SASL mechanism
    #[setter]
    fn set_security_sasl_mechanism(&mut self, mechanism: String) {
        self.inner.security_sasl_mechanism = mechanism;
    }

    /// Get the SASL username
    #[getter]
    fn security_sasl_username(&self) -> String {
        self.inner.security_sasl_username.clone()
    }

    /// Set the SASL username
    #[setter]
    fn set_security_sasl_username(&mut self, username: String) {
        self.inner.security_sasl_username = username;
    }

    /// Get the SASL password
    #[getter]
    fn security_sasl_password(&self) -> String {
        self.inner.security_sasl_password.clone()
    }

    /// Set the SASL password
    #[setter]
    fn set_security_sasl_password(&mut self, password: String) {
        self.inner.security_sasl_password = password;
    }
}

/// Applies one `key`/`value` property to `config`.
///
/// String-typed options take ownership of `value` and store it verbatim; all
/// other options are parsed with [`parse_property`], whose target type is
/// inferred from the field being assigned.
///
/// Returns a `FlussError` for a value that fails to parse or for a key that is
/// not recognized.
fn apply_property(config: &mut fcore::config::Config, key: &str, value: String) -> PyResult<()> {
    match key {
        "bootstrap.servers" => config.bootstrap_servers = value,
        "writer.request-max-size" => {
            config.writer_request_max_size = parse_property(key, &value)?;
        }
        "writer.acks" => config.writer_acks = value,
        "writer.retries" => {
            config.writer_retries = parse_property(key, &value)?;
        }
        "writer.batch-size" => {
            config.writer_batch_size = parse_property(key, &value)?;
        }
        "writer.batch-timeout-ms" => {
            config.writer_batch_timeout_ms = parse_property(key, &value)?;
        }
        "scanner.remote-log.prefetch-num" => {
            config.scanner_remote_log_prefetch_num = parse_property(key, &value)?;
        }
        "remote-file.download-thread-num" => {
            config.remote_file_download_thread_num = parse_property(key, &value)?;
        }
        "scanner.remote-log.read-concurrency" => {
            config.scanner_remote_log_read_concurrency = parse_property(key, &value)?;
        }
        "scanner.log.max-poll-records" => {
            config.scanner_log_max_poll_records = parse_property(key, &value)?;
        }
        "scanner.log.fetch.max-bytes" => {
            config.scanner_log_fetch_max_bytes = parse_property(key, &value)?;
        }
        "scanner.log.fetch.min-bytes" => {
            config.scanner_log_fetch_min_bytes = parse_property(key, &value)?;
        }
        "scanner.log.fetch.wait-max-time-ms" => {
            config.scanner_log_fetch_wait_max_time_ms = parse_property(key, &value)?;
        }
        "scanner.log.fetch.max-bytes-for-bucket" => {
            config.scanner_log_fetch_max_bytes_for_bucket = parse_property(key, &value)?;
        }
        "writer.bucket.no-key-assigner" => {
            config.writer_bucket_no_key_assigner = parse_property(key, &value)?;
        }
        // The dictionary key carries no unit suffix; the parsed number is
        // stored in the `connect_timeout_ms` field.
        "connect-timeout" => {
            config.connect_timeout_ms = parse_property(key, &value)?;
        }
        "security.protocol" => config.security_protocol = value,
        "security.sasl.mechanism" => config.security_sasl_mechanism = value,
        "security.sasl.username" => config.security_sasl_username = value,
        "security.sasl.password" => config.security_sasl_password = value,
        _ => return Err(FlussError::new_err(format!("Unknown property: {key}"))),
    }
    Ok(())
}

/// Parses `value` into `T`, reporting failures as a `FlussError`.
///
/// The error message names both the offending `value` and the property `key`
/// so that the caller can tell which entry was rejected.
fn parse_property<T>(key: &str, value: &str) -> PyResult<T>
where
    T: std::str::FromStr,
    <T as std::str::FromStr>::Err: std::fmt::Display,
{
    value
        .parse::<T>()
        .map_err(|e| FlussError::new_err(format!("Invalid value '{value}' for '{key}': {e}")))
}
impl Config {
    /// Returns a clone of the wrapped core configuration.
    ///
    /// This method lives outside the `#[pymethods]` block, so it is only
    /// callable from Rust code.
    pub fn get_core_config(&self) -> fcore::config::Config {
        Clone::clone(&self.inner)
    }
}