Kudu

Kudu sink connector

Supported Kudu Versions

  • 1.11.1/1.12.0/1.13.0/1.14.0/1.15.0

Supported Engines

  • Spark
  • Flink
  • SeaTunnel Zeta

Key Features

Data Type Mapping

SeaTunnel Data Type | Kudu Data Type
--- | ---
BOOLEAN | BOOL
INT | INT8, INT16, INT32
BIGINT | INT64
DECIMAL | DECIMAL
FLOAT | FLOAT
DOUBLE | DOUBLE
STRING | STRING
TIMESTAMP | UNIXTIME_MICROS
BYTES | BINARY
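
The following FakeSource schema fragment sketches how the mapping applies. It declares one field for each SeaTunnel type in the table, and each field carries a comment naming the Kudu column type it corresponds to. The field names are hypothetical. The sketch also assumes the target Kudu table already defines columns of the listed types.

```hocon
# Hypothetical schema: one field per SeaTunnel type from the mapping table.
schema = {
  fields {
    c_boolean   = boolean          # written to a Kudu BOOL column
    c_int       = int              # written to a Kudu INT8, INT16 or INT32 column
    c_bigint    = bigint           # written to a Kudu INT64 column
    c_decimal   = "decimal(16, 1)" # written to a Kudu DECIMAL column
    c_float     = float            # written to a Kudu FLOAT column
    c_double    = double           # written to a Kudu DOUBLE column
    c_string    = string           # written to a Kudu STRING column
    c_timestamp = timestamp        # written to a Kudu UNIXTIME_MICROS column
    c_bytes     = bytes            # written to a Kudu BINARY column
  }
}
```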

Sink Options

Name | Type | Required | Default | Description
--- | --- | --- | --- | ---
kudu_masters | String | Yes | - | Kudu master address(es), separated by ',', for example '192.168.88.110:7051'.
table_name | String | Yes | - | The name of the Kudu table.
client_worker_count | Int | No | 2 * Runtime.getRuntime().availableProcessors() | Kudu client worker count. The default is twice the number of available CPU cores.
client_default_operation_timeout_ms | Long | No | 30000 | Timeout for normal Kudu operations, in milliseconds.
client_default_admin_operation_timeout_ms | Long | No | 30000 | Timeout for Kudu admin operations, in milliseconds.
enable_kerberos | Bool | No | false | Whether to enable Kerberos authentication.
kerberos_principal | String | No | - | Kerberos principal.
kerberos_keytab | String | No | - | Kerberos keytab file. The file must exist on all Zeta nodes.
kerberos_krb5conf | String | No | - | Kerberos krb5 configuration file. The file must exist on all Zeta nodes.
save_mode | String | No | - | Save mode; supports overwrite and append.
session_flush_mode | String | No | AUTO_FLUSH_SYNC | Kudu session flush mode.
batch_size | Int | No | 1024 | Maximum number of buffered records (append, upsert and delete records combined). Once this number is exceeded, the data is flushed.
buffer_flush_interval | Int | No | 10000 | Flush interval in milliseconds. Once this time has elapsed, an asynchronous thread flushes the buffered data.
ignore_not_found | Bool | No | false | If true, ignore all rows that are not found.
ignore_not_duplicate | Bool | No | false | If true, ignore all duplicate rows.
common-options | | No | - | Sink plugin common parameters; see Sink Common Options for details.
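
The following sink block is a minimal sketch that sets most of the optional client and write options from the table. The master host names, the table name, and every non-default value are illustrative assumptions, not recommendations. The flush settings simply repeat the documented defaults.

```hocon
sink {
  kudu {
    # Hypothetical master hosts; multiple masters are separated by ','
    kudu_masters = "kudu-master-1:7051,kudu-master-2:7051,kudu-master-3:7051"
    table_name = "kudu_sink_table"

    # Client settings (illustrative values)
    client_worker_count = 8
    client_default_operation_timeout_ms = 60000
    client_default_admin_operation_timeout_ms = 60000

    # Write buffering (documented defaults)
    session_flush_mode = "AUTO_FLUSH_SYNC"
    batch_size = 1024
    buffer_flush_interval = 10000   # milliseconds

    # Error handling (illustrative: both switched on)
    ignore_not_found = true
    ignore_not_duplicate = true
  }
}
```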

Task Example

Simple:

The following example uses a FakeSource whose result table is named “kudu” to write CDC rows (INSERT, UPDATE_BEFORE/UPDATE_AFTER and DELETE) into the Kudu table “kudu_sink_table”.


env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    result_table_name = "kudu"
    schema = {
      fields {
        id = int
        val_bool = boolean
        val_int8 = tinyint
        val_int16 = smallint
        val_int32 = int
        val_int64 = bigint
        val_float = float
        val_double = double
        val_decimal = "decimal(16, 1)"
        val_string = string
        val_unixtime_micros = timestamp
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
      },
      {
        kind = INSERT
        fields = [2, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
      },
      {
        kind = INSERT
        fields = [3, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
      },
      {
        kind = UPDATE_BEFORE
        fields = [1, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
      },
      {
        kind = UPDATE_AFTER
        fields = [1, true, 2, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
      },
      {
        kind = DELETE
        fields = [2, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
      }
    ]
  }
}

sink {
  kudu {
    source_table_name = "kudu"
    kudu_masters = "kudu-master-cdc:7051"
    table_name = "kudu_sink_table"
    enable_kerberos = true
    kerberos_principal = "xx@xx.COM"
    kerberos_keytab = "xx.keytab"
  }
}
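
The example above enables Kerberos but does not set kerberos_krb5conf. The following sink block is a sketch that also points at a krb5 configuration file. The principal and both file paths are placeholders, not values from a real deployment. As the option table notes, the keytab and krb5 files must be present on every Zeta node.

```hocon
sink {
  kudu {
    source_table_name = "kudu"
    kudu_masters = "kudu-master-cdc:7051"
    table_name = "kudu_sink_table"
    enable_kerberos = true
    # Placeholder principal and paths; both files must exist on every Zeta node
    kerberos_principal = "xx@xx.COM"
    kerberos_keytab = "/path/to/xx.keytab"
    kerberos_krb5conf = "/etc/krb5.conf"
  }
}
```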

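Multiple Table

The following example writes two FakeSource tables, “kudu_sink_1” and “kudu_sink_2”, to Kudu in a single job.
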
env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    tables_configs = [
      {
        schema = {
          table = "kudu_sink_1"
          fields {
            id = int
            val_bool = boolean
            val_int8 = tinyint
            val_int16 = smallint
            val_int32 = int
            val_int64 = bigint
            val_float = float
            val_double = double
            val_decimal = "decimal(16, 1)"
            val_string = string
            val_unixtime_micros = timestamp
          }
        }
        rows = [
          {
            kind = INSERT
            fields = [1, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
          }
        ]
      },
      {
        schema = {
          table = "kudu_sink_2"
          fields {
            id = int
            val_bool = boolean
            val_int8 = tinyint
            val_int16 = smallint
            val_int32 = int
            val_int64 = bigint
            val_float = float
            val_double = double
            val_decimal = "decimal(16, 1)"
            val_string = string
            val_unixtime_micros = timestamp
          }
        }
        rows = [
          {
            kind = INSERT
            fields = [1, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
          }
        ]
      }
    ]
  }
}

sink {
  kudu {
    kudu_masters = "kudu-master-multiple:7051"
  }
}

Changelog

2.2.0-beta 2022-09-26

  • Add Kudu Sink Connector

2.3.0-beta 2022-10-20

  • [Improve] Kudu Sink Connector supports upserting rows (2881)

Next Version

  • Change plugin name from KuduSink to Kudu (3432)