import ChangeLog from '../changelog/connector-doris.md';

Doris

Doris source connector

Supported Engines

Spark
Flink
SeaTunnel Zeta

Key features

Description

Used to read data from Apache Doris.

Using Dependency

For Spark/Flink Engine

  1. Ensure that the JDBC driver JAR package is placed in the ${SEATUNNEL_HOME}/plugins/ directory.

For SeaTunnel Zeta Engine

  1. Ensure that the JDBC driver JAR package is placed in the ${SEATUNNEL_HOME}/lib/ directory.

Supported DataSource Info

| Datasource | Supported versions | Driver | Url | Maven |
|------------|--------------------|--------|-----|-------|
| Doris | Only Doris 2.0 or later is supported. | - | - | - |

Data Type Mapping

| Doris Data type | SeaTunnel Data type |
|-----------------|---------------------|
| INT | INT |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| BIGINT | BIGINT |
| LARGEINT | STRING |
| BOOLEAN | BOOLEAN |
| DECIMAL | DECIMAL(p + 1, s), where p is the designated column's specified column size and s is its number of digits to the right of the decimal point |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| CHAR, VARCHAR, STRING, TEXT | STRING |
| DATE | DATE |
| DATETIME, DATETIME(p) | TIMESTAMP |
| ARRAY | ARRAY |

Source Options

Base configuration:

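| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| fenodes | string | yes | - | FE address, in the format "fe_host:fe_http_port" |
| username | string | yes | - | Doris username |
| password | string | yes | - | Doris user password |
| doris.request.retries | int | no | 3 | Number of retries to send requests to Doris FE. |
| doris.request.read.timeout.ms | int | no | 30000 | |
| doris.request.connect.timeout.ms | int | no | 30000 | |
| query-port | string | no | 9030 | Doris QueryPort |
| doris.request.query.timeout.s | int | no | 3600 | Timeout period of Doris scan data, expressed in seconds. |
| table_list | string | no | - | List of tables to read; see the table list configuration. |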
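
As a minimal sketch of how the optional base options sit next to the required ones, the source block below sets the retry count and the request timeouts explicitly. The host, database and table names are the placeholders used in the examples of this document, and the non-default values are purely illustrative, not recommendations.

```hocon
source {
  Doris {
    # Required connection settings (placeholder host and credentials)
    fenodes = "doris_e2e:8030"
    username = root
    password = ""

    # Optional request settings; the values here are illustrative only
    doris.request.retries = 5                  # default: 3
    doris.request.read.timeout.ms = 60000      # default: 30000
    doris.request.connect.timeout.ms = 60000   # default: 30000
    doris.request.query.timeout.s = 3600       # default: 3600, in seconds

    # Single-table form: table settings placed directly in the Doris block
    database = "e2e_source"
    table = "doris_e2e_table"
  }
}
```

Options that are left out keep the defaults listed in the table above.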

Table list configuration:

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| database | string | yes | - | The name of the Doris database |
| table | string | yes | - | The name of the Doris table |
| doris.read.field | string | no | - | Comma-separated list of the Doris table columns to read |
| doris.filter.query | string | no | - | Filter expression applied in Doris, in the format "field = value", for example: doris.filter.query = "F_ID > 2" |
| doris.batch.size | int | no | 1024 | Maximum number of rows read from Doris BE in a single request. |
| doris.exec.mem.limit | long | no | 2147483648 | Maximum memory, in bytes, that a single BE scan request can use. The default is 2G (2147483648). |

Note: When the job reads a single table, you can omit table_list and place its configuration items directly in the outer Doris block.
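
To illustrate the note above, the two fragments below are intended as equivalent ways of reading one table. The first wraps the table settings in a table_list with a single entry; the second flattens the same items to the outer layer. Only one form would appear in a real job file. The host, database and table names are placeholders, and the doris.batch.size value is an illustrative assumption rather than a recommended setting.

```hocon
# Form 1: single table declared through table_list
source {
  Doris {
    fenodes = "doris_e2e:8030"
    username = root
    password = ""
    table_list = [
      {
        database = "e2e_source"
        table = "doris_e2e_table"
        doris.batch.size = 2048   # illustrative; default is 1024
      }
    ]
  }
}
```

```hocon
# Form 2: the same table with the table_list items flattened to the outer layer
source {
  Doris {
    fenodes = "doris_e2e:8030"
    username = root
    password = ""
    database = "e2e_source"
    table = "doris_e2e_table"
    doris.batch.size = 2048   # illustrative; default is 1024
  }
}
```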

Tips

It is not recommended to modify advanced parameters at will.

Example

Single table

This is an example of reading a Doris table and writing to Console.

env {
  parallelism = 2
  job.mode = "BATCH"
}
source{
  Doris {
      fenodes = "doris_e2e:8030"
      username = root
      password = ""
      database = "e2e_source"
      table = "doris_e2e_table"
  }
}

transform {
    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
    # please go to https://seatunnel.apache.org/docs/transform/sql
}

sink {
    Console {}
}

Use the 'doris.read.field' parameter to select the Doris table columns to read.

env {
  parallelism = 2
  job.mode = "BATCH"
}
source{
  Doris {
      fenodes = "doris_e2e:8030"
      username = root
      password = ""
      database = "e2e_source"
      table = "doris_e2e_table"
      doris.read.field = "F_ID,F_INT,F_BIGINT,F_TINYINT,F_SMALLINT"
  }
}

transform {
    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
    # please go to https://seatunnel.apache.org/docs/transform/sql
}

sink {
    Console {}
}

Use 'doris.filter.query' to filter the data. The parameter value is passed directly to Doris.

env {
  parallelism = 2
  job.mode = "BATCH"
}
source{
  Doris {
      fenodes = "doris_e2e:8030"
      username = root
      password = ""
      database = "e2e_source"
      table = "doris_e2e_table"
      doris.filter.query = "F_ID > 2"
  }
}

transform {
    # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
    # please go to https://seatunnel.apache.org/docs/transform/sql
}

sink {
    Console {}
}
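
The two options can also be set together, as the first table_list entry in the multiple-table example does. The sketch below applies the same combination in the flattened single-table form. It assumes the filter column may be any column of the source table; the column names are the placeholders used in the examples above.

```hocon
source {
  Doris {
    fenodes = "doris_e2e:8030"
    username = root
    password = ""
    database = "e2e_source"
    table = "doris_e2e_table"
    # Read only these columns ...
    doris.read.field = "F_ID,F_INT,F_BIGINT"
    # ... and only the rows matching this filter, which is passed to Doris as-is
    doris.filter.query = "F_ID > 2"
  }
}
```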

Multiple tables

env{
  parallelism = 1
  job.mode = "BATCH"
}

source{
  Doris {
      fenodes = "xxxx:8030"
      username = root
      password = ""
      table_list = [
          {
            database = "st_source_0"
            table = "doris_table_0"
            doris.read.field = "F_ID,F_INT,F_BIGINT,F_TINYINT"
            doris.filter.query = "F_ID >= 50"
          },
          {
            database = "st_source_1"
            table = "doris_table_1"
          }
      ]
  }
}

transform {}

sink{
  Doris {
      fenodes = "xxxx:8030"
      schema_save_mode = "RECREATE_SCHEMA"
      username = root
      password = ""
      database = "st_sink"
      table = "${table_name}"
      sink.enable-2pc = "true"
      sink.label-prefix = "test_json"
      doris.config = {
          format="json"
          read_json_by_line="true"
      }
  }
}

Changelog