---
title: "Consumer ID"
weight: 5
type: docs
aliases:
---
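A consumer ID helps you accomplish two things: safe consumption, where the consumer keeps the snapshots it still needs from expiring, and resume from breakpoint, where a restarted job continues from the consumer's recorded progress.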
You can specify the 'consumer-id' option when reading a table in streaming mode.
The consumer prevents the snapshots it depends on from expiring. To avoid accumulating too many snapshots because of consumers left behind by mistake, specify 'consumer.expiration-time' to manage the lifetime of consumers.
ALTER TABLE t SET ('consumer.expiration-time' = '1 d');
Then restart the streaming write job of this table. Consumer expiration is triggered in the write job.
SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid', 'consumer.mode' = 'at-least-once') */;
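As a minimal sketch of the read side, the session below assumes a Flink SQL client connected to the Paimon catalog that contains `t`. It also assumes that consumer progress is recorded as checkpoints complete, so checkpointing is enabled first. The 10-second interval is an illustrative value, not a recommendation.

```sql
-- Assumption: this runs in a Flink SQL client session.
-- Both keys are standard Flink configuration options.
SET 'execution.runtime-mode' = 'streaming';
SET 'execution.checkpointing.interval' = '10 s';

-- Read with a consumer ID, leaving 'consumer.mode' at its default.
-- A later job that uses the same 'myid' can continue from the recorded progress.
SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid') */;
```

Using a distinct consumer ID per logical reader keeps each reader's progress separate.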
Sometimes you only want the ‘Safe Consumption’ feature. If you want the streaming consumption job to obtain a new snapshot progress each time it restarts, enable the 'consumer.ignore-progress' option.
SELECT * FROM t /*+ OPTIONS('consumer-id' = 'myid', 'consumer.ignore-progress' = 'true') */;
On startup, this job retrieves anew the snapshot it should read.
By default, the consumption of snapshots is strictly aligned within the checkpoint to make the ‘Resume from breakpoint’ feature exactly-once.
But in scenarios where you don’t need ‘Resume from breakpoint’, or don’t need it to be strict, you can consider enabling the 'consumer.mode' = 'at-least-once' mode. This mode:
{{< hint >}} Because exactly-once mode and at-least-once mode are implemented completely differently, their Flink state is incompatible. After switching 'consumer.mode', the job cannot be restored from its previous state. {{< /hint >}}
You can reset a consumer with a given consumer ID and next snapshot ID, or delete a consumer with a given consumer ID. First stop the streaming task that uses this consumer ID, then run the reset consumer action job.
Run the following command:
{{< tabs "reset_consumer" >}}
{{< tab "Flink SQL" >}}
CALL sys.reset_consumer(
   `table` => 'database_name.table_name',
   consumer_id => 'consumer_id',
   next_snapshot_id => <snapshot_id>
);
-- No next_snapshot_id if you want to delete the consumer
{{< /tab >}}
{{< tab "Flink Action" >}}
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-{{< version >}}.jar \
    reset-consumer \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name> \
    --consumer_id <consumer-id> \
    [--next_snapshot <next-snapshot-id>] \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
## No next_snapshot if you want to delete the consumer
{{< /tab >}}
{{< /tabs >}}
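The reset and delete calls can be sketched with concrete values. The table `default.t`, the consumer ID `myid`, and snapshot ID 5 are hypothetical. The sketch assumes that the session uses the Paimon catalog and that the `$consumers` system table is available in your Paimon version for looking up recorded progress. The explicit `CAST` assumes the procedure expects a `BIGINT` snapshot ID.

```sql
-- Inspect the recorded progress of each consumer before changing it
-- (assumes the consumers system table exists in your Paimon version).
SELECT * FROM `t$consumers`;

-- Reset 'myid' so that the next snapshot it reads is snapshot 5 (hypothetical ID).
CALL sys.reset_consumer(
   `table` => 'default.t',
   consumer_id => 'myid',
   next_snapshot_id => CAST(5 AS BIGINT)
);

-- Delete the consumer instead: omit next_snapshot_id.
CALL sys.reset_consumer(
   `table` => 'default.t',
   consumer_id => 'myid'
);
```

Run these only after the streaming task that uses `myid` has been stopped.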
You can clear consumers in bulk by specifying which consumers to include and which to exclude. Both parameters accept regular expressions.
Run the following command:
{{< tabs "clear_consumers" >}}
{{< tab "Flink SQL" >}}
CALL sys.clear_consumers(
   `table` => 'database_name.table_name',
   `including_consumers` => 'including_consumers',
   `excluding_consumers` => 'excluding_consumers'
);
-- No including_consumers if you want to clear all consumers except excluding_consumers in the table
{{< /tab >}}
{{< tab "Flink Action" >}}
<FLINK_HOME>/bin/flink run \
    /path/to/paimon-flink-action-{{< version >}}.jar \
    clear_consumers \
    --warehouse <warehouse-path> \
    --database <database-name> \
    --table <table-name> \
    [--including_consumers <including-consumers>] \
    [--excluding_consumers <excluding-consumers>] \
    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
## No including_consumers if you want to clear all consumers except excluding_consumers in the table
{{< /tab >}}
{{< /tabs >}}
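To illustrate how the two patterns might combine, the sketch below uses hypothetical consumer IDs (`etl_orders`, `etl_users`, `adhoc_debug`) on a hypothetical table `default.t`. It assumes each pattern is matched against the whole consumer ID. Check the matching behavior of your Paimon version before relying on it.

```sql
-- Intended effect under the assumptions above:
--   'etl_orders'  matches the include pattern, not the exclude pattern: cleared
--   'etl_users'   matches the exclude pattern: kept
--   'adhoc_debug' does not match the include pattern: kept
CALL sys.clear_consumers(
   `table` => 'default.t',
   `including_consumers` => 'etl_.*',
   `excluding_consumers` => 'etl_users'
);

-- Clear every consumer except those whose ID starts with 'etl_'.
CALL sys.clear_consumers(
   `table` => 'default.t',
   `excluding_consumers` => 'etl_.*'
);
```

Stopping the jobs that use the affected consumer IDs first avoids clearing progress that a running job is still updating.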