tree: b4cdf2ecfe7f1791772ce2ea4e176d7c042aee0f [path history] [tgz]
  1. src/
  2. pom.xml
  3. README.md
extension/spark-doris-connector/README.md

Spark-Doris-Connector

Fetures

  • 当前版本只支持从Doris中读取数据。
  • 可以将Doris表映射为DataFrame或者RDD,推荐使用DataFrame
  • 支持在Doris端完成数据过滤,减少数据传输量。

Version Compatibility

ConnectorSparkDorisJavaScala
1.0.02.xmaster82.11

Building

mvn clean package

编译成功后,会在target目录下生成文件doris-spark-1.0.0-SNAPSHOT.jar。将此文件复制到SparkClassPath中即可使用Spark-Doris-Connector。例如,Local模式运行的Spark,将此文件放入jars文件夹下。Yarn集群模式运行的Spark,则将此文件放入预部署包中。

QuickStart

SQL

CREATE TEMPORARY VIEW spark_doris
USING doris
OPTIONS(
  "table.identifier"="$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME",
  "fenodes"="$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
  "user"="$YOUR_DORIS_USERNAME",
  "password"="$YOUR_DORIS_PASSWORD"
);

SELECT * FROM spark_doris;

DataFrame

val dorisSparkDF = spark.read.format("doris")
  .option("doris.table.identifier", "$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME")
	.option("doris.fenodes", "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
  .option("user", "$YOUR_DORIS_USERNAME")
  .option("password", "$YOUR_DORIS_PASSWORD")
  .load()

dorisSparkDF.show(5)

RDD

import org.apache.doris.spark._
val dorisSparkRDD = sc.dorisRDD(
  tableIdentifier = Some("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME"),
  cfg = Some(Map(
    "doris.fenodes" -> "$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT",
    "doris.request.auth.user" -> "$YOUR_DORIS_USERNAME",
    "doris.request.auth.password" -> "$YOUR_DORIS_PASSWORD"
  ))
)

dorisSparkRDD.collect()

Configuration

General

KeyDefault ValueComment
doris.fenodes--Doris Restful接口地址,支持多个地址,使用逗号分隔
doris.table.identifier--DataFame/RDD对应的Doris表名
doris.request.retries3向Doris发送请求的重试次数
doris.request.connect.timeout.ms30000向Doris发送请求的连接超时时间
doris.request.read.timeout.ms30000向Doris发送请求的读取超时时间
doris.request.query.timeout.s3600查询doris的超时时间,默认值为1小时,-1表示无超时限制
doris.request.tablet.sizeInteger.MAX_VALUE一个RDD Partition对应的Doris Tablet个数。
此数值设置越小,则会生成越多的Partition。
从而提升Spark侧的并行度,但同时会对Doris造成更大的压力。
doris.batch.size1024一次从BE读取数据的最大行数。
增大此数值可减少Spark与Doris之间建立连接的次数。
从而减轻网络延迟所带来的的额外时间开销。
doris.exec.mem.limit2147483648单个查询的内存限制。默认为 2GB,单位为字节
doris.deserialize.arrow.asyncfalse是否支持异步转换Arrow格式到spark-doris-connector迭代所需的RowBatch
doris.deserialize.queue.size64异步转换Arrow格式的内部处理队列,当doris.deserialize.arrow.async为true时生效

SQL and Dataframe Only

KeyDefault ValueComment
user--访问Doris的用户名
password--访问Doris的密码
doris.filter.query.in.max.count100谓词下推中,in表达式value列表元素最大数量。
超过此数量,则in表达式条件过滤在Spark侧处理。

RDD Only

KeyDefault ValueComment
doris.request.auth.user--访问Doris的用户名
doris.request.auth.password--访问Doris的密码
doris.read.field--读取Doris表的列名列表,多列之间使用逗号分隔
doris.filter.query--过滤读取数据的表达式,此表达式透传给Doris。
Doris使用此表达式完成源端数据过滤。

Doris Data Type - Spark Data Type Mapping

Doris TypeSpark Type
NULL_TYPEDataTypes.NullType
BOOLEANDataTypes.BooleanType
TINYINTDataTypes.ByteType
SMALLINTDataTypes.ShortType
INTDataTypes.IntegerType
BIGINTDataTypes.LongType
FLOATDataTypes.FloatType
DOUBLEDataTypes.DoubleType
DATEDataTypes.StringType1
DATETIMEDataTypes.StringType1
BINARYDataTypes.BinaryType
DECIMALDecimalType
CHARDataTypes.StringType
LARGEINTDataTypes.StringType
VARCHARDataTypes.StringType
DECIMALV2DecimalType
TIMEDataTypes.DoubleType
HLLUnsupported datatype

1: Connector中,将DATEDATETIME映射为String。由于Doris底层存储引擎处理逻辑,直接使用时间类型时,覆盖的时间范围无法满足需求。所以使用String类型直接返回对应的时间可读文本。