---
title: "Write Performance"
weight: 3
type: docs
aliases:
---
Paimon's write performance is closely related to checkpointing, so if you need greater write throughput:

- Flink configuration ('flink-conf.yaml'/'config.yaml' or SET in SQL): increase the checkpoint interval ('execution.checkpointing.interval'), increase the max concurrent checkpoints to 3 ('execution.checkpointing.max-concurrent-checkpoints'), or just use batch mode.
- Increase 'write-buffer-size'.
- Enable 'write-buffer-spillable'.

Option 'changelog-producer' = 'lookup' or 'full-compaction', and option 'full-compaction.delta-commits', have a large impact on write performance. If the job is in a snapshot / full synchronization phase, you can unset these options and enable them again in the incremental phase.
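As a rough sketch of the checkpoint and buffer knobs above in Flink SQL (the table name `my_table` and the concrete values are placeholders, tune them for your job):

```sql
-- Larger checkpoint interval and more concurrent checkpoints (SQL client session options)
SET 'execution.checkpointing.interval' = '2 min';
SET 'execution.checkpointing.max-concurrent-checkpoints' = '3';

-- Bigger, spillable write buffer on the Paimon table
ALTER TABLE my_table SET (
  'write-buffer-size' = '512 mb',
  'write-buffer-spillable' = 'true'
);
```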
If you find that the input of the job shows a jagged pattern under backpressure, the cause may be imbalanced work nodes. You can consider turning on [Asynchronous Compaction]({{< ref "primary-key-table/compaction#asynchronous-compaction" >}}) and observe whether throughput increases.
It is recommended that the parallelism of the sink be less than or equal to the number of buckets, preferably equal. You can control the parallelism of the sink with the `sink.parallelism` table property.
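For instance (the value 4 is illustrative and should match your bucket count):

```sql
-- Keep sink parallelism equal to the number of buckets
ALTER TABLE my_table SET ('sink.parallelism' = '4');
```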
If your job suffers from primary key data skew (for example, you want to count the number of views for each page of a website, and some particular pages are very popular among users), you can set 'local-merge-buffer-size' so that input records are buffered and merged before they are shuffled by bucket and written to the sink. This is particularly useful when the same primary key is updated frequently between snapshots.

The buffer is flushed when it is full. We recommend starting with 64 mb when you face data skew but don't know where to start adjusting the buffer size.

(Currently, local merging does not work for CDC ingestion.)
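For a non-CDC job, a minimal sketch of the suggested starting point (the table name is a placeholder):

```sql
-- Buffer and pre-merge hot primary keys before shuffling by bucket
ALTER TABLE my_table SET ('local-merge-buffer-size' = '64 mb');
```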
If you want to achieve ultimate compaction performance, you can consider using the row storage file format AVRO.

This is a tradeoff: you gain write and compaction throughput, but analytical queries on row storage will be slower.
Enable row storage through the following options:
```shell
file.format = avro
metadata.stats-mode = none
```
Collecting statistics for row storage is somewhat expensive, so we suggest turning off statistics as well.
If you don't want to switch all files to Avro format, you can at least consider using Avro for the files in the upper levels. You can use 'file.format.per.level' = '0:avro,1:avro' to write the files in the first two levels (levels 0 and 1) in Avro format.
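Both variants could be applied like this in Flink SQL (the table name is a placeholder):

```sql
-- Full row storage: Avro everywhere, with statistics collection disabled
ALTER TABLE my_table SET (
  'file.format' = 'avro',
  'metadata.stats-mode' = 'none'
);

-- Or: Avro only for levels 0 and 1, columnar format for the deeper levels
ALTER TABLE my_table SET ('file.format.per.level' = '0:avro,1:avro');
```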
By default, Paimon uses zstd with level 1. You can modify the compression:

- 'file.compression.zstd-level': the default zstd level is 1. For higher compression rates, it can be configured to 9, but read and write speed will decrease significantly.
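For example (applies to the whole table; the value is the high end mentioned above):

```sql
-- Higher compression ratio at the cost of slower reads and writes
ALTER TABLE my_table SET ('file.compression.zstd-level' = '9');
```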
If there are too few buckets or too few resources, full compaction may cause checkpoints to time out; Flink's default checkpoint timeout is 10 minutes.

If you want stability even in this case, you can increase the checkpoint timeout, for example:
```yaml
execution.checkpointing.timeout = 60 min
```
During write initialization, the writer of each bucket needs to read all historical files. If there is a bottleneck here (for example, when writing a large number of partitions simultaneously), you can enable `sink.writer-coordinator.enabled` to use a Flink coordinator that caches the read manifest data and accelerates initialization. The coordinator's cache memory is controlled by `sink.writer-coordinator.cache-memory`, which defaults to 1 GB in the Job Manager.
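A sketch of enabling it (the cache size shown is the documented default; raise it only if needed):

```sql
-- Cache manifest reads on a JobManager-side coordinator during writer initialization
ALTER TABLE my_table SET (
  'sink.writer-coordinator.enabled' = 'true',
  'sink.writer-coordinator.cache-memory' = '1 gb'
);
```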
There are several places in the Paimon writer that take up memory:

- The writer's memory buffer, shared and preempted by all writers of a single task. This value can be adjusted by the `write-buffer-size` table property.
- Memory consumed when merging several sorted runs for compaction, which can be adjusted with the `num-sorted-run.compaction-trigger` option to change the number of sorted runs to be merged.
- If rows are very large, reading too many rows at once consumes a lot of memory during compaction. Reducing the `read.batch-size` option can alleviate the impact of this case.
- Memory consumed by writing columnar files. Decreasing the `orc.write.batch-size` option can reduce the consumption of memory for ORC format.
- If files are automatically compacted in the write task, dictionaries for certain large columns can consume significant memory during compaction.
  - To disable dictionary encoding for all fields in Parquet format, set `'parquet.enable.dictionary' = 'false'`.
  - To disable dictionary encoding for all fields in ORC format, set `orc.dictionary.key.threshold='0'`. Additionally, set `orc.column.encoding.direct='field1,field2'` to disable dictionary encoding for specific columns.

If your Flink job does not rely on state, please avoid using managed memory, which you can control with the following Flink parameter:
```yaml
taskmanager.memory.managed.size=1m
```
Alternatively, you can use Flink managed memory for your write buffer to avoid OOM by setting the table property:
```shell
sink.use-managed-memory-allocator=true
```
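This can also be applied per statement through a dynamic table option hint, roughly like this (table and source names are placeholders):

```sql
-- Allocate the writer buffer from Flink managed memory for this insert only
INSERT INTO my_table /*+ OPTIONS('sink.use-managed-memory-allocator' = 'true') */
SELECT * FROM my_source;
```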
The committer node may use a lot of memory if the amount of data written to the table is particularly large, and OOM may occur if its memory is too small. In this case, you need to increase the committer heap memory, but you may not want to increase the memory of Flink's TaskManagers uniformly, as that may waste memory.
You can use Flink's fine-grained resource management to increase only the committer heap memory:
- Configure the Flink option `cluster.fine-grained-resource-management.enabled: true`. (This is the default since Flink 1.18.)
- Configure the Paimon table option `sink.committer-memory`, for example 300 MB, depending on your TaskManager. (`sink.committer-cpu` is also supported.)
- For batch jobs, you can set `fine-grained.shuffle-mode.all-blocking: true` to convert all data exchanges to blocking.
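Putting it together as a sketch (the memory and CPU values are illustrative, `my_table` is a placeholder, and the cluster option belongs in the Flink configuration):

```sql
-- Flink configuration (flink-conf.yaml/config.yaml), shown here as a comment:
--   cluster.fine-grained-resource-management.enabled: true

-- Raise resources for the committer operator only
ALTER TABLE my_table SET (
  'sink.committer-memory' = '300 mb',
  'sink.committer-cpu' = '2'
);
```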