[FLINK-35771][s3] Limit the amount of work per s5cmd call
Motivation:
- limit CPU usage by s5cmd to prevent TaskManager (TM) overload
- pick up potentially updated credentials for every s5cmd call
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/PathsCopyingFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/PathsCopyingFileSystem.java
index a20f682..46e3011 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/PathsCopyingFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/PathsCopyingFileSystem.java
@@ -50,8 +50,11 @@
/** The path where to duplicate the source file. */
Path getDestination();
+ /** The size in bytes of the requested file to copy. */
+ long getSize();
+
/** A factory method for creating a simple pair of source/destination. */
- static CopyRequest of(Path source, Path destination) {
+ static CopyRequest of(Path source, Path destination, long size) {
return new CopyRequest() {
@Override
public Path getSource() {
@@ -64,6 +67,11 @@
}
@Override
+ public long getSize() {
+ return size;
+ }
+
+ @Override
public String toString() {
return "CopyRequest{"
+ "source="
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
index 16f3916..4a6846a 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
@@ -23,6 +23,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
import org.apache.flink.fs.s3.common.FlinkS3FileSystem.S5CmdConfiguration;
@@ -77,6 +78,18 @@
.withDescription(
"Extra arguments to be passed to s5cmd. For example, --no-sign-request for public buckets and -r 10 for 10 retries");
+ public static final ConfigOption<MemorySize> S5CMD_BATCH_MAX_SIZE =
+ ConfigOptions.key("s3.s5cmd.batch.max-size")
+ .memoryType()
+ .defaultValue(MemorySize.ofMebiBytes(1024))
+ .withDescription("Maximum size of files to download per one call to s5cmd.");
+
+ public static final ConfigOption<Integer> S5CMD_BATCH_MAX_FILES =
+ ConfigOptions.key("s3.s5cmd.batch.max-files")
+ .intType()
+ .defaultValue(100)
+ .withDescription("Maximum number of files to download per one call to s5cmd.");
+
public static final ConfigOption<Long> PART_UPLOAD_MIN_SIZE =
ConfigOptions.key("s3.upload.min.part.size")
.longType()
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
index f888568..49f4b2c 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
@@ -62,6 +62,8 @@
import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.ACCESS_KEY;
import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.ENDPOINT;
+import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_BATCH_MAX_FILES;
+import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_BATCH_MAX_SIZE;
import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_EXTRA_ARGS;
import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_PATH;
import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.SECRET_KEY;
@@ -112,6 +114,8 @@
@Nullable private final String accessArtifact;
@Nullable private final String secretArtifact;
@Nullable private final String endpoint;
+ private long maxBatchSizeFiles;
+ private long maxBatchSizeBytes;
/** All parameters can be empty. */
public S5CmdConfiguration(
@@ -119,7 +123,9 @@
String args,
@Nullable String accessArtifact,
@Nullable String secretArtifact,
- @Nullable String endpoint) {
+ @Nullable String endpoint,
+ int maxBatchSizeFiles,
+ long maxBatchSizeBytes) {
if (!path.isEmpty()) {
File s5CmdFile = new File(path);
checkArgument(s5CmdFile.isFile(), "Unable to find s5cmd binary under [%s]", path);
@@ -131,6 +137,8 @@
this.accessArtifact = accessArtifact;
this.secretArtifact = secretArtifact;
this.endpoint = endpoint;
+ this.maxBatchSizeFiles = maxBatchSizeFiles;
+ this.maxBatchSizeBytes = maxBatchSizeBytes;
}
public static Optional<S5CmdConfiguration> of(Configuration flinkConfig) {
@@ -143,7 +151,9 @@
flinkConfig.getString(S5CMD_EXTRA_ARGS),
flinkConfig.get(ACCESS_KEY),
flinkConfig.get(SECRET_KEY),
- flinkConfig.get(ENDPOINT)));
+ flinkConfig.get(ENDPOINT),
+ flinkConfig.get(S5CMD_BATCH_MAX_FILES),
+ flinkConfig.get(S5CMD_BATCH_MAX_SIZE).getBytes()));
}
private void configureEnvironment(Map<String, String> environment) {
@@ -263,7 +273,32 @@
artefacts.add(s5CmdConfiguration.path);
artefacts.addAll(s5CmdConfiguration.args);
artefacts.add("run");
- castSpell(convertToSpells(requests), closeableRegistry, artefacts.toArray(new String[0]));
+
+ ArrayList<CopyRequest> batch = new ArrayList<>();
+ long runningSizeBytes = 0L;
+ long runningSizeFiles = 0L;
+ for (int i = 0; i < requests.size(); i++) {
+ CopyRequest request = requests.get(i);
+ batch.add(request);
+ runningSizeBytes += request.getSize();
+ runningSizeFiles++;
+ if (runningSizeBytes >= s5CmdConfiguration.maxBatchSizeBytes
+ || runningSizeFiles >= s5CmdConfiguration.maxBatchSizeFiles
+ || i == requests.size() - 1) {
+ LOG.info(
+ "Copying batch of {} files using s5cmd, batch size: {} bytes, args: {}",
+ batch.size(),
+ runningSizeBytes,
+ artefacts);
+ castSpell(
+ convertToSpells(batch),
+ closeableRegistry,
+ artefacts.toArray(new String[0]));
+ runningSizeFiles = 0;
+ runningSizeBytes = 0;
+ batch.clear();
+ }
+ }
}
private List<String> convertToSpells(List<CopyRequest> requests) throws IOException {
@@ -282,7 +317,6 @@
private void castSpell(
List<String> spells, ICloseableRegistry closeableRegistry, String... artefacts)
throws IOException {
- LOG.info("Casting spell: {}", Arrays.toString(artefacts));
int exitCode = 0;
final AtomicReference<IOException> maybeCloseableRegistryException =
new AtomicReference<>();
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/FlinkS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/FlinkS3FileSystemTest.java
index f7b52c1..05c43f0 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/FlinkS3FileSystemTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/FlinkS3FileSystemTest.java
@@ -22,15 +22,18 @@
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.ICloseableRegistry;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.PathsCopyingFileSystem;
+import org.apache.flink.core.fs.PathsCopyingFileSystem.CopyRequest;
import org.apache.flink.util.Preconditions;
import org.apache.commons.io.FileUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.EnabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import javax.annotation.Nonnull;
@@ -39,24 +42,28 @@
import java.io.IOException;
import java.net.URI;
import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.ACCESS_KEY;
import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.ENDPOINT;
+import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_BATCH_MAX_FILES;
import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_EXTRA_ARGS;
import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_PATH;
import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.SECRET_KEY;
/** Unit tests for FlinkS3FileSystem. */
class FlinkS3FileSystemTest {
- @TempDir public static File temporaryDirectory;
@Test
@DisabledOnOs({OS.WINDOWS, OS.OTHER}) // OS must support SLEEP command
- public void testCopyCommandInterruptible() throws Exception {
+ public void testCopyCommandInterruptible(@TempDir File temporaryDirectory) throws Exception {
File cmdFile = new File(temporaryDirectory, "cmd");
@@ -95,9 +102,10 @@
try {
fs.copyFiles(
Collections.singletonList(
- new PathsCopyingFileSystem.CopyTask(
+ CopyRequest.of(
Path.fromLocalFile(new File("")),
- Path.fromLocalFile(new File("")))),
+ Path.fromLocalFile(new File("")),
+ 100L)),
closeableRegistry);
} catch (IOException ex) {
actualException.set(ex);
@@ -112,4 +120,69 @@
Assertions.assertThat(actualException.get())
.hasStackTraceContaining("Copy process destroyed by CloseableRegistry.");
}
+
+ @ParameterizedTest
+ @ValueSource(ints = {1, 7, 10, 14, Integer.MAX_VALUE})
+ @EnabledOnOs({OS.LINUX, OS.MAC}) // POSIX OS only to run shell script
+ public void testCopyCommandBatching(int batchSize, @TempDir File temporaryDirectory)
+ throws Exception {
+ final int numFiles = 10;
+
+ File cmdFile = new File(temporaryDirectory, "cmd");
+ File inputToCmd = new File(temporaryDirectory, "input");
+ Preconditions.checkState(inputToCmd.mkdir());
+
+ String cmd =
+ String.format(
+ "file=$(mktemp %s/s5cmd-input-XXX)\n"
+ + "while read line; do echo $line >> $file; done < /dev/stdin",
+ inputToCmd.getAbsolutePath());
+
+ FileUtils.writeStringToFile(cmdFile, cmd);
+ Preconditions.checkState(cmdFile.setExecutable(true), "Cannot set script file executable.");
+
+ final Configuration conf = new Configuration();
+ conf.set(S5CMD_PATH, cmdFile.getAbsolutePath());
+ conf.set(S5CMD_EXTRA_ARGS, "");
+ conf.set(S5CMD_BATCH_MAX_FILES, batchSize);
+ conf.set(ACCESS_KEY, "test-access-key");
+ conf.set(SECRET_KEY, "test-secret-key");
+ conf.set(ENDPOINT, "test-endpoint");
+
+ TestS3FileSystemFactory factory = new TestS3FileSystemFactory();
+ factory.configure(conf);
+
+ FlinkS3FileSystem fs = (FlinkS3FileSystem) factory.create(new URI("s3://test"));
+ List<CopyRequest> tasks =
+ IntStream.range(0, numFiles)
+ .mapToObj(
+ i ->
+ CopyRequest.of(
+ new Path("file:///src-" + i),
+ new Path("file:///dst-" + i),
+ 123L))
+ .collect(Collectors.toList());
+ fs.copyFiles(tasks, ICloseableRegistry.NO_OP);
+
+ File[] files = inputToCmd.listFiles();
+ Assertions.assertThat(files).isNotNull();
+ Assertions.assertThat(files.length)
+ .describedAs("Wrong number of s5cmd subcommand batches for input tasks: %s", tasks)
+ .isEqualTo(numFiles / batchSize + (numFiles % batchSize > 0 ? 1 : 0));
+ int totalSubcommands = 0;
+ for (File file : files) {
+ List<String> subcommands = FileUtils.readLines(file, StandardCharsets.UTF_8);
+ Assertions.assertThat(subcommands.size())
+ .describedAs(
+ "Too many s5cmd subcommands issued per batch: %s\n(input files: %s)",
+ subcommands, tasks)
+ .isLessThanOrEqualTo(batchSize);
+ totalSubcommands += subcommands.size();
+ }
+ Assertions.assertThat(totalSubcommands)
+ .describedAs(
+ "The total number of issued s5cmd subcommands did not match the number of copy tasks:\n%s,\n%s",
+ totalSubcommands, tasks)
+ .isEqualTo(numFiles);
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java
index eee0d7b..d8f45e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java
@@ -66,7 +66,9 @@
throw new IllegalArgumentException("We can duplicate only FileStateHandles.");
}
final Path srcPath = ((FileStateHandle) handle).getFilePath();
- requests.add(CopyRequest.of(srcPath, getNewDstPath(srcPath.getName())));
+ requests.add(
+ CopyRequest.of(
+ srcPath, getNewDstPath(srcPath.getName()), handle.getStateSize()));
}
fs.copyFiles(requests, new CloseableRegistry());
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
index 0a5656c..4241946 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
@@ -154,6 +154,7 @@
if (canCopyPaths(handleAndLocalPath)) {
org.apache.flink.core.fs.Path remotePath =
handleAndLocalPath.getHandle().maybeGetPath().get();
+ long size = handleAndLocalPath.getHandle().getStateSize();
FileSystem.FSKey newFSKey = new FileSystem.FSKey(remotePath.toUri());
filesSystemsFilesToDownload
.computeIfAbsent(newFSKey, fsKey -> new ArrayList<>())
@@ -161,7 +162,8 @@
CopyRequest.of(
remotePath,
new org.apache.flink.core.fs.Path(
- downloadDestination.toUri())));
+ downloadDestination.toUri()),
+ size));
} else {
runnables.add(
createDownloadRunnableUsingStreams(