[feature][CheckPoint-stroage]:Added Disable cache configuration. (#6718)
diff --git a/docs/en/seatunnel-engine/checkpoint-storage.md b/docs/en/seatunnel-engine/checkpoint-storage.md
index 1e9a933..13e1721 100644
--- a/docs/en/seatunnel-engine/checkpoint-storage.md
+++ b/docs/en/seatunnel-engine/checkpoint-storage.md
@@ -208,3 +208,40 @@
```
+### Enable cache
+
+When storage:type is hdfs, cache is disabled by default. If you want to enable it, set `disable.cache: false`
+
+```yaml
+seatunnel:
+ engine:
+ checkpoint:
+ interval: 6000
+ timeout: 7000
+ storage:
+ type: hdfs
+ max-retained: 3
+ plugin-config:
+ storage.type: hdfs
+ disable.cache: false
+ fs.defaultFS: hdfs:///
+
+```
+
+or
+
+```yaml
+seatunnel:
+ engine:
+ checkpoint:
+ interval: 6000
+ timeout: 7000
+ storage:
+ type: hdfs
+ max-retained: 3
+ plugin-config:
+ storage.type: hdfs
+ disable.cache: false
+ fs.defaultFS: file:///
+```
+
diff --git a/docs/zh/seatunnel-engine/checkpoint-storage.md b/docs/zh/seatunnel-engine/checkpoint-storage.md
index 2ce3be5..a544926 100644
--- a/docs/zh/seatunnel-engine/checkpoint-storage.md
+++ b/docs/zh/seatunnel-engine/checkpoint-storage.md
@@ -184,3 +184,39 @@
```
+### 开启高速缓存
+
+当storage:type为hdfs时,默认关闭cache。如果您想启用它,请设置为`disable.cache: false`。
+
+```yaml
+seatunnel:
+ engine:
+ checkpoint:
+ interval: 6000
+ timeout: 7000
+ storage:
+ type: hdfs
+ max-retained: 3
+ plugin-config:
+ storage.type: hdfs
+ disable.cache: false
+ fs.defaultFS: hdfs:/// # Ensure that the directory has written permission
+```
+
+or
+
+```yaml
+seatunnel:
+ engine:
+ checkpoint:
+ interval: 6000
+ timeout: 7000
+ storage:
+ type: hdfs
+ max-retained: 3
+ plugin-config:
+ storage.type: hdfs
+ disable.cache: false
+ fs.defaultFS: file:///
+```
+
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/AbstractConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/AbstractConfiguration.java
index a6a2037..4a76c7e 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/AbstractConfiguration.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/AbstractConfiguration.java
@@ -30,6 +30,11 @@
protected static final String HDFS_IMPL_KEY = "impl";
+ protected static final String COMMON_DISABLE_CACHE = "%s.disable.cache";
+
+ protected static final String DISABLE_CACHE_DEFAULT_VALUE = "TRUE";
+
+ protected static final String DISABLE_CACHE_KEY = "disable.cache";
/**
* check the configuration keys
*
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java
index 2da4c6a..0ffb95b 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java
@@ -77,6 +77,10 @@
if (config.containsKey(HDFS_SITE_PATH)) {
hadoopConf.addResource(new Path(config.get(HDFS_SITE_PATH)));
}
+ hadoopConf.setBoolean(
+ String.format(COMMON_DISABLE_CACHE, HDFS_IMPL_KEY),
+ Boolean.parseBoolean(
+ config.getOrDefault(DISABLE_CACHE_KEY, DISABLE_CACHE_DEFAULT_VALUE)));
// support other hdfs optional config keys
config.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(SEATUNNEL_HADOOP_PREFIX))
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/LocalConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/LocalConfiguration.java
index b03e91b..af11a59 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/LocalConfiguration.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/LocalConfiguration.java
@@ -39,6 +39,11 @@
hadoopConf.set(
FS_DEFAULT_NAME_KEY,
config.getOrDefault(FS_DEFAULT_NAME_KEY, FS_DEFAULT_NAME_DEFAULT));
+ hadoopConf.setBoolean(
+ String.format(COMMON_DISABLE_CACHE, HDFS_LOCAL_IMPL_KEY),
+ Boolean.parseBoolean(
+ config.getOrDefault(DISABLE_CACHE_KEY, DISABLE_CACHE_DEFAULT_VALUE)));
+
return hadoopConf;
}
}
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/OssConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/OssConfiguration.java
index 08aef21..b87ab27 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/OssConfiguration.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/OssConfiguration.java
@@ -43,6 +43,10 @@
Configuration hadoopConf = new Configuration();
hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(OSS_BUCKET_KEY));
hadoopConf.set(OSS_IMPL_KEY, HDFS_OSS_IMPL);
+ hadoopConf.setBoolean(
+ String.format(COMMON_DISABLE_CACHE, OSS_IMPL_KEY),
+ Boolean.parseBoolean(
+ config.getOrDefault(DISABLE_CACHE_KEY, DISABLE_CACHE_DEFAULT_VALUE)));
setExtraConfiguration(hadoopConf, config, OSS_KEY);
return hadoopConf;
}
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/S3Configuration.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/S3Configuration.java
index ea3ee0f..4041af7 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/S3Configuration.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/S3Configuration.java
@@ -69,6 +69,10 @@
Configuration hadoopConf = new Configuration();
hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(S3_BUCKET_KEY));
hadoopConf.set(formatKey(protocol, HDFS_IMPL_KEY), fsImpl);
+ hadoopConf.setBoolean(
+ String.format(COMMON_DISABLE_CACHE, formatKey(protocol, HDFS_IMPL_KEY)),
+ Boolean.parseBoolean(
+ config.getOrDefault(DISABLE_CACHE_KEY, DISABLE_CACHE_DEFAULT_VALUE)));
setExtraConfiguration(hadoopConf, config, FS_KEY + protocol + SPLIT_CHAR);
return hadoopConf;
}
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HDFSFileCheckpointTest.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HDFSFileCheckpointTest.java
index 23a41a2..a085acb 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HDFSFileCheckpointTest.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HDFSFileCheckpointTest.java
@@ -36,7 +36,7 @@
public static void setup() throws CheckpointStorageException {
Map<String, String> config = new HashMap<>();
config.put("storage.type", "hdfs");
- config.put("fs.defaultFS", "hdfs://usdp-bing");
+ config.put("disable.cache", "false");
config.put("seatunnel.hadoop.dfs.nameservices", "usdp-bing");
config.put("seatunnel.hadoop.dfs.ha.namenodes.usdp-bing", "nn1,nn2");
config.put("seatunnel.hadoop.dfs.namenode.rpc-address.usdp-bing.nn1", "usdp-bing-nn1:8020");
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/LocalFileCheckPointTest.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/LocalFileCheckPointTest.java
index 94d058c..f441364 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/LocalFileCheckPointTest.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/LocalFileCheckPointTest.java
@@ -35,6 +35,7 @@
public static void setup() throws CheckpointStorageException {
HashMap config = new HashMap();
config.put("namespace", "/tmp/");
+ config.put("disable.cache", "false");
STORAGE = new HdfsStorage(config);
initStorageData();
}
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/OssFileCheckpointTest.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/OssFileCheckpointTest.java
index e9de3c0..3d7299c 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/OssFileCheckpointTest.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/OssFileCheckpointTest.java
@@ -35,6 +35,7 @@
public static void setup() throws CheckpointStorageException {
Map<String, String> config = new HashMap<>();
config.put("storage.type", "oss");
+ config.put("disable.cache", "false");
config.put("fs.oss.accessKeyId", "your access key id");
config.put("fs.oss.accessKeySecret", "your access key secret");
config.put("fs.oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/S3FileCheckpointTest.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/S3FileCheckpointTest.java
index c9657a5..fb7b2a1 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/S3FileCheckpointTest.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/test/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/S3FileCheckpointTest.java
@@ -36,6 +36,7 @@
public static void setup() throws CheckpointStorageException {
Map<String, String> config = new HashMap<>();
config.put("storage.type", "s3");
+ config.put("disable.cache", "false");
config.put("fs.s3a.access.key", "your access key");
config.put("fs.s3a.secret.key", "your secret key");
config.put("s3.bucket", "s3a://calvin.test.cn");