---
title: "Streaming"
weight: 2
type: docs
aliases:
- /append-table/streaming.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Streaming
You can write to an Append table in streaming mode through Flink in a very flexible way, or read an Append table
through Flink and use it like a queue. The main difference from a message queue is that latency is measured in minutes.
In return, you get very low cost and the ability to push down filters and projections.
## Pre small files merging
"Pre" means that this compaction occurs before files are committed to the snapshot.

If Flink's checkpoint interval is short (for example, 30 seconds), each snapshot may produce lots of small changelog
files. Too many files may put a burden on the distributed storage cluster.

To compact small changelog files into large ones, set the table option `precommit-compact = true`.
The default value of this option is false. If it is true, a compact coordinator and a compact worker operator are added
after the writer operator; they copy the small changelog files into large ones.
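
As a minimal sketch, the option can be set when the table is created. The table name `append_events` and its columns are hypothetical, and a Paimon catalog is assumed to be the current catalog in the Flink SQL session.

```sql
-- Hypothetical append table (no primary key) with pre-commit compaction enabled.
CREATE TABLE append_events (
    event_id BIGINT,
    payload  STRING,
    dt       STRING
) WITH (
    -- Adds the compact coordinator and worker operators after the writer.
    'precommit-compact' = 'true'
);
```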
## Post small files merging
"Post" means that this compaction occurs after files are committed to the snapshot.

In a streaming write job without a bucket definition, the writer does not compact. Instead, a
`Compact Coordinator` scans the small files and passes compaction tasks to a `Compact Worker`. In streaming mode, if you
run an INSERT SQL statement in Flink, the topology looks like this:
{{< img src="/img/unaware-bucket-topo.png">}}
Do not worry about backpressure: compaction never causes backpressure.

If you set `write-only` to true, the `Compact Coordinator` and `Compact Worker` are removed from the topology.

Auto compaction is only supported in the streaming mode of the Flink engine. You can also start a dedicated compaction
job in Flink through the Paimon Flink action, and disable all other compactions by setting `write-only`.
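
The split between a write-only job and a dedicated compaction job might look like the sketch below. It assumes the hypothetical table `append_events` in a database named `default`, and it uses Paimon's `sys.compact` procedure rather than the Flink action mentioned above; whether the procedure and its indexed-argument form are available depends on your Paimon and Flink versions.

```sql
-- Writer side: skip compaction (and snapshot expiration) in the write job.
-- A running write job typically needs a restart to pick up the change.
ALTER TABLE append_events SET ('write-only' = 'true');

-- Compaction side: submit a separate job that compacts the table.
-- 'default.append_events' is an assumed database.table identifier.
CALL sys.compact('default.append_events');
```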
The following options control the strategy of compaction:
<table class="configuration table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Key</th>
<th class="text-left" style="width: 15%">Default</th>
<th class="text-left" style="width: 10%">Type</th>
<th class="text-left" style="width: 55%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>write-only</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If set to true, compactions and snapshot expiration will be skipped. This option is used along with dedicated compact jobs.</td>
</tr>
<tr>
<td><h5>compaction.min.file-num</h5></td>
<td style="word-wrap: break-word;">5</td>
<td>Integer</td>
<td>For file set [f_0,...,f_N], the minimum number of files required to trigger a compaction for an append table.</td>
</tr>
<tr>
<td><h5>compaction.delete-ratio-threshold</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Double</td>
<td>Ratio of deleted rows in a data file above which the file is forced to be compacted.</td>
</tr>
</tbody>
</table>
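
For illustration, these options can be changed on an existing table with `ALTER TABLE ... SET`. The table name is hypothetical and the values are arbitrary examples, not recommendations.

```sql
-- Illustrative values only: require at least 10 small files before a compaction
-- is triggered, and force compaction of data files with more than 20% deleted rows.
ALTER TABLE append_events SET (
    'compaction.min.file-num' = '10',
    'compaction.delete-ratio-threshold' = '0.2'
);
```
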
## Streaming Query
You can stream-read the Append table and use it like a message queue. As with primary key tables, there are two options
for streaming reads:
1. By default, a streaming read produces the latest snapshot of the table on first startup, and then continues to read
the latest incremental records.
2. You can specify `scan.mode`, `scan.snapshot-id`, `scan.timestamp-millis` and/or `scan.file-creation-time-millis` to
stream-read incremental records only.
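
The two options above can be sketched with Flink SQL dynamic table options. The table name `append_events` is hypothetical, and the snapshot id is a placeholder value.

```sql
-- Run the queries as unbounded streaming jobs.
SET 'execution.runtime-mode' = 'streaming';

-- Option 1 (default): emit the latest snapshot first, then incremental records.
SELECT * FROM append_events;

-- Option 2: skip the initial snapshot and read only newly written records.
SELECT * FROM append_events /*+ OPTIONS('scan.mode' = 'latest') */;

-- Option 2, variant: read continuously starting from a given snapshot
-- ('1' is a placeholder snapshot id).
SELECT * FROM append_events /*+ OPTIONS('scan.snapshot-id' = '1') */;
```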

Similar to flink-kafka, order is not guaranteed by default. If your data has ordering requirements, you also
need to consider defining a `bucket-key`; see [Bucketed Append]({{< ref "append-table/bucketed" >}}).
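
A bucketed append table might be declared as in the sketch below. The table name, columns, and bucket count are hypothetical; the intent is that records with the same `product_id` go to the same bucket, so that ordering can be kept within a bucket.

```sql
-- Hypothetical bucketed append table: rows are distributed by product_id.
CREATE TABLE ordered_events (
    product_id BIGINT,
    price      DOUBLE,
    sales      BIGINT
) WITH (
    'bucket' = '8',               -- illustrative bucket count
    'bucket-key' = 'product_id'   -- column used to assign rows to buckets
);
```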