[FLINK-34458][checkpointing] Rename options for Generalized incremental checkpoints (changelog) (#24324)
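The rename follows two prefix rules visible in the diff below: `state.backend.changelog.*` becomes `state.changelog.*`, and `dstl.dfs.*` becomes `state.changelog.dstl.dfs.*`. Each option also registers its old key via `withDeprecatedKeys`, so existing configurations should keep working. For anyone who wants to rewrite configuration files up front, here is a minimal sketch of the mapping. The class `ChangelogKeyMigration` and its methods are hypothetical names used for illustration only; they are not part of Flink or of this change.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of Flink): rewrites the old changelog option
 * keys to the new names introduced by this change.
 */
class ChangelogKeyMigration {

    // Old prefixes and their replacements, taken from the renames in this diff.
    private static final String OLD_CHANGELOG_PREFIX = "state.backend.changelog.";
    private static final String NEW_CHANGELOG_PREFIX = "state.changelog.";
    private static final String OLD_DSTL_PREFIX = "dstl.dfs.";
    private static final String NEW_DSTL_PREFIX = "state.changelog.dstl.dfs.";

    /** Returns the new key for an old one; unrelated keys are returned unchanged. */
    static String migrateKey(String key) {
        if (key.startsWith(OLD_CHANGELOG_PREFIX)) {
            return NEW_CHANGELOG_PREFIX + key.substring(OLD_CHANGELOG_PREFIX.length());
        }
        if (key.startsWith(OLD_DSTL_PREFIX)) {
            return NEW_DSTL_PREFIX + key.substring(OLD_DSTL_PREFIX.length());
        }
        return key;
    }

    /** Migrates every key; a value set under a new key wins over its old alias. */
    static Map<String, String> migrate(Map<String, String> config) {
        Map<String, String> result = new LinkedHashMap<>();
        // First pass: copy keys that need no renaming (including new-style keys).
        for (Map.Entry<String, String> entry : config.entrySet()) {
            if (migrateKey(entry.getKey()).equals(entry.getKey())) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        // Second pass: add renamed keys unless the new name is already present.
        for (Map.Entry<String, String> entry : config.entrySet()) {
            String newKey = migrateKey(entry.getKey());
            if (!newKey.equals(entry.getKey())) {
                result.putIfAbsent(newKey, entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> oldConfig = new LinkedHashMap<>();
        oldConfig.put("state.backend.changelog.enabled", "true");
        oldConfig.put("state.backend.changelog.storage", "filesystem");
        oldConfig.put("dstl.dfs.base-path", "s3://<bucket-name>");
        migrate(oldConfig).forEach((key, value) -> System.out.println(key + ": " + value));
    }
}
```

Letting an explicitly set new key take precedence over its old alias is a design choice of this sketch; it is meant to mirror the usual behaviour of deprecated keys, which act only as a fallback.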
diff --git a/docs/content.zh/docs/deployment/config.md b/docs/content.zh/docs/deployment/config.md
index 17e5814..702d29b 100644
--- a/docs/content.zh/docs/deployment/config.md
+++ b/docs/content.zh/docs/deployment/config.md
@@ -466,11 +466,11 @@
### State Changelog Options
Please refer to [State Backends]({{< ref "docs/ops/state/state_backends#enabling-changelog" >}}) for information on
-using State Changelog. {{< generated/state_backend_changelog_section >}}
+using State Changelog. {{< generated/state_changelog_section >}}
#### FileSystem-based Changelog options
-These settings take effect when the `state.backend.changelog.storage` is set to `filesystem` (see [above](#state-backend-changelog-storage)).
+These settings take effect when the `state.changelog.storage` is set to `filesystem` (see [above](#state-changelog-storage)).
{{< generated/fs_state_changelog_configuration >}}
**RocksDB Configurable Options**
diff --git a/docs/content.zh/docs/ops/metrics.md b/docs/content.zh/docs/ops/metrics.md
index 652c8ab..716ce07 100644
--- a/docs/content.zh/docs/ops/metrics.md
+++ b/docs/content.zh/docs/ops/metrics.md
@@ -1734,7 +1734,7 @@
</tr>
<tr>
<td>changelogBusyTimeMsPerSecond</td>
- <td>The time (in milliseconds) taken by the Changelog state backend to do IO operations, only positive when Changelog state backend is enabled. Please check 'dstl.dfs.upload.max-in-flight' for more information.</td>
+ <td>The time (in milliseconds) taken by the Changelog state backend to do IO operations, only positive when Changelog state backend is enabled. Please check 'state.changelog.dstl.dfs.upload.max-in-flight' for more information.</td>
<td>Gauge</td>
</tr>
<tr>
diff --git a/docs/content.zh/docs/ops/state/state_backends.md b/docs/content.zh/docs/ops/state/state_backends.md
index 16102e0..a8aebf9 100644
--- a/docs/content.zh/docs/ops/state/state_backends.md
+++ b/docs/content.zh/docs/ops/state/state_backends.md
@@ -383,7 +383,7 @@
值得注意的是虽然 Changelog 增加了少量的日常 CPU 和网络带宽资源使用,
但会降低峰值的 CPU 和网络带宽使用量。
-另一项需要考虑的事情是恢复时间。取决于 `state.backend.changelog.periodic-materialize.interval` 的设置,changelog 可能会变得冗长,因此重放会花费更多时间。即使这样,恢复时间加上 checkpoint 持续时间仍然可能低于不开启 changelog 功能的时间,从而在故障恢复的情况下也能提供更低的端到端延迟。当然,取决于上述时间的实际比例,有效恢复时间也有可能会增加。
+另一项需要考虑的事情是恢复时间。取决于 `state.changelog.periodic-materialize.interval` 的设置,changelog 可能会变得冗长,因此重放会花费更多时间。即使这样,恢复时间加上 checkpoint 持续时间仍然可能低于不开启 changelog 功能的时间,从而在故障恢复的情况下也能提供更低的端到端延迟。当然,取决于上述时间的实际比例,有效恢复时间也有可能会增加。
有关更多详细信息,请参阅 [FLIP-158](https://cwiki.apache.org/confluence/display/FLINK/FLIP-158%3A+Generalized+incremental+checkpoints)。
@@ -401,9 +401,9 @@
这是 YAML 中的示例配置:
```yaml
-state.backend.changelog.enabled: true
-state.backend.changelog.storage: filesystem # 当前只支持 filesystem 和 memory(仅供测试用)
-dstl.dfs.base-path: s3://<bucket-name> # 类似于 state.checkpoints.dir
+state.changelog.enabled: true
+state.changelog.storage: filesystem # 当前只支持 filesystem 和 memory(仅供测试用)
+state.changelog.dstl.dfs.base-path: s3://<bucket-name> # 类似于 state.checkpoints.dir
```
请将如下配置保持默认值 (参见[限制](#limitations)):
diff --git a/docs/content/docs/deployment/config.md b/docs/content/docs/deployment/config.md
index fc426dc..993ded6 100644
--- a/docs/content/docs/deployment/config.md
+++ b/docs/content/docs/deployment/config.md
@@ -468,11 +468,11 @@
### State Changelog Options
Please refer to [State Backends]({{< ref "docs/ops/state/state_backends#enabling-changelog" >}}) for information on
-using State Changelog. {{< generated/state_backend_changelog_section >}}
+using State Changelog. {{< generated/state_changelog_section >}}
#### FileSystem-based Changelog options
-These settings take effect when the `state.backend.changelog.storage` is set to `filesystem` (see [above](#state-backend-changelog-storage)).
+These settings take effect when the `state.changelog.storage` is set to `filesystem` (see [above](#state-changelog-storage)).
{{< generated/fs_state_changelog_configuration >}}
**RocksDB Configurable Options**
diff --git a/docs/content/docs/ops/metrics.md b/docs/content/docs/ops/metrics.md
index cf331e3..91a654d 100644
--- a/docs/content/docs/ops/metrics.md
+++ b/docs/content/docs/ops/metrics.md
@@ -1724,7 +1724,7 @@
</tr>
<tr>
<td>changelogBusyTimeMsPerSecond</td>
- <td>The time (in milliseconds) taken by the Changelog state backend to do IO operations, only positive when Changelog state backend is enabled. Please check 'dstl.dfs.upload.max-in-flight' for more information.</td>
+ <td>The time (in milliseconds) taken by the Changelog state backend to do IO operations, only positive when Changelog state backend is enabled. Please check 'state.changelog.dstl.dfs.upload.max-in-flight' for more information.</td>
<td>Gauge</td>
</tr>
<tr>
diff --git a/docs/content/docs/ops/state/state_backends.md b/docs/content/docs/ops/state/state_backends.md
index 085d10b..d2770eb 100644
--- a/docs/content/docs/ops/state/state_backends.md
+++ b/docs/content/docs/ops/state/state_backends.md
@@ -384,7 +384,7 @@
It is worth noting that changelog adds a small amount of daily CPU and network bandwidth resources,
but reduces peak CPU and network bandwidth usage.
-Recovery time is another thing to consider. Depending on the `state.backend.changelog.periodic-materialize.interval`
+Recovery time is another thing to consider. Depending on the `state.changelog.periodic-materialize.interval`
setting, the changelog can become lengthy and replaying it may take more time. However, recovery time combined with
checkpoint duration will likely still be lower than in non-changelog setups, providing lower end-to-end latency even in
failover case. However, it's also possible that the effective recovery time will increase, depending on the actual ratio
@@ -402,9 +402,9 @@
Here is an example configuration in YAML:
```yaml
-state.backend.changelog.enabled: true
-state.backend.changelog.storage: filesystem # currently, only filesystem and memory (for tests) are supported
-dstl.dfs.base-path: s3://<bucket-name> # similar to state.checkpoints.dir
+state.changelog.enabled: true
+state.changelog.storage: filesystem # currently, only filesystem and memory (for tests) are supported
+state.changelog.dstl.dfs.base-path: s3://<bucket-name> # similar to state.checkpoints.dir
```
Please keep the following defaults (see [limitations](#limitations)):
diff --git a/docs/layouts/shortcodes/generated/fs_state_changelog_configuration.html b/docs/layouts/shortcodes/generated/fs_state_changelog_configuration.html
index d5ca1bf..30d0d03e 100644
--- a/docs/layouts/shortcodes/generated/fs_state_changelog_configuration.html
+++ b/docs/layouts/shortcodes/generated/fs_state_changelog_configuration.html
@@ -9,88 +9,88 @@
</thead>
<tbody>
<tr>
- <td><h5>dstl.dfs.base-path</h5></td>
+ <td><h5>state.changelog.dstl.dfs.base-path</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Base path to store changelog files.</td>
</tr>
<tr>
- <td><h5>dstl.dfs.batch.persist-delay</h5></td>
+ <td><h5>state.changelog.dstl.dfs.batch.persist-delay</h5></td>
<td style="word-wrap: break-word;">10 ms</td>
<td>Duration</td>
<td>Delay before persisting changelog after receiving persist request (on checkpoint). Minimizes the number of files and requests if multiple operators (backends) or sub-tasks are using the same store. Correspondingly increases checkpoint time (async phase).</td>
</tr>
<tr>
- <td><h5>dstl.dfs.batch.persist-size-threshold</h5></td>
+ <td><h5>state.changelog.dstl.dfs.batch.persist-size-threshold</h5></td>
<td style="word-wrap: break-word;">10 mb</td>
<td>MemorySize</td>
- <td>Size threshold for state changes that were requested to be persisted but are waiting for dstl.dfs.batch.persist-delay (from all operators). . Once reached, accumulated changes are persisted immediately. This is different from dstl.dfs.preemptive-persist-threshold as it happens AFTER the checkpoint and potentially for state changes of multiple operators. Must not exceed in-flight data limit (see below)</td>
+ <td>Size threshold for state changes that were requested to be persisted but are waiting for state.changelog.dstl.dfs.batch.persist-delay (from all operators). Once reached, accumulated changes are persisted immediately. This is different from state.changelog.dstl.dfs.preemptive-persist-threshold as it happens AFTER the checkpoint and potentially for state changes of multiple operators. Must not exceed in-flight data limit (see below)</td>
</tr>
<tr>
- <td><h5>dstl.dfs.compression.enabled</h5></td>
+ <td><h5>state.changelog.dstl.dfs.compression.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable compression when serializing changelog.</td>
</tr>
<tr>
- <td><h5>dstl.dfs.discard.num-threads</h5></td>
+ <td><h5>state.changelog.dstl.dfs.discard.num-threads</h5></td>
<td style="word-wrap: break-word;">1</td>
<td>Integer</td>
<td>Number of threads to use to discard changelog (e.g. pre-emptively uploaded unused state).</td>
</tr>
<tr>
- <td><h5>dstl.dfs.download.local-cache.idle-timeout-ms</h5></td>
+ <td><h5>state.changelog.dstl.dfs.download.local-cache.idle-timeout-ms</h5></td>
<td style="word-wrap: break-word;">10 min</td>
<td>Duration</td>
<td>Maximum idle time for cache files of distributed changelog file, after which the cache files will be deleted.</td>
</tr>
<tr>
- <td><h5>dstl.dfs.preemptive-persist-threshold</h5></td>
+ <td><h5>state.changelog.dstl.dfs.preemptive-persist-threshold</h5></td>
<td style="word-wrap: break-word;">5 mb</td>
<td>MemorySize</td>
<td>Size threshold for state changes of a single operator beyond which they are persisted pre-emptively without waiting for a checkpoint. Improves checkpointing time by allowing quasi-continuous uploading of state changes (as opposed to uploading all accumulated changes on checkpoint).</td>
</tr>
<tr>
- <td><h5>dstl.dfs.upload.buffer-size</h5></td>
+ <td><h5>state.changelog.dstl.dfs.upload.buffer-size</h5></td>
<td style="word-wrap: break-word;">1 mb</td>
<td>MemorySize</td>
<td>Buffer size used when uploading change sets</td>
</tr>
<tr>
- <td><h5>dstl.dfs.upload.max-attempts</h5></td>
+ <td><h5>state.changelog.dstl.dfs.upload.max-attempts</h5></td>
<td style="word-wrap: break-word;">3</td>
<td>Integer</td>
- <td>Maximum number of attempts (including the initial one) to perform a particular upload. Only takes effect if dstl.dfs.upload.retry-policy is fixed.</td>
+ <td>Maximum number of attempts (including the initial one) to perform a particular upload. Only takes effect if state.changelog.dstl.dfs.upload.retry-policy is fixed.</td>
</tr>
<tr>
- <td><h5>dstl.dfs.upload.max-in-flight</h5></td>
+ <td><h5>state.changelog.dstl.dfs.upload.max-in-flight</h5></td>
<td style="word-wrap: break-word;">100 mb</td>
<td>MemorySize</td>
- <td>Max amount of data allowed to be in-flight. Upon reaching this limit the task will be back-pressured. I.e., snapshotting will block; normal processing will block if dstl.dfs.preemptive-persist-threshold is set and reached. The limit is applied to the total size of in-flight changes if multiple operators/backends are using the same changelog storage. Must be greater than or equal to dstl.dfs.batch.persist-size-threshold</td>
+ <td>Max amount of data allowed to be in-flight. Upon reaching this limit the task will be back-pressured. I.e., snapshotting will block; normal processing will block if state.changelog.dstl.dfs.preemptive-persist-threshold is set and reached. The limit is applied to the total size of in-flight changes if multiple operators/backends are using the same changelog storage. Must be greater than or equal to state.changelog.dstl.dfs.batch.persist-size-threshold</td>
</tr>
<tr>
- <td><h5>dstl.dfs.upload.next-attempt-delay</h5></td>
+ <td><h5>state.changelog.dstl.dfs.upload.next-attempt-delay</h5></td>
<td style="word-wrap: break-word;">500 ms</td>
<td>Duration</td>
<td>Delay before the next attempt (if the failure was not caused by a timeout).</td>
</tr>
<tr>
- <td><h5>dstl.dfs.upload.num-threads</h5></td>
+ <td><h5>state.changelog.dstl.dfs.upload.num-threads</h5></td>
<td style="word-wrap: break-word;">5</td>
<td>Integer</td>
<td>Number of threads to use for upload.</td>
</tr>
<tr>
- <td><h5>dstl.dfs.upload.retry-policy</h5></td>
+ <td><h5>state.changelog.dstl.dfs.upload.retry-policy</h5></td>
<td style="word-wrap: break-word;">"fixed"</td>
<td>String</td>
<td>Retry policy for the failed uploads (in particular, timed out). Valid values: none, fixed.</td>
</tr>
<tr>
- <td><h5>dstl.dfs.upload.timeout</h5></td>
+ <td><h5>state.changelog.dstl.dfs.upload.timeout</h5></td>
<td style="word-wrap: break-word;">1 s</td>
<td>Duration</td>
- <td>Time threshold beyond which an upload is considered timed out. If a new attempt is made but this upload succeeds earlier then this upload result will be used. May improve upload times if tail latencies of upload requests are significantly high. Only takes effect if dstl.dfs.upload.retry-policy is fixed. Please note that timeout * max_attempts should be less than execution.checkpointing.timeout</td>
+ <td>Time threshold beyond which an upload is considered timed out. If a new attempt is made but this upload succeeds earlier then this upload result will be used. May improve upload times if tail latencies of upload requests are significantly high. Only takes effect if state.changelog.dstl.dfs.upload.retry-policy is fixed. Please note that timeout * max_attempts should be less than execution.checkpointing.timeout</td>
</tr>
</tbody>
</table>
diff --git a/docs/layouts/shortcodes/generated/state_changelog_configuration.html b/docs/layouts/shortcodes/generated/state_changelog_configuration.html
index 22f83b4..182494f 100644
--- a/docs/layouts/shortcodes/generated/state_changelog_configuration.html
+++ b/docs/layouts/shortcodes/generated/state_changelog_configuration.html
@@ -9,31 +9,31 @@
</thead>
<tbody>
<tr>
- <td><h5>state.backend.changelog.enabled</h5></td>
+ <td><h5>state.changelog.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable state backend to write state changes to StateChangelog. If this config is not set explicitly, it means no preference for enabling the change log, and the value in lower config level will take effect. The default value 'false' here means if no value set (job or cluster), the change log will not be enabled.</td>
</tr>
<tr>
- <td><h5>state.backend.changelog.max-failures-allowed</h5></td>
+ <td><h5>state.changelog.max-failures-allowed</h5></td>
<td style="word-wrap: break-word;">3</td>
<td>Integer</td>
<td>Max number of consecutive materialization failures allowed.</td>
</tr>
<tr>
- <td><h5>state.backend.changelog.periodic-materialize.enabled</h5></td>
+ <td><h5>state.changelog.periodic-materialize.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Defines whether to enable periodic materialization, all changelogs will not be truncated which may increase the space of checkpoint if disabled</td>
</tr>
<tr>
- <td><h5>state.backend.changelog.periodic-materialize.interval</h5></td>
+ <td><h5>state.changelog.periodic-materialize.interval</h5></td>
<td style="word-wrap: break-word;">10 min</td>
<td>Duration</td>
- <td>Defines the interval in milliseconds to perform periodic materialization for state backend. It only takes effect when state.backend.changelog.periodic-materialize.enabled is true</td>
+ <td>Defines the interval in milliseconds to perform periodic materialization for state backend. It only takes effect when state.changelog.periodic-materialize.enabled is true</td>
</tr>
<tr>
- <td><h5>state.backend.changelog.storage</h5></td>
+ <td><h5>state.changelog.storage</h5></td>
<td style="word-wrap: break-word;">"memory"</td>
<td>String</td>
<td>The storage to be used to store state changelog.<br />The implementation can be specified via their shortcut name.<br />The list of recognized shortcut names currently includes 'memory' and 'filesystem'.</td>
diff --git a/docs/layouts/shortcodes/generated/state_backend_changelog_section.html b/docs/layouts/shortcodes/generated/state_changelog_section.html
similarity index 81%
rename from docs/layouts/shortcodes/generated/state_backend_changelog_section.html
rename to docs/layouts/shortcodes/generated/state_changelog_section.html
index 22f83b4..182494f 100644
--- a/docs/layouts/shortcodes/generated/state_backend_changelog_section.html
+++ b/docs/layouts/shortcodes/generated/state_changelog_section.html
@@ -9,31 +9,31 @@
</thead>
<tbody>
<tr>
- <td><h5>state.backend.changelog.enabled</h5></td>
+ <td><h5>state.changelog.enabled</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to enable state backend to write state changes to StateChangelog. If this config is not set explicitly, it means no preference for enabling the change log, and the value in lower config level will take effect. The default value 'false' here means if no value set (job or cluster), the change log will not be enabled.</td>
</tr>
<tr>
- <td><h5>state.backend.changelog.max-failures-allowed</h5></td>
+ <td><h5>state.changelog.max-failures-allowed</h5></td>
<td style="word-wrap: break-word;">3</td>
<td>Integer</td>
<td>Max number of consecutive materialization failures allowed.</td>
</tr>
<tr>
- <td><h5>state.backend.changelog.periodic-materialize.enabled</h5></td>
+ <td><h5>state.changelog.periodic-materialize.enabled</h5></td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Defines whether to enable periodic materialization, all changelogs will not be truncated which may increase the space of checkpoint if disabled</td>
</tr>
<tr>
- <td><h5>state.backend.changelog.periodic-materialize.interval</h5></td>
+ <td><h5>state.changelog.periodic-materialize.interval</h5></td>
<td style="word-wrap: break-word;">10 min</td>
<td>Duration</td>
- <td>Defines the interval in milliseconds to perform periodic materialization for state backend. It only takes effect when state.backend.changelog.periodic-materialize.enabled is true</td>
+ <td>Defines the interval in milliseconds to perform periodic materialization for state backend. It only takes effect when state.changelog.periodic-materialize.enabled is true</td>
</tr>
<tr>
- <td><h5>state.backend.changelog.storage</h5></td>
+ <td><h5>state.changelog.storage</h5></td>
<td style="word-wrap: break-word;">"memory"</td>
<td>String</td>
<td>The storage to be used to store state changelog.<br />The implementation can be specified via their shortcut name.<br />The list of recognized shortcut names currently includes 'memory' and 'filesystem'.</td>
diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
index c95f372..a66975e 100644
--- a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
+++ b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
@@ -79,7 +79,7 @@
public static final String STATE_LATENCY_TRACKING = "state_latency_tracking";
- public static final String STATE_BACKEND_CHANGELOG = "state_backend_changelog";
+ public static final String STATE_CHANGELOG = "state_changelog";
public static final String EXPERT_CLASS_LOADING = "expert_class_loading";
public static final String EXPERT_DEBUGGING_AND_TUNING = "expert_debugging_and_tuning";
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/StateChangelogOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/StateChangelogOptions.java
index 619137e..050dbbf 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/StateChangelogOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/StateChangelogOptions.java
@@ -27,20 +27,22 @@
@PublicEvolving
public class StateChangelogOptions {
- @Documentation.Section(Documentation.Sections.STATE_BACKEND_CHANGELOG)
+ @Documentation.Section(Documentation.Sections.STATE_CHANGELOG)
public static final ConfigOption<Boolean> PERIODIC_MATERIALIZATION_ENABLED =
- ConfigOptions.key("state.backend.changelog.periodic-materialize.enabled")
+ ConfigOptions.key("state.changelog.periodic-materialize.enabled")
.booleanType()
.defaultValue(true)
+ .withDeprecatedKeys("state.backend.changelog.periodic-materialize.enabled")
.withDescription(
"Defines whether to enable periodic materialization, "
+ "all changelogs will not be truncated which may increase the space of checkpoint if disabled");
- @Documentation.Section(Documentation.Sections.STATE_BACKEND_CHANGELOG)
+ @Documentation.Section(Documentation.Sections.STATE_CHANGELOG)
public static final ConfigOption<Duration> PERIODIC_MATERIALIZATION_INTERVAL =
- ConfigOptions.key("state.backend.changelog.periodic-materialize.interval")
+ ConfigOptions.key("state.changelog.periodic-materialize.interval")
.durationType()
.defaultValue(Duration.ofMinutes(10))
+ .withDeprecatedKeys("state.backend.changelog.periodic-materialize.interval")
.withDescription(
"Defines the interval in milliseconds to perform "
+ "periodic materialization for state backend. "
@@ -48,19 +50,21 @@
+ PERIODIC_MATERIALIZATION_ENABLED.key()
+ " is true");
- @Documentation.Section(Documentation.Sections.STATE_BACKEND_CHANGELOG)
+ @Documentation.Section(Documentation.Sections.STATE_CHANGELOG)
public static final ConfigOption<Integer> MATERIALIZATION_MAX_FAILURES_ALLOWED =
- ConfigOptions.key("state.backend.changelog.max-failures-allowed")
+ ConfigOptions.key("state.changelog.max-failures-allowed")
.intType()
.defaultValue(3)
+ .withDeprecatedKeys("state.backend.changelog.max-failures-allowed")
.withDescription("Max number of consecutive materialization failures allowed.");
/** Whether to enable state change log. */
- @Documentation.Section(value = Documentation.Sections.STATE_BACKEND_CHANGELOG)
+ @Documentation.Section(value = Documentation.Sections.STATE_CHANGELOG)
public static final ConfigOption<Boolean> ENABLE_STATE_CHANGE_LOG =
- ConfigOptions.key("state.backend.changelog.enabled")
+ ConfigOptions.key("state.changelog.enabled")
.booleanType()
.defaultValue(false)
+ .withDeprecatedKeys("state.backend.changelog.enabled")
.withDescription(
"Whether to enable state backend to write state changes to StateChangelog. "
+ "If this config is not set explicitly, it means no preference "
@@ -75,11 +79,12 @@
* <p>Recognized shortcut name is 'memory' from {@code
* InMemoryStateChangelogStorageFactory.getIdentifier()}, which is also the default value.
*/
- @Documentation.Section(value = Documentation.Sections.STATE_BACKEND_CHANGELOG)
+ @Documentation.Section(value = Documentation.Sections.STATE_CHANGELOG)
public static final ConfigOption<String> STATE_CHANGE_LOG_STORAGE =
- ConfigOptions.key("state.backend.changelog.storage")
+ ConfigOptions.key("state.changelog.storage")
.stringType()
.defaultValue("memory")
+ .withDeprecatedKeys("state.backend.changelog.storage")
.withDescription(
Description.builder()
.text("The storage to be used to store state changelog.")
diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogOptions.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogOptions.java
index 4d05962..1bd1ef0 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogOptions.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/FsStateChangelogOptions.java
@@ -31,21 +31,24 @@
public class FsStateChangelogOptions {
public static final ConfigOption<String> BASE_PATH =
- ConfigOptions.key("dstl.dfs.base-path")
+ ConfigOptions.key("state.changelog.dstl.dfs.base-path")
.stringType()
.noDefaultValue()
+ .withDeprecatedKeys("dstl.dfs.base-path")
.withDescription("Base path to store changelog files.");
public static final ConfigOption<Boolean> COMPRESSION_ENABLED =
- ConfigOptions.key("dstl.dfs.compression.enabled")
+ ConfigOptions.key("state.changelog.dstl.dfs.compression.enabled")
.booleanType()
.defaultValue(false)
+ .withDeprecatedKeys("dstl.dfs.compression.enabled")
.withDescription("Whether to enable compression when serializing changelog.");
public static final ConfigOption<MemorySize> PREEMPTIVE_PERSIST_THRESHOLD =
- ConfigOptions.key("dstl.dfs.preemptive-persist-threshold")
+ ConfigOptions.key("state.changelog.dstl.dfs.preemptive-persist-threshold")
.memoryType()
.defaultValue(MemorySize.parse("5MB"))
+ .withDeprecatedKeys("dstl.dfs.preemptive-persist-threshold")
.withDescription(
"Size threshold for state changes of a single operator "
+ "beyond which they are persisted pre-emptively without waiting for a checkpoint. "
@@ -53,9 +56,10 @@
+ "(as opposed to uploading all accumulated changes on checkpoint).");
public static final ConfigOption<Duration> PERSIST_DELAY =
- ConfigOptions.key("dstl.dfs.batch.persist-delay")
+ ConfigOptions.key("state.changelog.dstl.dfs.batch.persist-delay")
.durationType()
.defaultValue(Duration.ofMillis(10))
+ .withDeprecatedKeys("dstl.dfs.batch.persist-delay")
.withDescription(
"Delay before persisting changelog after receiving persist request (on checkpoint). "
+ "Minimizes the number of files and requests "
@@ -63,9 +67,10 @@
+ "Correspondingly increases checkpoint time (async phase).");
public static final ConfigOption<MemorySize> PERSIST_SIZE_THRESHOLD =
- ConfigOptions.key("dstl.dfs.batch.persist-size-threshold")
+ ConfigOptions.key("state.changelog.dstl.dfs.batch.persist-size-threshold")
.memoryType()
.defaultValue(MemorySize.parse("10MB"))
+ .withDeprecatedKeys("dstl.dfs.batch.persist-size-threshold")
.withDescription(
"Size threshold for state changes that were requested to be persisted but are waiting for "
+ PERSIST_DELAY.key()
@@ -77,28 +82,32 @@
+ "Must not exceed in-flight data limit (see below)");
public static final ConfigOption<MemorySize> UPLOAD_BUFFER_SIZE =
- ConfigOptions.key("dstl.dfs.upload.buffer-size")
+ ConfigOptions.key("state.changelog.dstl.dfs.upload.buffer-size")
.memoryType()
.defaultValue(MemorySize.parse("1MB"))
+ .withDeprecatedKeys("dstl.dfs.upload.buffer-size")
.withDescription("Buffer size used when uploading change sets");
public static final ConfigOption<Integer> NUM_UPLOAD_THREADS =
- ConfigOptions.key("dstl.dfs.upload.num-threads")
+ ConfigOptions.key("state.changelog.dstl.dfs.upload.num-threads")
.intType()
.defaultValue(5)
+ .withDeprecatedKeys("dstl.dfs.upload.num-threads")
.withDescription("Number of threads to use for upload.");
public static final ConfigOption<Integer> NUM_DISCARD_THREADS =
- ConfigOptions.key("dstl.dfs.discard.num-threads")
+ ConfigOptions.key("state.changelog.dstl.dfs.discard.num-threads")
.intType()
.defaultValue(1)
+ .withDeprecatedKeys("dstl.dfs.discard.num-threads")
.withDescription(
"Number of threads to use to discard changelog (e.g. pre-emptively uploaded unused state).");
public static final ConfigOption<MemorySize> IN_FLIGHT_DATA_LIMIT =
- ConfigOptions.key("dstl.dfs.upload.max-in-flight")
+ ConfigOptions.key("state.changelog.dstl.dfs.upload.max-in-flight")
.memoryType()
.defaultValue(MemorySize.parse("100MB"))
+ .withDeprecatedKeys("dstl.dfs.upload.max-in-flight")
.withDescription(
"Max amount of data allowed to be in-flight. "
+ "Upon reaching this limit the task will be back-pressured. "
@@ -111,15 +120,17 @@
+ PERSIST_SIZE_THRESHOLD.key());
public static final ConfigOption<String> RETRY_POLICY =
- ConfigOptions.key("dstl.dfs.upload.retry-policy")
+ ConfigOptions.key("state.changelog.dstl.dfs.upload.retry-policy")
.stringType()
.defaultValue("fixed")
+ .withDeprecatedKeys("dstl.dfs.upload.retry-policy")
.withDescription(
"Retry policy for the failed uploads (in particular, timed out). Valid values: none, fixed.");
public static final ConfigOption<Duration> UPLOAD_TIMEOUT =
- ConfigOptions.key("dstl.dfs.upload.timeout")
+ ConfigOptions.key("state.changelog.dstl.dfs.upload.timeout")
.durationType()
.defaultValue(Duration.ofSeconds(1))
+ .withDeprecatedKeys("dstl.dfs.upload.timeout")
.withDescription(
"Time threshold beyond which an upload is considered timed out. "
+ "If a new attempt is made but this upload succeeds earlier then this upload result will be used. "
@@ -130,25 +141,28 @@
+ "Please note that timeout * max_attempts should be less than "
+ CHECKPOINTING_TIMEOUT.key());
public static final ConfigOption<Integer> RETRY_MAX_ATTEMPTS =
- ConfigOptions.key("dstl.dfs.upload.max-attempts")
+ ConfigOptions.key("state.changelog.dstl.dfs.upload.max-attempts")
.intType()
.defaultValue(3)
+ .withDeprecatedKeys("dstl.dfs.upload.max-attempts")
.withDescription(
"Maximum number of attempts (including the initial one) to perform a particular upload. "
+ "Only takes effect if "
+ RETRY_POLICY.key()
+ " is fixed.");
public static final ConfigOption<Duration> RETRY_DELAY_AFTER_FAILURE =
- ConfigOptions.key("dstl.dfs.upload.next-attempt-delay")
+ ConfigOptions.key("state.changelog.dstl.dfs.upload.next-attempt-delay")
.durationType()
.defaultValue(Duration.ofMillis(500))
+ .withDeprecatedKeys("dstl.dfs.upload.next-attempt-delay")
.withDescription(
"Delay before the next attempt (if the failure was not caused by a timeout).");
public static final ConfigOption<Duration> CACHE_IDLE_TIMEOUT =
- ConfigOptions.key("dstl.dfs.download.local-cache.idle-timeout-ms")
+ ConfigOptions.key("state.changelog.dstl.dfs.download.local-cache.idle-timeout-ms")
.durationType()
.defaultValue(Duration.ofMinutes(10))
+ .withDeprecatedKeys("dstl.dfs.download.local-cache.idle-timeout-ms")
.withDescription(
"Maximum idle time for cache files of distributed changelog file, "
+ "after which the cache files will be deleted.");