sidebar_position: 2

Configuration

Connection Setup

import fluss

config = fluss.Config({"bootstrap.servers": "127.0.0.1:9123"})
conn = await fluss.FlussConnection.create(config)

The connection also supports context managers:

with await fluss.FlussConnection.create(config) as conn:
    ...

Connection Configurations

KeyDescriptionDefault
bootstrap.serversCoordinator server address127.0.0.1:9123
writer.request-max-sizeMaximum request size in bytes10485760 (10 MB)
writer.acksAcknowledgment setting (all waits for all replicas)all
writer.retriesNumber of retries on failure2147483647
writer.batch-sizeBatch size for writes in bytes2097152 (2 MB)
writer.batch-timeout-msThe maximum time to wait for a writer batch to fill up before sending.100
writer.bucket.no-key-assignerBucket assignment strategy for tables without bucket keys: sticky or round_robinsticky
scanner.remote-log.prefetch-numNumber of remote log segments to prefetch4
remote-file.download-thread-numNumber of threads for remote log downloads3
scanner.remote-log.read-concurrencyStreaming read concurrency within a remote log file4
scanner.log.max-poll-recordsMax records returned in a single poll()500
connect-timeoutTCP connect timeout in milliseconds120000
security.protocolPLAINTEXT (default) or sasl for SASL authPLAINTEXT
security.sasl.mechanismSASL mechanism (only PLAIN is supported)PLAIN
security.sasl.usernameSASL username (required when protocol is sasl)(empty)
security.sasl.passwordSASL password (required when protocol is sasl)(empty)

SASL Authentication

To connect to a Fluss cluster with SASL/PLAIN authentication enabled:

config = fluss.Config({
    "bootstrap.servers": "127.0.0.1:9123",
    "security.protocol": "sasl",
    "security.sasl.mechanism": "PLAIN",
    "security.sasl.username": "admin",
    "security.sasl.password": "admin-secret",
})
conn = await fluss.FlussConnection.create(config)

Connection Lifecycle

Remember to close the connection when done:

conn.close()