Flink SQL provides access to data stored in external systems (databases, key-value stores, message queues) or files.
With the CREATE TABLE statement, such data can be accessed as a SQL table in subsequent DML statements and is automatically translated into a TableSource
or TableSink.
We use the WITH clause to describe the information necessary to access an external system.
{% highlight sql %}
-- Create a table named Orders which includes a primary key,
-- and is stored as a CSV file
CREATE TABLE Orders (
  orderId BIGINT NOT NULL,
  customId VARCHAR NOT NULL,
  itemId BIGINT NOT NULL,
  totalPrice BIGINT NOT NULL,
  orderTime TIMESTAMP NOT NULL,
  description VARCHAR,
  PRIMARY KEY(orderId)
) WITH (
  type='csv',
  path='file:///abc/csv_file1'
)
{% endhighlight %}
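Once declared, the table can be used like any other table in DML. For example, a simple aggregation over the Orders table defined above (the query itself is illustrative, not part of the connector documentation):

{% highlight sql %}
-- Total revenue per item, computed over the CSV-backed Orders table
SELECT itemId, SUM(totalPrice) AS revenue
FROM Orders
GROUP BY itemId
{% endhighlight %}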
Use `CSV` as the type to create a CSV table that reads from or writes into CSV files. The following options are supported in the WITH clause:

- Nested file enumeration: when `true`, the reader descends the directory for CSV files. By default `true`.
- Field delimiter: `,` by default, but can be set to any character.
- Line delimiter: `\n` by default, but can be set to any character.
- Charset: `UTF-8` by default, but can be set to other valid charset names.
- Override: when `true`, existing files are overwritten. By default `false`.
- Empty column as null: when `true`, any empty column is set to null. By default `false`.
- First line as header: when `true`, the first line of each file is used to name the columns and is not included in the data. All types are assumed to be string. By default `false`.
- Time zone: `UTC` by default, but can be set to other valid time zones.
- Update mode: `append`. [See Table to Stream Conversion]({{ site.baseurl }}/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion)
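To write results back out, a second CSV table can serve as the sink. The sketch below reuses only the `type` and `path` options from the example above; the sink table name and output path are hypothetical:

{% highlight sql %}
-- Hypothetical CSV sink with the same schema as Orders
CREATE TABLE OrdersSink (
  orderId BIGINT NOT NULL,
  customId VARCHAR NOT NULL,
  itemId BIGINT NOT NULL,
  totalPrice BIGINT NOT NULL,
  orderTime TIMESTAMP NOT NULL,
  description VARCHAR,
  PRIMARY KEY(orderId)
) WITH (
  type='csv',
  path='file:///abc/csv_file_out'
);

INSERT INTO OrdersSink
SELECT * FROM Orders;
{% endhighlight %}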
{% highlight sql %}
CREATE TABLE testSinkTable (
  ROWKEY BIGINT,
  `family1.col1` VARCHAR,
  `family2.col1` INTEGER,
  `family2.col2` VARCHAR,
  `family3.col1` DOUBLE,
  `family3.col2` DATE,
  `family3.col3` BIGINT,
  PRIMARY KEY(ROWKEY)
) WITH (
  type='HBASE',
  connector.property-version='1.4.3',
  hbase.zookeeper.quorum='test_hostname:2181'
)
{% endhighlight %}
Note: the HBase table schema (used for writing or temporal joins) must have a single-column primary key named ROWKEY,
and the column names must follow the `columnFamily.qualifier`
format.
Use `HBASE` as the type to create an HBase table to read/write data. Connection settings can also be picked up from an `hbase-site.xml`
that is valid in the current classpath. In addition, all parameters with the `hbase.` prefix are supported, e.g., `hbase.client.operation.timeout`.
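Writing into the HBase table defined above follows ordinary INSERT syntax; columns are matched by position, with the first select item feeding ROWKEY. The source table and its column names below are hypothetical:

{% highlight sql %}
-- Hypothetical source table userStats(userId BIGINT, name VARCHAR,
-- age INTEGER, city VARCHAR, score DOUBLE, birthday DATE, visits BIGINT)
INSERT INTO testSinkTable
SELECT userId, name, age, city, score, birthday, visits
FROM userStats;
{% endhighlight %}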
{% highlight sql %}
CREATE TABLE kafka_source (
  key VARBINARY,
  msg VARBINARY,
  `topic` VARCHAR,
  `partition` INT,
  `offset` BIGINT
) WITH (
  type = 'KAFKA010',
  bootstrap.servers = 'test_hostname:9092',
  group.id = 'test-group-id',
  topic = 'source-topic',
  startupMode = 'EARLIEST'
)
{% endhighlight %}
Note: At this point, the Kafka source table must be created with the above five columns.
When defining a table from Kafka, users can also set, in the WITH block, the configurations supported by the Kafka consumer of the corresponding Kafka version. See the following links for all the configurations supported by Kafka consumers.
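As a sketch, standard Kafka consumer properties such as `fetch.min.bytes` or `max.poll.records` could be passed through the WITH block alongside the connector options (the table name and property values here are illustrative, and whether a given property takes effect depends on the Kafka version in use):

{% highlight sql %}
CREATE TABLE kafka_source_tuned (
  key VARBINARY,
  msg VARBINARY,
  `topic` VARCHAR,
  `partition` INT,
  `offset` BIGINT
) WITH (
  type = 'KAFKA010',
  bootstrap.servers = 'test_hostname:9092',
  group.id = 'test-group-id',
  topic = 'source-topic',
  startupMode = 'EARLIEST',
  -- standard Kafka consumer configurations, passed through:
  fetch.min.bytes = '1024',
  max.poll.records = '500'
)
{% endhighlight %}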
{% highlight sql %}
CREATE TABLE kafka_sink (
  messageKey VARBINARY,
  messageValue VARBINARY,
  PRIMARY KEY (messageKey)
) WITH (
  type = 'KAFKA010',
  topic = 'sink-topic',
  bootstrap.servers = 'test_hostname:9092',
  retries = '3'
)
{% endhighlight %}
Note: The primary key is mandatory for a Kafka table used as a sink.
When defining a table from Kafka, users can also set, in the WITH block, the configurations supported by the Kafka producer of the corresponding Kafka version. See the following links for all the configurations supported by Kafka producers.
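For instance, the source and sink tables above can be wired together with a plain INSERT, which passes the raw key/value bytes through unchanged:

{% highlight sql %}
-- messageKey/messageValue are filled positionally from key/msg
INSERT INTO kafka_sink
SELECT key, msg FROM kafka_source;
{% endhighlight %}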
{% highlight sql %}
CREATE TABLE testSinkTable (
  `family1.col1` VARCHAR,
  `family2.col1` INTEGER,
  `family2.col2` VARCHAR
) WITH (
  type='PARQUET',
  filePath='schema://file1/file2.csv'
)
{% endhighlight %}
Use `PARQUET` as the type to declare that this data source is in Parquet format. The following options are supported:

- Recursive enumeration: whether to read the `filePath` recursively; default `true`. This only works for a table source.
- Write mode: `no_overwrite` by default, which means existing files are not overridden, so an error is thrown if a file with the same name already exists. This only works for a table sink.
- Compression: one of `uncompressed`/`snappy`/`gzip`/`lzo`; default `snappy`. This only works for a table sink.
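A sink-side usage sketch, writing into the Parquet table defined above (the source table someSource and its columns are hypothetical):

{% highlight sql %}
-- Columns map positionally to family1.col1, family2.col1, family2.col2
INSERT INTO testSinkTable
SELECT name, age, city FROM someSource;
{% endhighlight %}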
{% highlight sql %}
CREATE TABLE testSinkTable (
  `family1.col1` VARCHAR,
  `family2.col1` INTEGER,
  `family2.col2` VARCHAR,
  PRIMARY KEY(`family1.col1`)
) WITH (
  type='ORC',
  filePath='schema://file1/file2.csv'
)
{% endhighlight %}
Use `ORC` as the type to declare that this data source is in ORC format. The following options are supported:

- Recursive enumeration: whether to read the `filePath` recursively; default `true`. This only works for a table source.
- Write mode: `no_overwrite` by default, which means existing files are not overridden, so an error is thrown if a file with the same name already exists. This only works for a table sink.
- Compression: one of `uncompressed`/`snappy`/`gzip`/`lzo`; default `snappy`. This only works for a table sink.
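On the source side, pointing `filePath` at a directory lets the recursive-enumeration option pick up all ORC files underneath it. The table name and path below are hypothetical:

{% highlight sql %}
CREATE TABLE orcSource (
  `family1.col1` VARCHAR,
  `family2.col1` INTEGER,
  `family2.col2` VARCHAR
) WITH (
  type='ORC',
  filePath='schema://some_dir'
);

SELECT * FROM orcSource;
{% endhighlight %}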