[FLINK-23429][state-processor-api] Use Path instead of Path.getPath() to preserve FileSystem info
This closes #16550
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/FileCopyFunction.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/FileCopyFunction.java
index 9b15222..f761061 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/FileCopyFunction.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/FileCopyFunction.java
@@ -32,7 +32,7 @@
/** This output format copies files from an existing savepoint into a new directory. */
@Internal
-public final class FileCopyFunction implements OutputFormat<String> {
+public final class FileCopyFunction implements OutputFormat<Path> {
private static final long serialVersionUID = 1L;
@@ -52,8 +52,7 @@
}
@Override
- public void writeRecord(String record) throws IOException {
- Path sourcePath = new Path(record);
+ public void writeRecord(Path sourcePath) throws IOException {
Path destPath = new Path(path, sourcePath.getName());
try (FSDataOutputStream os =
destPath.getFileSystem()
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/StatePathExtractor.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/StatePathExtractor.java
index b7c260b..b9107a5 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/StatePathExtractor.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/StatePathExtractor.java
@@ -34,18 +34,18 @@
/** Extracts all file paths that are part of the provided {@link OperatorState}. */
@Internal
-public class StatePathExtractor implements FlatMapFunction<OperatorState, String> {
+public class StatePathExtractor implements FlatMapFunction<OperatorState, Path> {
private static final long serialVersionUID = 1L;
@Override
- public void flatMap(OperatorState operatorState, Collector<String> out) throws Exception {
+ public void flatMap(OperatorState operatorState, Collector<Path> out) throws Exception {
for (OperatorSubtaskState subTaskState : operatorState.getSubtaskStates().values()) {
// managed operator state
for (OperatorStateHandle operatorStateHandle : subTaskState.getManagedOperatorState()) {
Path path = getStateFilePathFromStreamStateHandle(operatorStateHandle);
if (path != null) {
- out.collect(path.getPath());
+ out.collect(path);
}
}
// managed keyed state
@@ -55,7 +55,7 @@
getStateFilePathFromStreamStateHandle(
(KeyGroupsStateHandle) keyedStateHandle);
if (path != null) {
- out.collect(path.getPath());
+ out.collect(path);
}
}
}
@@ -63,7 +63,7 @@
for (OperatorStateHandle operatorStateHandle : subTaskState.getRawOperatorState()) {
Path path = getStateFilePathFromStreamStateHandle(operatorStateHandle);
if (path != null) {
- out.collect(path.getPath());
+ out.collect(path);
}
}
// raw keyed state
@@ -73,7 +73,7 @@
getStateFilePathFromStreamStateHandle(
(KeyGroupsStateHandle) keyedStateHandle);
if (path != null) {
- out.collect(path.getPath());
+ out.collect(path);
}
}
}