[FLINK-35771][s3] Limit the amount of work per s5cmd call

Motivation:
- limit CPU usage by s5cmd to prevent TaskManager (TM) overload
- pick up potentially updated credentials for every s5cmd call
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/PathsCopyingFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/PathsCopyingFileSystem.java
index a20f682..46e3011 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/PathsCopyingFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/PathsCopyingFileSystem.java
@@ -50,8 +50,11 @@
         /** The path where to duplicate the source file. */
         Path getDestination();
 
+        /** The size in bytes of the requested file to copy. */
+        long getSize();
+
         /** A factory method for creating a simple pair of source/destination. */
-        static CopyRequest of(Path source, Path destination) {
+        static CopyRequest of(Path source, Path destination, long size) {
             return new CopyRequest() {
                 @Override
                 public Path getSource() {
@@ -64,6 +67,11 @@
                 }
 
                 @Override
+                public long getSize() {
+                    return size;
+                }
+
+                @Override
                 public String toString() {
                     return "CopyRequest{"
                             + "source="
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
index 16f3916..4a6846a 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java
@@ -23,6 +23,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.FileSystemFactory;
 import org.apache.flink.fs.s3.common.FlinkS3FileSystem.S5CmdConfiguration;
@@ -77,6 +78,18 @@
                     .withDescription(
                             "Extra arguments to be passed to s5cmd. For example, --no-sign-request for public buckets and -r 10 for 10 retries");
 
+    public static final ConfigOption<MemorySize> S5CMD_BATCH_MAX_SIZE =
+            ConfigOptions.key("s3.s5cmd.batch.max-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(1024))
+                    .withDescription("Maximum size of files to download per one call to s5cmd.");
+
+    public static final ConfigOption<Integer> S5CMD_BATCH_MAX_FILES =
+            ConfigOptions.key("s3.s5cmd.batch.max-files")
+                    .intType()
+                    .defaultValue(100)
+                    .withDescription("Maximum number of files to download per one call to s5cmd.");
+
     public static final ConfigOption<Long> PART_UPLOAD_MIN_SIZE =
             ConfigOptions.key("s3.upload.min.part.size")
                     .longType()
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
index f888568..49f4b2c 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java
@@ -62,6 +62,8 @@
 
 import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.ACCESS_KEY;
 import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.ENDPOINT;
+import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_BATCH_MAX_FILES;
+import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_BATCH_MAX_SIZE;
 import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_EXTRA_ARGS;
 import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_PATH;
 import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.SECRET_KEY;
@@ -112,6 +114,8 @@
         @Nullable private final String accessArtifact;
         @Nullable private final String secretArtifact;
         @Nullable private final String endpoint;
+        private final int maxBatchSizeFiles;
+        private final long maxBatchSizeBytes;
 
         /** All parameters can be empty. */
         public S5CmdConfiguration(
@@ -119,7 +123,9 @@
                 String args,
                 @Nullable String accessArtifact,
                 @Nullable String secretArtifact,
-                @Nullable String endpoint) {
+                @Nullable String endpoint,
+                int maxBatchSizeFiles,
+                long maxBatchSizeBytes) {
             if (!path.isEmpty()) {
                 File s5CmdFile = new File(path);
                 checkArgument(s5CmdFile.isFile(), "Unable to find s5cmd binary under [%s]", path);
@@ -131,6 +137,8 @@
             this.accessArtifact = accessArtifact;
             this.secretArtifact = secretArtifact;
             this.endpoint = endpoint;
+            this.maxBatchSizeFiles = maxBatchSizeFiles;
+            this.maxBatchSizeBytes = maxBatchSizeBytes;
         }
 
         public static Optional<S5CmdConfiguration> of(Configuration flinkConfig) {
@@ -143,7 +151,9 @@
                                             flinkConfig.getString(S5CMD_EXTRA_ARGS),
                                             flinkConfig.get(ACCESS_KEY),
                                             flinkConfig.get(SECRET_KEY),
-                                            flinkConfig.get(ENDPOINT)));
+                                            flinkConfig.get(ENDPOINT),
+                                            flinkConfig.get(S5CMD_BATCH_MAX_FILES),
+                                            flinkConfig.get(S5CMD_BATCH_MAX_SIZE).getBytes()));
         }
 
         private void configureEnvironment(Map<String, String> environment) {
@@ -263,7 +273,32 @@
         artefacts.add(s5CmdConfiguration.path);
         artefacts.addAll(s5CmdConfiguration.args);
         artefacts.add("run");
-        castSpell(convertToSpells(requests), closeableRegistry, artefacts.toArray(new String[0]));
+
+        List<CopyRequest> batch = new ArrayList<>();
+        long runningSizeBytes = 0L;
+        int runningSizeFiles = 0;
+        for (int i = 0; i < requests.size(); i++) {
+            CopyRequest request = requests.get(i);
+            batch.add(request);
+            runningSizeBytes += request.getSize();
+            runningSizeFiles++;
+            if (runningSizeBytes >= s5CmdConfiguration.maxBatchSizeBytes
+                    || runningSizeFiles >= s5CmdConfiguration.maxBatchSizeFiles
+                    || i == requests.size() - 1) {
+                LOG.info(
+                        "Copy {} files using s5cmd, total size: {}, args: {}",
+                        batch.size(),
+                        runningSizeBytes,
+                        artefacts);
+                castSpell(
+                        convertToSpells(batch),
+                        closeableRegistry,
+                        artefacts.toArray(new String[0]));
+                runningSizeFiles = 0;
+                runningSizeBytes = 0;
+                batch.clear();
+            }
+        }
     }
 
     private List<String> convertToSpells(List<CopyRequest> requests) throws IOException {
@@ -282,7 +317,6 @@
     private void castSpell(
             List<String> spells, ICloseableRegistry closeableRegistry, String... artefacts)
             throws IOException {
-        LOG.info("Casting spell: {}", Arrays.toString(artefacts));
         int exitCode = 0;
         final AtomicReference<IOException> maybeCloseableRegistryException =
                 new AtomicReference<>();
diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/FlinkS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/FlinkS3FileSystemTest.java
index f7b52c1..05c43f0 100644
--- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/FlinkS3FileSystemTest.java
+++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/FlinkS3FileSystemTest.java
@@ -22,15 +22,18 @@
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.ICloseableRegistry;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.fs.PathsCopyingFileSystem;
+import org.apache.flink.core.fs.PathsCopyingFileSystem.CopyRequest;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.commons.io.FileUtils;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.EnabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import javax.annotation.Nonnull;
 
@@ -39,24 +42,28 @@
 import java.io.IOException;
 import java.net.URI;
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.ACCESS_KEY;
 import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.ENDPOINT;
+import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_BATCH_MAX_FILES;
 import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_EXTRA_ARGS;
 import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.S5CMD_PATH;
 import static org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.SECRET_KEY;
 
 /** Unit tests for FlinkS3FileSystem. */
 class FlinkS3FileSystemTest {
-    @TempDir public static File temporaryDirectory;
 
     @Test
     @DisabledOnOs({OS.WINDOWS, OS.OTHER}) // OS must support SLEEP command
-    public void testCopyCommandInterruptible() throws Exception {
+    public void testCopyCommandInterruptible(@TempDir File temporaryDirectory) throws Exception {
 
         File cmdFile = new File(temporaryDirectory, "cmd");
 
@@ -95,9 +102,10 @@
                             try {
                                 fs.copyFiles(
                                         Collections.singletonList(
-                                                new PathsCopyingFileSystem.CopyTask(
+                                                CopyRequest.of(
                                                         Path.fromLocalFile(new File("")),
-                                                        Path.fromLocalFile(new File("")))),
+                                                        Path.fromLocalFile(new File("")),
+                                                        100L)),
                                         closeableRegistry);
                             } catch (IOException ex) {
                                 actualException.set(ex);
@@ -112,4 +120,69 @@
         Assertions.assertThat(actualException.get())
                 .hasStackTraceContaining("Copy process destroyed by CloseableRegistry.");
     }
+
+    @ParameterizedTest
+    @ValueSource(ints = {1, 7, 10, 14, Integer.MAX_VALUE})
+    @EnabledOnOs({OS.LINUX, OS.MAC}) // POSIX OS only to run shell script
+    public void testCopyCommandBatching(int batchSize, @TempDir File temporaryDirectory)
+            throws Exception {
+        final int numFiles = 10;
+
+        File cmdFile = new File(temporaryDirectory, "cmd");
+        File inputToCmd = new File(temporaryDirectory, "input");
+        Preconditions.checkState(inputToCmd.mkdir());
+
+        String cmd =
+                String.format(
+                        "file=$(mktemp %s/s5cmd-input-XXX)\n"
+                                + "while read line; do echo $line >> $file; done < /dev/stdin",
+                        inputToCmd.getAbsolutePath());
+
+        FileUtils.writeStringToFile(cmdFile, cmd, StandardCharsets.UTF_8);
+        Preconditions.checkState(cmdFile.setExecutable(true), "Cannot set script file executable.");
+
+        final Configuration conf = new Configuration();
+        conf.set(S5CMD_PATH, cmdFile.getAbsolutePath());
+        conf.set(S5CMD_EXTRA_ARGS, "");
+        conf.set(S5CMD_BATCH_MAX_FILES, batchSize);
+        conf.set(ACCESS_KEY, "test-access-key");
+        conf.set(SECRET_KEY, "test-secret-key");
+        conf.set(ENDPOINT, "test-endpoint");
+
+        TestS3FileSystemFactory factory = new TestS3FileSystemFactory();
+        factory.configure(conf);
+
+        FlinkS3FileSystem fs = (FlinkS3FileSystem) factory.create(new URI("s3://test"));
+        List<CopyRequest> tasks =
+                IntStream.range(0, numFiles)
+                        .mapToObj(
+                                i ->
+                                        CopyRequest.of(
+                                                new Path("file:///src-" + i),
+                                                new Path("file:///dst-" + i),
+                                                123L))
+                        .collect(Collectors.toList());
+        fs.copyFiles(tasks, ICloseableRegistry.NO_OP);
+
+        File[] files = inputToCmd.listFiles();
+        Assertions.assertThat(files).isNotNull();
+        Assertions.assertThat(files.length)
+                .describedAs("Wrong number of s5cmd subcommand batches for input tasks: %s", tasks)
+                .isEqualTo(numFiles / batchSize + (numFiles % batchSize > 0 ? 1 : 0));
+        int totalSubcommands = 0;
+        for (File file : files) {
+            List<String> subcommands = FileUtils.readLines(file, StandardCharsets.UTF_8);
+            Assertions.assertThat(subcommands.size())
+                    .describedAs(
+                            "Too many s5cmd subcommands issued per batch: %s\n(input files: %s)",
+                            subcommands, tasks)
+                    .isLessThanOrEqualTo(batchSize);
+            totalSubcommands += subcommands.size();
+        }
+        Assertions.assertThat(totalSubcommands)
+                .describedAs(
+                        "The total number of issued s5cmd subcommands did not match the number of copy tasks:\n%s,\n%s",
+                        totalSubcommands, tasks)
+                .isEqualTo(numFiles);
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java
index eee0d7b..d8f45e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateToolset.java
@@ -66,7 +66,9 @@
                 throw new IllegalArgumentException("We can duplicate only FileStateHandles.");
             }
             final Path srcPath = ((FileStateHandle) handle).getFilePath();
-            requests.add(CopyRequest.of(srcPath, getNewDstPath(srcPath.getName())));
+            requests.add(
+                    CopyRequest.of(
+                            srcPath, getNewDstPath(srcPath.getName()), handle.getStateSize()));
         }
         fs.copyFiles(requests, new CloseableRegistry());
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
index 0a5656c..4241946 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
@@ -154,6 +154,7 @@
                 if (canCopyPaths(handleAndLocalPath)) {
                     org.apache.flink.core.fs.Path remotePath =
                             handleAndLocalPath.getHandle().maybeGetPath().get();
+                    long size = handleAndLocalPath.getHandle().getStateSize();
                     FileSystem.FSKey newFSKey = new FileSystem.FSKey(remotePath.toUri());
                     filesSystemsFilesToDownload
                             .computeIfAbsent(newFSKey, fsKey -> new ArrayList<>())
@@ -161,7 +162,8 @@
                                     CopyRequest.of(
                                             remotePath,
                                             new org.apache.flink.core.fs.Path(
-                                                    downloadDestination.toUri())));
+                                                    downloadDestination.toUri()),
+                                            size));
                 } else {
                     runnables.add(
                             createDownloadRunnableUsingStreams(