| --- |
| title: "Streaming" |
| weight: 2 |
| type: docs |
| aliases: |
| - /append-table/streaming.html |
| --- |
| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| --> |
| |
| # Streaming |
| |
You can stream write to the Append table through Flink in a very flexible way, or stream read the Append table through
Flink, using it like a message queue. The main difference from a real queue is that its latency is measured in minutes. Its
advantages are very low cost and the ability to push down filters and projections.
| |
| ## Pre small files merging |
| |
| "Pre" means that this compact occurs before committing files to the snapshot. |
| |
If Flink's checkpoint interval is short (for example, 30 seconds), each snapshot may produce lots of small files. Too
many files put a burden on the distributed storage cluster.
| |
To compact small files into larger ones, you can set the table option `precommit-compact` to `true`. The default value
of this option is `false`. When enabled, a compact coordinator and a compact worker operator are added after the writer
operator, which merge small files into larger ones before the snapshot is committed.
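For example, a minimal sketch in Flink SQL (the table name and schema below are illustrative) is to enable the option
when creating the table, or afterwards with `ALTER TABLE`:

```sql
-- Illustrative append table (no primary key) with pre-commit merging enabled
CREATE TABLE my_append_table (
    id   BIGINT,
    data STRING,
    dt   STRING
) WITH (
    'precommit-compact' = 'true'
);

-- Or enable it on an existing table
ALTER TABLE my_append_table SET ('precommit-compact' = 'true');
```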
| |
| ## Post small files merging |
| |
| "Post" means that this compact occurs after committing files to the snapshot. |
| |
In a streaming write job without a bucket definition, there is no compaction in the writer. Instead, a
`Compact Coordinator` scans for small files and passes compaction tasks to a `Compact Worker`. In streaming mode, if you
run an insert SQL statement in Flink, the topology will look like this:
| |
| {{< img src="/img/unaware-bucket-topo.png">}} |
| |
Do not worry about backpressure; compaction never causes backpressure.
| |
If you set `write-only` to `true`, the `Compact Coordinator` and `Compact Worker` operators will be removed from the topology.
| |
Automatic compaction is only supported in Flink's streaming mode. You can also start a dedicated compaction job with the
Paimon Flink action and disable all other compaction by setting `write-only`, as shown in the sketch below.
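A minimal sketch of this setup in Flink SQL (the table name is illustrative, and the `sys.compact` procedure assumes a
Flink/Paimon version with SQL procedure support; otherwise the `compact` action of the `paimon-flink-action` jar can be
used instead):

```sql
-- In the streaming write job: skip compaction and snapshot expiration
ALTER TABLE my_append_table SET ('write-only' = 'true');

-- In a separate, dedicated Flink job: trigger compaction for the table
CALL sys.compact('default.my_append_table');
```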
| |
| The following options control the strategy of compaction: |
| |
| <table class="configuration table table-bordered"> |
| <thead> |
| <tr> |
| <th class="text-left" style="width: 20%">Key</th> |
| <th class="text-left" style="width: 15%">Default</th> |
| <th class="text-left" style="width: 10%">Type</th> |
| <th class="text-left" style="width: 55%">Description</th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td><h5>write-only</h5></td> |
| <td style="word-wrap: break-word;">false</td> |
| <td>Boolean</td> |
| <td>If set to true, compactions and snapshot expiration will be skipped. This option is used along with dedicated compact jobs.</td> |
| </tr> |
| <tr> |
| <td><h5>compaction.min.file-num</h5></td> |
| <td style="word-wrap: break-word;">5</td> |
| <td>Integer</td> |
| <td>For file set [f_0,...,f_N], the minimum file number to trigger a compaction for append table.</td> |
| </tr> |
| <tr> |
| <td><h5>compaction.delete-ratio-threshold</h5></td> |
| <td style="word-wrap: break-word;">(none)</td> |
| <td>Double</td> |
| <td>Ratio of the deleted rows in a data file to be forced compacted.</td> |
| </tr> |
| </tbody> |
| </table> |
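For instance, these thresholds can be tuned per table; the sketch below uses illustrative values, not recommendations:

```sql
-- Trigger compaction only once at least 10 small files have accumulated, and
-- force-compact data files whose deleted-row ratio exceeds 0.2 (illustrative values)
ALTER TABLE my_append_table SET (
    'compaction.min.file-num' = '10',
    'compaction.delete-ratio-threshold' = '0.2'
);
```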
| |
| ## Streaming Query |
| |
You can stream read the Append table and use it like a message queue. As with primary key tables, there are two options
for streaming reads:
1. By default, a streaming read produces the latest snapshot of the table on first startup and then continues to read
   the latest incremental records.
2. You can specify `scan.mode`, `scan.snapshot-id`, `scan.timestamp-millis` and/or `scan.file-creation-time-millis` to
   read only the incremental changes, as illustrated in the sketch below.
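A minimal sketch in Flink SQL using dynamic table options (the table name and snapshot id are illustrative, and a
streaming execution mode is assumed):

```sql
-- Run the queries as unbounded streaming reads
SET 'execution.runtime-mode' = 'streaming';

-- Default behavior: read the latest snapshot first, then continue with incremental records
SELECT * FROM my_append_table /*+ OPTIONS('scan.mode' = 'latest-full') */;

-- Incremental-only read, starting from a specific snapshot (snapshot id is illustrative)
SELECT * FROM my_append_table /*+ OPTIONS('scan.snapshot-id' = '5') */;
```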
| |
As with the Flink Kafka connector, ordering is not guaranteed by default. If your data has ordering requirements,
consider defining a `bucket-key`; see [Bucketed Append]({{< ref "append-table/bucketed" >}}).