---
title: "Java Client"
sidebar_position: 1
---
# Fluss Java Client
## Overview
The Fluss `Admin` API supports asynchronous operations for managing and inspecting Fluss resources. It communicates with the Fluss cluster and provides methods for:
* Managing databases (create, drop, list)
* Managing tables (create, drop, list)
* Managing partitions (create, drop, list)
* Retrieving metadata (schemas, snapshots, server information)
The Fluss `Table` API lets you read data from and write data to Fluss tables.
## Dependency
To use the client, add the following dependency to your `pom.xml` file.
```xml
<!-- https://mvnrepository.com/artifact/org.apache.fluss/fluss-client -->
<dependency>
    <groupId>org.apache.fluss</groupId>
    <artifactId>fluss-client</artifactId>
    <version>$FLUSS_VERSION$</version>
</dependency>
```
## Initialization
`Connection` is the main entry point for the Fluss Java client. It is used to create `Admin` and `Table` instances.
The `Connection` object is created using the `ConnectionFactory` class, which takes a `Configuration` object as an argument.
The `Configuration` object contains the necessary configuration parameters for connecting to the Fluss cluster, such as the bootstrap servers.
The `Connection` object is thread-safe and can be shared across multiple threads. It is recommended to create a
single `Connection` instance per application and use it to create multiple `Admin` and `Table` instances.
`Table` and `Admin` instances, on the other hand, are not thread-safe and should be created for each thread that needs to access them.
Caching or pooling of `Table` and `Admin` is not recommended.
Create a `Connection` and obtain `Admin` and `Table` instances from it:
```java
// creating Connection object to connect with Fluss cluster
Configuration conf = new Configuration();
conf.setString("bootstrap.servers", "localhost:9123");
Connection connection = ConnectionFactory.createConnection(conf);
// obtain Admin instance from the Connection
Admin admin = connection.getAdmin();
admin.listDatabases().get().forEach(System.out::println);
// obtain Table instance from the Connection
Table table = connection.getTable(TablePath.of("my_db", "my_table"));
System.out.println(table.getTableInfo());
```
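The sharing guidance above can be illustrated without a running cluster. The sketch below uses hypothetical stand-in types (`SharedConnection`, `WorkerAdmin`), not the Fluss classes, to show the shape of the pattern: one thread-safe connection is shared, and every task obtains its own short-lived handle instead of taking one from a cache or pool.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a thread-safe, shareable connection (not the Fluss Connection class).
class SharedConnection {
    private final AtomicInteger handlesCreated = new AtomicInteger();

    // Each call hands out a fresh handle, similar in spirit to connection.getAdmin().
    WorkerAdmin newAdmin() {
        return new WorkerAdmin(handlesCreated.incrementAndGet());
    }

    int handlesCreated() {
        return handlesCreated.get();
    }
}

// Hypothetical stand-in for a handle that is not thread-safe and is owned by one thread.
class WorkerAdmin implements AutoCloseable {
    final int id;

    WorkerAdmin(int id) {
        this.id = id;
    }

    @Override
    public void close() {
        // A real handle would release its resources here.
    }
}

class PerThreadHandles {
    // Runs `workers` tasks against one shared connection and returns the distinct handle ids used.
    static Set<Integer> run(SharedConnection connection, int workers) {
        Set<Integer> usedHandleIds = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.submit(() -> {
                // The handle is created, used, and closed inside the task that owns it.
                try (WorkerAdmin admin = connection.newAdmin()) {
                    usedHandleIds.add(admin.id);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return usedHandleIds;
    }

    public static void main(String[] args) {
        SharedConnection connection = new SharedConnection();
        System.out.println(run(connection, 4).size() + " handles for 4 tasks");
    }
}
```

Because no handle ever crosses a thread boundary, the handles themselves need no synchronization; only the shared connection has to be thread-safe.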
If you are using SASL authentication, set the following properties:
```java
// creating Connection object to connect with Fluss cluster
Configuration conf = new Configuration();
conf.setString("bootstrap.servers", "localhost:9123");
conf.setString("client.security.protocol", "sasl");
conf.setString("client.security.sasl.mechanism", "PLAIN");
conf.setString("client.security.sasl.username", "alice");
conf.setString("client.security.sasl.password", "alice-secret");
Connection connection = ConnectionFactory.createConnection(conf);
// obtain Admin instance from the Connection
Admin admin = connection.getAdmin();
admin.listDatabases().get().forEach(System.out::println);
// obtain Table instance from the Connection
Table table = connection.getTable(TablePath.of("my_db", "my_table"));
System.out.println(table.getTableInfo());
```
## Working with Operations
All methods in `FlussAdmin` return `CompletableFuture` objects. You can handle these in two ways:
### Blocking Operations
For synchronous behavior, use the `get()` method:
```java
// Blocking call
List<String> databases = admin.listDatabases().get();
```
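Calling `get()` without arguments waits indefinitely and reports failures wrapped in an `ExecutionException`. Below is a minimal sketch of a bounded wait that uses only `java.util.concurrent`. The `awaitOrThrow` helper is hypothetical, and the completed future stands in for a call such as `admin.listDatabases()`; neither is part of the Fluss API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BlockingCalls {
    // Waits at most timeoutMillis for the future and converts checked failures to unchecked ones.
    static <T> T awaitOrThrow(CompletableFuture<T> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // The real failure is the cause; ExecutionException is only the wrapper.
            throw new IllegalStateException(
                    "Operation failed: " + e.getCause().getMessage(), e.getCause());
        } catch (TimeoutException e) {
            throw new IllegalStateException(
                    "Operation timed out after " + timeoutMillis + " ms", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag for callers
            throw new IllegalStateException("Interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for admin.listDatabases(): an already completed future.
        CompletableFuture<List<String>> databases =
                CompletableFuture.completedFuture(List.of("fluss", "my_db"));
        System.out.println(awaitOrThrow(databases, 5_000));
    }
}
```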
### Asynchronous Operations
For non-blocking behavior, use `thenAccept`, `thenApply`, or other `CompletableFuture` composition methods:
```java
admin.listDatabases()
    .thenAccept(databases -> {
        System.out.println("Available databases:");
        databases.forEach(System.out::println);
    })
    .exceptionally(ex -> {
        System.err.println("Failed to list databases: " + ex.getMessage());
        return null;
    });
```
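Callbacks attached with `thenAccept` run only once the future completes, so a short-lived program may exit before they fire. The sketch below uses plain `CompletableFuture` to show dependent steps chained with `thenCompose` and a single error handler. The `createDatabase` and `createTable` methods here are hypothetical local stand-ins that only mimic the shape of asynchronous `Admin` calls.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class AsyncChaining {
    // Hypothetical stand-ins: each returns a future, as the asynchronous Admin methods do.
    static CompletableFuture<Void> createDatabase(String name, List<String> log) {
        return CompletableFuture.runAsync(() -> log.add("database " + name));
    }

    static CompletableFuture<Void> createTable(String name, List<String> log) {
        return CompletableFuture.runAsync(() -> log.add("table " + name));
    }

    // Starts the table step only after the database step has completed successfully.
    static CompletableFuture<String> createBoth(List<String> log) {
        return createDatabase("my_db", log)
                .thenCompose(unused -> createTable("user_table", log))
                .thenApply(unused -> "created")
                // One handler covers a failure in any earlier stage.
                .exceptionally(ex -> "failed: " + ex.getMessage());
    }

    public static void main(String[] args) {
        List<String> log = new CopyOnWriteArrayList<>();
        // join() keeps the main thread waiting until the whole chain has finished.
        String outcome = createBoth(log).join();
        System.out.println(outcome + " " + log);
    }
}
```

`thenCompose` is used rather than `thenApply` for the second step because that step itself returns a future; `thenApply` would produce a nested `CompletableFuture<CompletableFuture<Void>>`.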
## Creating Databases and Tables
### Creating a Database
```java
// Create database descriptor
DatabaseDescriptor descriptor = DatabaseDescriptor.builder()
    .comment("This is a test database")
    .customProperty("owner", "data-team")
    .build();

// Create database (true means ignore if exists)
admin.createDatabase("my_db", descriptor, true) // non-blocking call
    .thenAccept(unused -> System.out.println("Database created successfully"))
    .exceptionally(ex -> {
        System.err.println("Failed to create database: " + ex.getMessage());
        return null;
    });
```
### Creating a Table
```java
Schema schema = Schema.newBuilder()
    .column("id", DataTypes.STRING())
    .column("age", DataTypes.INT())
    .column("created_at", DataTypes.TIMESTAMP())
    .column("is_active", DataTypes.BOOLEAN())
    .primaryKey("id")
    .build();

// Use the schema in a table descriptor
TableDescriptor tableDescriptor = TableDescriptor.builder()
    .schema(schema)
    .distributedBy(1, "id") // Distribute by the id column into 1 bucket
    // .partitionedBy("") // Partition by the partition key
    .build();

TablePath tablePath = TablePath.of("my_db", "user_table");
admin.createTable(tablePath, tableDescriptor, false).get(); // blocking call

TableInfo tableInfo = admin.getTableInfo(tablePath).get(); // blocking call
System.out.println(tableInfo);
```
## Table API
### Writers
To write data to a Fluss table, first create a `Table` instance.
```java
TablePath tablePath = TablePath.of("my_db", "user_table");
Table table = connection.getTable(tablePath);
```
Fluss has both Primary Key Tables and Log Tables, so the client provides different functionality depending on the table type.
Use an `UpsertWriter` to write data to a Primary Key Table, and an `AppendWriter` to write data to a Log Table.
```java
table.newUpsert().createWriter();
table.newAppend().createWriter();
```
The following example writes data to a Primary Key Table, starting from a list of `User` objects.
```java
List<User> users = List.of(
    new User("1", 20, LocalDateTime.now(), true),
    new User("2", 22, LocalDateTime.now(), true),
    new User("3", 23, LocalDateTime.now(), true),
    new User("4", 24, LocalDateTime.now(), true),
    new User("5", 25, LocalDateTime.now(), true)
);
```
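The `User` class used above is not defined on this page. Here is a minimal sketch of what it could look like, with accessor names chosen to match the calls made in the conversion code that follows (`getId()`, `getAge()`, `getCreatedAt()`, `isActive()`). The class is an assumption for illustration, not part of the Fluss client.

```java
import java.time.LocalDateTime;

// Hypothetical POJO mirroring the user_table schema: id, age, created_at, is_active.
class User {
    private final String id;
    private final int age;
    private final LocalDateTime createdAt;
    private final boolean active;

    User(String id, int age, LocalDateTime createdAt, boolean active) {
        this.id = id;
        this.age = age;
        this.createdAt = createdAt;
        this.active = active;
    }

    String getId() {
        return id;
    }

    int getAge() {
        return age;
    }

    LocalDateTime getCreatedAt() {
        return createdAt;
    }

    boolean isActive() {
        return active;
    }
}
```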
**Note:** Data is currently written to Fluss as rows, so each POJO must be converted to a `GenericRow`. The Fluss community is working on
a more user-friendly API for writing data.
```java
Table table = connection.getTable(tablePath);

// Convert each User into a GenericRow whose field order matches the table schema
List<GenericRow> rows = users.stream().map(user -> {
    GenericRow row = new GenericRow(4);
    row.setField(0, BinaryString.fromString(user.getId()));
    row.setField(1, user.getAge());
    row.setField(2, TimestampNtz.fromLocalDateTime(user.getCreatedAt()));
    row.setField(3, user.isActive());
    return row;
}).collect(Collectors.toList());

System.out.println("Upserting rows to the table");
UpsertWriter writer = table.newUpsert().createWriter();
// upsert() is a non-blocking call that sends data to the Fluss server with batching and timeout
rows.forEach(writer::upsert);
// flush() blocks the calling thread until all buffered data has been written successfully
writer.flush();
```
For a Log table you can use the `AppendWriter` API to write data.
```java
table.newAppend().createWriter().append(row);
```
### Scanner
To read data from a Fluss table, first create a scanner. Then subscribe to the table's buckets and
start polling for records.
```java
LogScanner logScanner = table.newScan()
    .createLogScanner();

int numBuckets = table.getTableInfo().getNumBuckets();
System.out.println("Number of buckets: " + numBuckets);
for (int i = 0; i < numBuckets; i++) {
    System.out.println("Subscribing to bucket " + i);
    logScanner.subscribeFromBeginning(i);
}

long scanned = 0;
Map<Integer, List<String>> rowsMap = new HashMap<>();

while (true) {
    System.out.println("Polling for records...");
    ScanRecords scanRecords = logScanner.poll(Duration.ofSeconds(1));
    for (TableBucket bucket : scanRecords.buckets()) {
        for (ScanRecord record : scanRecords.records(bucket)) {
            InternalRow row = record.getRow();
            // Process the row
            ...
        }
    }
    scanned += scanRecords.count();
}
```
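The loop above declares `rowsMap` but leaves the row processing elided, and `while (true)` never exits. One way to handle both is sketched below with plain collections: rows are grouped per bucket with `computeIfAbsent`, and polling stops after a number of consecutive empty polls. `Polled`, `drain`, and the `maxEmptyPolls` limit are hypothetical names, and the `Supplier` stands in for `logScanner.poll(...)`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

class BucketGrouping {
    // Hypothetical stand-in for one scanned record: its bucket id and a rendered row.
    static final class Polled {
        final int bucket;
        final String row;

        Polled(int bucket, String row) {
            this.bucket = bucket;
            this.row = row;
        }
    }

    // Polls until maxEmptyPolls consecutive polls return nothing, grouping rows by bucket id.
    static Map<Integer, List<String>> drain(Supplier<List<Polled>> poll, int maxEmptyPolls) {
        Map<Integer, List<String>> rowsMap = new HashMap<>();
        int emptyPolls = 0;
        while (emptyPolls < maxEmptyPolls) {
            List<Polled> batch = poll.get();
            if (batch.isEmpty()) {
                emptyPolls++;
                continue;
            }
            emptyPolls = 0; // data arrived, so reset the idle counter
            for (Polled polled : batch) {
                rowsMap.computeIfAbsent(polled.bucket, b -> new ArrayList<>()).add(polled.row);
            }
        }
        return rowsMap;
    }

    public static void main(String[] args) {
        // Two batches followed by empty polls, simulating a log that has been fully read.
        Iterator<List<Polled>> batches = List.of(
                List.of(new Polled(0, "a"), new Polled(1, "b")),
                List.of(new Polled(0, "c"))).iterator();
        Supplier<List<Polled>> poll =
                () -> batches.hasNext() ? batches.next() : List.<Polled>of();
        System.out.println(drain(poll, 3));
    }
}
```

An idle-poll limit suits bounded reads such as tests or backfills; a long-running consumer would more likely keep polling and exit on an external shutdown signal.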
### Lookup
You can also use the Fluss API to perform lookups on a table. This is useful for querying specific records based on their primary key or prefix key.
```java
// Lookup by primary key
LookupResult lookup = table.newLookup()
    .createLookuper()
    .lookup(rowKey)
    .get();

// Lookup by prefix key
LookupResult prefixLookup = table.newLookup()
    .lookupBy(prefixKeys)
    .createLookuper()
    .lookup(rowKey)
    .get();
```