import ChangeLog from '../changelog/connector-hbase.md';

Hbase

Hbase sink connector

Description

Writes data to HBase.

Key features

Options

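| Name               | Type    | Required | Default         |
|--------------------|---------|----------|-----------------|
| zookeeper_quorum   | string  | yes      | -               |
| table              | string  | yes      | -               |
| rowkey_column      | list    | yes      | -               |
| family_name        | config  | yes      | -               |
| rowkey_delimiter   | string  | no       | ""              |
| version_column     | string  | no       | -               |
| null_mode          | string  | no       | skip            |
| wal_write          | boolean | no       | false           |
| write_buffer_size  | int     | no       | 8 * 1024 * 1024 |
| encoding           | string  | no       | utf8            |
| hbase_extra_config | config  | no       | -               |
| common-options     |         | no       | -               |
| ttl                | long    | no       | -               |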

zookeeper_quorum [string]

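The ZooKeeper cluster hosts for HBase, for example: "hadoop001:2181,hadoop002:2181,hadoop003:2181"

table [string]

The name of the table to write to, for example: "seatunnel"

rowkey_column [list]

The list of column names that make up the row key, for example: ["id", "uuid"]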

family_name [config]

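Maps fields to column family names. For example, suppose the upstream row looks like this:

| id | name          | age |
|----|---------------|-----|
| 1  | tyrantlucifer | 27  |

With id as the row key and the other fields written to different column families, you can set:

family_name {
  name = "info1"
  age = "info2"
}

This means name is written to column family info1 and age is written to column family info2.

To write all the other fields to the same column family, you can set:

family_name {
  all_columns = "info"
}

This means all fields are written to column family info.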

rowkey_delimiter [string]

The delimiter used to join multiple row key columns; defaults to ""
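As a sketch of how rowkey_column and rowkey_delimiter combine, the fragment below reuses the id and uuid fields from the rowkey_column example and picks "_" as the delimiter. The delimiter value and the all_columns mapping are illustrative choices, not defaults. Assuming the values are joined in the order listed in rowkey_column, a row with id = 1 and uuid = abc would be expected to get a row key of the form 1_abc.

```hocon
Hbase {
  zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
  table = "seatunnel"
  # Two upstream fields together form the row key
  rowkey_column = ["id", "uuid"]
  # Illustrative delimiter; the default is the empty string
  rowkey_delimiter = "_"
  family_name {
    all_columns = "info"
  }
}
```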

version_column [string]

The name of the version column; you can use it to assign the timestamp of the HBase record
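A minimal sketch of version_column, assuming the upstream row carries a BIGINT field named time, as in the multi-table example on this page. That the value should be an epoch timestamp in milliseconds is an assumption based on how HBase cell timestamps usually work; this page does not state it.

```hocon
Hbase {
  zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
  table = "seatunnel_test"
  rowkey_column = ["name"]
  # Assumption: "time" is an upstream BIGINT field holding epoch milliseconds
  version_column = "time"
  family_name {
    all_columns = "info"
  }
}
```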

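null_mode [string]

The mode for writing null values. Supported values are [skip, empty]; the default is skip.

  • skip: when a field is null, the connector does not write the field to HBase
  • empty: when a field is null, the connector writes an empty value for the field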
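To make the difference concrete, the fragment below switches the sink from the default skip to empty. With this setting a null upstream field should still produce a cell, holding an empty value, instead of being left out of the write. The table and column family names are illustrative.

```hocon
Hbase {
  zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
  table = "seatunnel_test"
  rowkey_column = ["name"]
  # Write an empty value for null fields instead of skipping them (default: skip)
  null_mode = "empty"
  family_name {
    all_columns = "info"
  }
}
```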

wal_write [boolean]

Flag that controls whether writes go to the WAL (write-ahead log); defaults to false

write_buffer_size [int]

The write buffer size of the HBase client; defaults to 8 * 1024 * 1024
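The two write-path options can be set together. The sketch below enables the WAL, which in HBase generally trades some write throughput for durability, and doubles the client write buffer. It assumes the buffer size is given in bytes as a plain integer literal; 16777216 is 16 * 1024 * 1024 written out, since a config value may not be evaluated as an arithmetic expression.

```hocon
Hbase {
  zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
  table = "seatunnel_test"
  rowkey_column = ["name"]
  # Write to the write-ahead log (default: false)
  wal_write = true
  # Assumption: size in bytes; 16777216 = 16 * 1024 * 1024 (default: 8 * 1024 * 1024)
  write_buffer_size = 16777216
  family_name {
    all_columns = "info"
  }
}
```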

encoding [string]

The encoding of string fields. Supported values are [utf8, gbk]; the default is utf8

hbase_extra_config [config]

Extra HBase configuration

ttl [long]

The TTL of data written to HBase, in milliseconds. By default, the TTL configured on the table applies.
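Because the unit is milliseconds, a TTL is easy to get wrong by a factor of 1000. The fragment below is a sketch that sets a one-day TTL: 24 * 60 * 60 * 1000 = 86400000. The table and column family names are illustrative.

```hocon
Hbase {
  zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
  table = "seatunnel_test"
  rowkey_column = ["name"]
  # One day in milliseconds; when omitted, the TTL configured on the table applies
  ttl = 86400000
  family_name {
    all_columns = "info"
  }
}
```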

Common options

Common parameters of sink plugins; see Sink Common Options for details.

Example

Hbase {
  zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
  table = "seatunnel_test"
  rowkey_column = ["name"]
  family_name {
    all_columns = seatunnel
  }
}

Writing to multiple tables

env {
  # You can set engine configuration here
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    tables_configs = [
      {
        schema = {
          table = "hbase_sink_1"
          fields {
            name = STRING
            c_string = STRING
            c_double = DOUBLE
            c_bigint = BIGINT
            c_float = FLOAT
            c_int = INT
            c_smallint = SMALLINT
            c_boolean = BOOLEAN
            time = BIGINT
          }
        }
        rows = [
          {
            kind = INSERT
            fields = ["label_1", "sink_1", 4.3, 200, 2.5, 2, 5, true, 1627529632356]
          }
        ]
      },
      {
        schema = {
          table = "hbase_sink_2"
          fields {
            name = STRING
            c_string = STRING
            c_double = DOUBLE
            c_bigint = BIGINT
            c_float = FLOAT
            c_int = INT
            c_smallint = SMALLINT
            c_boolean = BOOLEAN
            time = BIGINT
          }
        }
        rows = [
          {
            kind = INSERT
            fields = ["label_2", "sink_2", 4.3, 200, 2.5, 2, 5, true, 1627529632357]
          }
        ]
      }
    ]
  }
}

sink {
  Hbase {
    zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
    table = "${table_name}"
    rowkey_column = ["name"]
    family_name {
      all_columns = info
    }
  }
}

Writing to specific column families

Hbase {
  zookeeper_quorum = "hbase_e2e:2181"
  table = "assign_cf_table"
  rowkey_column = ["id"]
  family_name {
    c_double = "cf1"
    c_bigint = "cf2"
  }
}

Changelog