import ChangeLog from '../changelog/connector-kudu.md';
Kudu source connector

Supported engines: Spark, Flink, SeaTunnel Zeta.
Used to read data from Kudu.
The tested Kudu version is 1.11.1.
| Kudu Data Type | SeaTunnel Data Type |
|---|---|
| BOOL | BOOLEAN |
| INT8, INT16, INT32 | INT |
| INT64 | BIGINT |
| DECIMAL | DECIMAL |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| STRING | STRING |
| UNIXTIME_MICROS | TIMESTAMP |
| BINARY | BYTES |
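If you choose to set the `schema` option explicitly, the SeaTunnel types in the right-hand column of the mapping table are the ones to declare. The sketch below is a minimal example under two assumptions. First, it uses a hypothetical Kudu table with columns `id` (INT64), `name` (STRING), `price` (DECIMAL(10,2)) and `created_at` (UNIXTIME_MICROS). Second, it assumes the usual SeaTunnel `schema { fields { ... } }` layout.

```hocon
source {
  kudu {
    kudu_masters = "kudu-master:7051"
    table_name = "kudu_source_table"
    # Hypothetical columns; each SeaTunnel type follows the Kudu-to-SeaTunnel mapping table
    schema {
      fields {
        id = bigint                 # Kudu INT64
        name = string               # Kudu STRING
        price = "decimal(10, 2)"    # Kudu DECIMAL, quoted because of the parentheses and comma
        created_at = timestamp      # Kudu UNIXTIME_MICROS
      }
    }
  }
}
```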
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| kudu_masters | String | Yes | - | Kudu master addresses, separated by commas, for example `192.168.88.110:7051`. |
| table_name | String | Yes | - | The name of the Kudu table. |
| client_worker_count | Int | No | 2 * Runtime.getRuntime().availableProcessors() | Number of Kudu client worker threads. Defaults to twice the number of available CPU cores. |
| client_default_operation_timeout_ms | Long | No | 30000 | Timeout for normal Kudu operations, in milliseconds. |
| client_default_admin_operation_timeout_ms | Long | No | 30000 | Timeout for Kudu admin operations, in milliseconds. |
| enable_kerberos | Bool | No | false | Whether to enable Kerberos authentication. |
| kerberos_principal | String | No | - | Kerberos principal. |
| kerberos_keytab | String | No | - | Path to the Kerberos keytab file. Note that this file must be present on all Zeta nodes. |
| kerberos_krb5conf | String | No | - | Path to the Kerberos krb5 configuration file. Note that this file must be present on all Zeta nodes. |
| scan_token_query_timeout | Long | No | 30000 | Timeout for scan token queries, in milliseconds. If not set, it is the same as the operation timeout. |
| scan_token_batch_size_bytes | Int | No | 1024 * 1024 | Maximum number of bytes read per scan batch. Defaults to 1 MB. |
| filter | String | No | - | Kudu scan filter expression, for example `id > 100 AND id < 200`. |
| schema | Map | No | - | SeaTunnel schema. |
| table_list | Array | No | - | The list of tables to read. It can be used instead of `table_name`, for example `table_list = [{ table_name = "kudu_source_table_1"},{ table_name = "kudu_source_table_2"}]`. |
| common-options | | No | - | Source plugin common parameters. Please refer to Source Common Options for details. |
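The sketch below shows how the optional scan and client settings from the options table might be combined in a batch job. The option names come from the table. The values are illustrative assumptions, not recommendations: a filter on a hypothetical `id` column, a 2 MB batch size, and 60-second timeouts.

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  kudu {
    kudu_masters = "kudu-master:7051"
    table_name = "kudu_source_table"
    # Only read rows whose (hypothetical) id column lies strictly between 100 and 200
    filter = "id > 100 AND id < 200"
    # Read up to 2 MB (2 * 1024 * 1024 = 2097152 bytes) per scan batch instead of the 1 MB default
    scan_token_batch_size_bytes = 2097152
    # Raise the operation and scan token timeouts from 30 s to 60 s (values are in milliseconds)
    client_default_operation_timeout_ms = 60000
    scan_token_query_timeout = 60000
    plugin_output = "kudu"
  }
}

sink {
  console {
    plugin_input = "kudu"
  }
}
```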
The following example reads the Kudu table `kudu_source_table`, prints its data to the console, and writes it to the Kudu table `kudu_sink_table`.
```hocon
# Defining the runtime environment
env {
  parallelism = 2
  job.mode = "BATCH"
}

source {
  # This is an example source plugin, only for testing and demonstrating the feature
  kudu {
    kudu_masters = "kudu-master:7051"
    table_name = "kudu_source_table"
    plugin_output = "kudu"
    enable_kerberos = true
    kerberos_principal = "xx@xx.COM"
    kerberos_keytab = "xx.keytab"
  }
}

transform {
}

sink {
  console {
    plugin_input = "kudu"
  }

  kudu {
    plugin_input = "kudu"
    kudu_masters = "kudu-master:7051"
    table_name = "kudu_sink_table"
    enable_kerberos = true
    kerberos_principal = "xx@xx.COM"
    kerberos_keytab = "xx.keytab"
  }
}
```
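The Kerberos example above sets `kerberos_principal` and `kerberos_keytab` but not `kerberos_krb5conf`. If your cluster needs an explicit krb5 configuration, the source block might look like the sketch below. The principal and both file paths are placeholders. As the options table notes, the keytab and krb5 files must exist at these paths on every Zeta node.

```hocon
source {
  kudu {
    kudu_masters = "kudu-master:7051"
    table_name = "kudu_source_table"
    plugin_output = "kudu"
    enable_kerberos = true
    # Placeholder principal and paths; both files must exist at these paths on every Zeta node
    kerberos_principal = "xx@xx.COM"
    kerberos_keytab = "/path/to/xx.keytab"
    kerberos_krb5conf = "/path/to/krb5.conf"
  }
}
```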
The following example reads two Kudu tables in streaming mode by listing them in `table_list`. It then uses the Assert sink to check the table names of the data it receives.

```hocon
env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  # This is an example source plugin, only for testing and demonstrating the feature
  kudu {
    kudu_masters = "kudu-master:7051"
    table_list = [
      {
        table_name = "kudu_source_table_1"
      },
      {
        table_name = "kudu_source_table_2"
      }
    ]
    plugin_output = "kudu"
  }
}

transform {
}

sink {
  Assert {
    rules {
      table-names = ["kudu_source_table_1", "kudu_source_table_2"]
    }
  }
}
```