---
title: "SQL Sources & Sinks"
nav-parent_id: tableapi
nav-pos: 35
---

Flink SQL provides access to data stored in external systems (databases, key-value stores, message queues) or files.

With a CREATE TABLE statement, such data can be accessed as a SQL table in subsequent DML statements and is translated into a TableSource or TableSink automatically.

The WITH clause describes the information necessary to access an external system.

* This will be replaced by the TOC
{:toc}

Provided Connectors

CSV Connector

Support Matrix

{% highlight sql %}

-- Create a table named Orders which includes a primary key, and is stored as a CSV file
CREATE TABLE Orders (
    orderId BIGINT NOT NULL,
    customId VARCHAR NOT NULL,
    itemId BIGINT NOT NULL,
    totalPrice BIGINT NOT NULL,
    orderTime TIMESTAMP NOT NULL,
    description VARCHAR,
    PRIMARY KEY(orderId)
) WITH (
    type = 'csv',
    path = 'file:///abc/csv_file1'
)

{% endhighlight %}

Required Configuration

  • type : use CSV to create a CSV table that reads from or writes to CSV files.
  • path : locations of the CSV files. Accepts standard Hadoop globbing expressions. To read a directory of CSV files, specify a directory.

Optional Configuration

  • enumerateNestedFiles : when set to true, the reader descends into the directory to find CSV files recursively. By default true.
  • fieldDelim : the field delimiter. The default is a comma (,), but it can be set to any character.
  • lineDelim : the line delimiter. The default is \n, but it can be set to any character.
  • charset : defaults to UTF-8, but can be set to other valid charset names.
  • override : when set to true, existing files are overwritten. By default false.
  • emptyColumnAsNull : when set to true, any empty column is set to null. By default false.
  • quoteCharacter : by default no quote character, but can be set to any character.
  • firstLineAsHeader : when set to true, the first line of each file is used to name the columns and is not included in the data. All types are assumed to be string. By default false.
  • parallelism : the number of files to write to.
  • timeZone : the time zone used to parse DateTime columns. Defaults to UTC, but can be set to other valid time zones.
  • commentsPrefix : skip lines beginning with this string. By default no commentsPrefix, but can be set to any string.
  • updateMode : how changes of a dynamic table are encoded. By default append. See [Table to Stream Conversion]({{ site.baseurl }}/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion). A combined example follows this list.
    • append : encodes INSERT changes only.
    • upsert : encodes INSERT and UPDATE changes as upsert messages and DELETE changes as delete messages.
    • retract : encodes an INSERT change as an add message, a DELETE change as a retract message, and an UPDATE change as a retract message for the updated (previous) row and an add message for the updating (new) row.
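
The following sketch combines several of the optional settings above in one CSV sink definition. The table name, columns, and path are assumptions made up for illustration; only the option names and defaults come from the list above.

{% highlight sql %}
-- Hypothetical CSV sink: semicolon-delimited, quoted fields, overwrite enabled,
-- timestamps parsed in the Asia/Shanghai time zone, append update mode.
CREATE TABLE OrdersCsvSink (
    orderId BIGINT NOT NULL,
    totalPrice BIGINT NOT NULL,
    orderTime TIMESTAMP NOT NULL,
    PRIMARY KEY(orderId)
) WITH (
    type = 'csv',
    path = 'file:///abc/orders_out',
    fieldDelim = ';',
    quoteCharacter = '"',
    override = 'true',
    timeZone = 'Asia/Shanghai',
    updateMode = 'append'
)
{% endhighlight %}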

HBase Connector

Support Matrix

Legend:

  • Y: support
  • N: not support
  • I: incoming soon

{% highlight sql %}
CREATE TABLE testSinkTable (
    ROWKEY BIGINT,
    family1.col1 VARCHAR,
    family2.col1 INTEGER,
    family2.col2 VARCHAR,
    family3.col1 DOUBLE,
    family3.col2 DATE,
    family3.col3 BIGINT,
    PRIMARY KEY(ROWKEY)
) WITH (
    type = 'HBASE',
    connector.property-version = '1.4.3',
    hbase.zookeeper.quorum = 'test_hostname:2181'
)

{% endhighlight %}

Note : the HBase table schema (used for writing or temporal joins) must have a single-column primary key named ROWKEY, and the column names must follow the columnFamily.qualifier format.

Required Configuration

  • type : use HBASE to create an HBase table to read/write data.
  • connector.property-version : specify the HBase client version; currently only '1.4.3' is available. More versions will be supported later.
  • tableName : specify the name of the table in HBase.
  • hbase.zookeeper.quorum : specify the ZooKeeper quorum configuration for accessing the HBase cluster. Note : either specify this parameter or ensure that a valid default hbase-site.xml is available on the classpath.

Optional Configuration

  • hbase.* : support all the parameters that have the ‘hbase.’ prefix, e.g., ‘hbase.client.operation.timeout’.
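
As a sketch of how such a table is used as a sink, the statement below writes into the testSinkTable defined above. The source table srcTable and its column names are assumptions for illustration; they must match the sink schema in type and order.

{% highlight sql %}
-- Hypothetical: srcTable provides one column per target column, ROWKEY first.
INSERT INTO testSinkTable
SELECT rowkey, f1c1, f2c1, f2c2, f3c1, f3c2, f3c3
FROM srcTable
{% endhighlight %}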

Kafka Connector

Support Matrix

Legend:

  • Y: support
  • N: not support
  • I: incoming soon

Create Source Tables

{% highlight sql %}
CREATE TABLE kafka_source (
    key VARBINARY,
    msg VARBINARY,
    topic VARCHAR,
    partition INT,
    offset BIGINT
) WITH (
    type = 'KAFKA010',
    bootstrap.servers = 'test_hostname:9092',
    group.id = 'test-group-id',
    topic = 'source-topic',
    startupMode = 'EARLIEST'
)
{% endhighlight %}

Note: At this point, the Kafka source table must be created with the above five columns.

Kafka Source Table Configurations in WITH block
Additional Kafka Source Table Configurations

When defining a table from Kafka, users can also set, in the WITH block, any configuration supported by the Kafka consumer of the corresponding Kafka version. See the following links for all the configurations supported by Kafka consumers.

KAFKA09

KAFKA010

KAFKA011
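
For instance, a standard Kafka 0.10 consumer property such as max.poll.records could be added next to the required options. This is only a sketch; the table name and the chosen value are assumptions for illustration.

{% highlight sql %}
-- Hypothetical source table passing an extra consumer property through the WITH block.
CREATE TABLE kafka_source_tuned (
    key VARBINARY,
    msg VARBINARY,
    topic VARCHAR,
    partition INT,
    offset BIGINT
) WITH (
    type = 'KAFKA010',
    bootstrap.servers = 'test_hostname:9092',
    group.id = 'test-group-id',
    topic = 'source-topic',
    startupMode = 'EARLIEST',
    max.poll.records = '500'
)
{% endhighlight %}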

Create Sink Tables

{% highlight sql %}
CREATE TABLE kafka_sink (
    messageKey VARBINARY,
    messageValue VARBINARY,
    PRIMARY KEY (messageKey)
) WITH (
    type = 'KAFKA010',
    topic = 'sink-topic',
    bootstrap.servers = 'test_hostname:9092',
    retries = '3'
)
{% endhighlight %}

Note: The primary key is mandatory for a Kafka table that is used as a sink.

Kafka Sink Table Configurations in WITH block
Additional Sink Table Configurations

When defining a table from Kafka, users can also set, in the WITH block, any configuration supported by the Kafka producer of the corresponding Kafka version. See the following links for all the configurations supported by Kafka producers.

KAFKA09

KAFKA010

KAFKA011
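
As a sketch of how the sink is used, the statement below forwards the key and message payload of the kafka_source table defined earlier into kafka_sink; both table definitions are the examples from this page.

{% highlight sql %}
INSERT INTO kafka_sink
SELECT key, msg
FROM kafka_source
{% endhighlight %}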

PARQUET Connector

Support Matrix

Legend:

  • Y: support
  • N: not support
  • I: incoming soon

{% highlight sql %}
CREATE TABLE testSinkTable (
    family1.col1 VARCHAR,
    family2.col1 INTEGER,
    family2.col2 VARCHAR
) WITH (
    type = 'PARQUET',
    filePath = 'schema://file1/file2.csv'
)

{% endhighlight %}

Required Configuration

  • type : use PARQUET to declare that this data source is in Parquet format.
  • filePath : the path to write the data to or consume from.

Optional Configuration

  • enumerateNestedFiles : whether to read all data files under filePath recursively. Defaults to true. This only works for a table source.
  • writeMode : whether to overwrite an existing file with the same name at the target path. Defaults to no_overwrite, which means the file is not overwritten and an error is thrown if a file with the same name already exists. This only works for a table sink.
  • compressionCodecName : the compression codec of the Parquet format; the options are uncompressed/snappy/gzip/lzo and the default is snappy. This only works for a table sink. A combined example follows this list.
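
The sketch below shows a Parquet sink that sets both sink-side options explicitly. The table name and path are assumptions for illustration; the schema is reused from the example above.

{% highlight sql %}
-- Hypothetical Parquet sink using gzip compression and the default no_overwrite mode.
CREATE TABLE parquetSink (
    family1.col1 VARCHAR,
    family2.col1 INTEGER,
    family2.col2 VARCHAR
) WITH (
    type = 'PARQUET',
    filePath = 'schema://output/result_parquet',
    writeMode = 'no_overwrite',
    compressionCodecName = 'gzip'
)
{% endhighlight %}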

ORC Connector

Support Matrix

Legend:

  • Y: support
  • N: not support
  • I: incoming soon

{% highlight sql %}
CREATE TABLE testSinkTable (
    family1.col1 VARCHAR,
    family2.col1 INTEGER,
    family2.col2 VARCHAR,
    PRIMARY KEY(family1.col1)
) WITH (
    type = 'ORC',
    filePath = 'schema://file1/file2.csv'
)

{% endhighlight %}

Required Configuration

  • type : use ORC to declare that this data source is in ORC format.
  • filePath : the path to write the data to or consume from.

Optional Configuration

  • enumerateNestedFiles : whether to read all data files under filePath recursively. Defaults to true. This only works for a table source. A source example follows this list.
  • writeMode : whether to overwrite an existing file with the same name at the target path. Defaults to no_overwrite, which means the file is not overwritten and an error is thrown if a file with the same name already exists. This only works for a table sink.
  • compressionCodecName : the compression codec of the ORC format; the options are uncompressed/snappy/gzip/lzo and the default is snappy. This only works for a table sink.
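
As a sketch, an ORC source that disables recursive enumeration of nested files could look like the following; the table name and path are assumptions for illustration.

{% highlight sql %}
-- Hypothetical ORC source reading only the top-level files under filePath.
CREATE TABLE orcSource (
    family1.col1 VARCHAR,
    family2.col1 INTEGER,
    family2.col2 VARCHAR
) WITH (
    type = 'ORC',
    filePath = 'schema://input/orders_orc',
    enumerateNestedFiles = 'false'
)
{% endhighlight %}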