import ChangeLog from ‘../changelog/connector-hive.md’;
Hive 源连接器
从 Hive 读取数据。
使用 markdown 格式时,SeaTunnel 可以解析存储在 Hive 表中的 markdown 文件并提取结构化数据,包括标题、段落、列表、代码块和表格等元素。每个元素都转换为具有以下架构的行:
element_id
:元素的唯一标识符element_type
:元素类型(Heading、Paragraph、ListItem 等)heading_level
:标题级别(1-6,非标题元素为 null)text
:元素的文本内容page_number
:页码(默认:1)position_index
:文档中的位置索引parent_id
:父元素的 IDchild_ids
:子元素 ID 的逗号分隔列表注意:Markdown 格式仅支持读取,不支持写入。
:::tip 提示
为了使用此连接器,您必须确保您的 Spark/Flink 集群已经集成了 Hive。测试过的 Hive 版本是 2.3.9 和 3.1.3。
如果您使用 SeaTunnel 引擎,您需要将 seatunnel-hadoop3-3.1.4-uber.jar
、hive-exec-3.1.3.jar
和 libfb303-0.9.3.jar
放在 $SEATUNNEL_HOME/lib/
目录中。 :::
在 pollNext
调用中读取分片中的所有数据。读取的分片将保存在快照中。
名称 | 类型 | 必需 | 默认值 |
---|---|---|---|
table_name | string | 是 | - |
metastore_uri | string | 是 | - |
krb5_path | string | 否 | /etc/krb5.conf |
kerberos_principal | string | 否 | - |
kerberos_keytab_path | string | 否 | - |
hdfs_site_path | string | 否 | - |
hive_site_path | string | 否 | - |
hive.hadoop.conf | Map | 否 | - |
hive.hadoop.conf-path | string | 否 | - |
read_partitions | list | 否 | - |
read_columns | list | 否 | - |
compress_codec | string | 否 | none |
common-options | 否 | - |
目标 Hive 表名,例如:db1.table1
Hive 元存储 URI
hdfs-site.xml
的路径,用于加载 Namenode 的高可用配置
Hadoop 配置中的属性(core-site.xml
、hdfs-site.xml
、hive-site.xml
)
指定加载 core-site.xml
、hdfs-site.xml
、hive-site.xml
文件的路径
用户希望从 Hive 表中读取的目标分区,如果用户未设置此参数,将读取 Hive 表中的所有数据。
提示:分区列表中的每个分区应具有相同的目录层级。例如,一个 Hive 表有两个分区:par1
和 par2
,如果用户设置如下: read_partitions = [par1=xxx, par1=yyy/par2=zzz]
,这是不合法的
krb5.conf
的路径,用于 Kerberos 认证
Kerberos 认证的主体
Kerberos 认证的 keytab 文件路径
数据源的读取列列表,用户可以使用它来实现字段投影。
文件的压缩编解码器,支持的详细信息如下所示:
lzo
none
lzo
none
lzo
none
源插件的通用参数,请参阅 Source Common Options 了解详细信息。
Hive { table_name = "default.seatunnel_orc" metastore_uri = "thrift://namenode001:9083" }
注意:Hive 是结构化数据源,应使用
table_list
,tables_configs
将在未来移除。
Hive { table_list = [ { table_name = "default.seatunnel_orc_1" metastore_uri = "thrift://namenode001:9083" }, { table_name = "default.seatunnel_orc_2" metastore_uri = "thrift://namenode001:9083" } ] }
Hive { tables_configs = [ { table_name = "default.seatunnel_orc_1" metastore_uri = "thrift://namenode001:9083" }, { table_name = "default.seatunnel_orc_2" metastore_uri = "thrift://namenode001:9083" } ] }
source { Hive { table_name = "default.test_hive_sink_on_hdfs_with_kerberos" metastore_uri = "thrift://metastore:9083" hive.hadoop.conf-path = "/tmp/hadoop" plugin_output = hive_source hive_site_path = "/tmp/hive-site.xml" kerberos_principal = "hive/metastore.seatunnel@EXAMPLE.COM" kerberos_keytab_path = "/tmp/hive.keytab" krb5_path = "/tmp/krb5.conf" } }
描述:
hive_site_path
:hive-site.xml
文件的路径。kerberos_principal
:Kerberos 认证的主体。kerberos_keytab_path
:Kerberos 认证的 keytab 文件路径。krb5_path
:用于 Kerberos 认证的 krb5.conf
文件路径。运行案例:
env { parallelism = 1 job.mode = "BATCH" } source { Hive { table_name = "default.test_hive_sink_on_hdfs_with_kerberos" metastore_uri = "thrift://metastore:9083" hive.hadoop.conf-path = "/tmp/hadoop" plugin_output = hive_source hive_site_path = "/tmp/hive-site.xml" kerberos_principal = "hive/metastore.seatunnel@EXAMPLE.COM" kerberos_keytab_path = "/tmp/hive.keytab" krb5_path = "/tmp/krb5.conf" } } sink { Assert { plugin_input = hive_source rules { row_rules = [ { rule_type = MAX_ROW rule_value = 3 } ], field_rules = [ { field_name = pk_id field_type = bigint field_value = [ { rule_type = NOT_NULL } ] }, { field_name = name field_type = string field_value = [ { rule_type = NOT_NULL } ] }, { field_name = score field_type = int field_value = [ { rule_type = NOT_NULL } ] } ] } } }
为 EMR 的 Hive 创建 lib 目录。
mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib
从 Maven 中心获取 jar 文件到 lib。
cd ${SEATUNNEL_HOME}/plugins/Hive/lib wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.6.5/hadoop-aws-2.6.5.jar wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar
从您的 EMR 环境中复制 jar 文件到 lib 目录。
cp /usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.60.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib cp /usr/share/aws/emr/hadoop-state-pusher/lib/hadoop-common-3.3.6-amzn-1.jar ${SEATUNNEL_HOME}/plugins/Hive/lib cp /usr/share/aws/emr/hadoop-state-pusher/lib/javax.inject-1.jar ${SEATUNNEL_HOME}/plugins/Hive/lib cp /usr/share/aws/emr/hadoop-state-pusher/lib/aopalliance-1.0.jar ${SEATUNNEL_HOME}/plugins/Hive/lib
运行案例。
env { parallelism = 1 job.mode = "BATCH" } source { Hive { table_name = "test_hive.test_hive_sink_on_s3" metastore_uri = "thrift://ip-192-168-0-202.cn-north-1.compute.internal:9083" hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf" hive.hadoop.conf = { bucket="s3://ws-package" fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider" } read_columns = ["pk_id", "name", "score"] } } sink { Hive { table_name = "test_hive.test_hive_sink_on_s3_sink" metastore_uri = "thrift://ip-192-168-0-202.cn-north-1.compute.internal:9083" hive.hadoop.conf-path = "/home/ec2-user/hadoop-conf" hive.hadoop.conf = { bucket="s3://ws-package" fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider" } } }
为 EMR 的 Hive 创建 lib 目录。
mkdir -p ${SEATUNNEL_HOME}/plugins/Hive/lib
从 Maven 中心获取 jar 文件到 lib。
cd ${SEATUNNEL_HOME}/plugins/Hive/lib wget https://repo1.maven.org/maven2/org/apache/hive/hive-exec/2.3.9/hive-exec-2.3.9.jar
从您的 EMR 环境中复制 jar 文件到 lib 目录并删除冲突的 jar。
cp -r /opt/apps/JINDOSDK/jindosdk-current/lib/jindo-*.jar ${SEATUNNEL_HOME}/plugins/Hive/lib rm -f ${SEATUNNEL_HOME}/lib/hadoop-aliyun-*.jar
运行案例。
env { parallelism = 1 job.mode = "BATCH" } source { Hive { table_name = "test_hive.test_hive_sink_on_oss" metastore_uri = "thrift://master-1-1.c-1009b01725b501f2.cn-wulanchabu.emr.aliyuncs.com:9083" hive.hadoop.conf-path = "/tmp/hadoop" hive.hadoop.conf = { bucket="oss://emr-osshdfs.cn-wulanchabu.oss-dls.aliyuncs.com" } } } sink { Hive { table_name = "test_hive.test_hive_sink_on_oss_sink" metastore_uri = "thrift://master-1-1.c-1009b01725b501f2.cn-wulanchabu.emr.aliyuncs.com:9083" hive.hadoop.conf-path = "/tmp/hadoop" hive.hadoop.conf = { bucket="oss://emr-osshdfs.cn-wulanchabu.oss-dls.aliyuncs.com" } } }