[FLINK-39607][s3] Add missing Nullable annotation and precondition for config
diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
index cb5026d..0ec9fb2 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
@@ -31,6 +31,7 @@
 import org.apache.flink.fs.s3native.writer.NativeS3ObjectOperations;
 import org.apache.flink.fs.s3native.writer.NativeS3RecoverableWriter;
 import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.concurrent.FutureUtils;
 
@@ -127,7 +128,8 @@
             boolean useAsyncOperations,
             int readBufferSize,
             Duration fsCloseTimeout) {
-        this.clientProvider = clientProvider;
+        this.clientProvider =
+                Preconditions.checkNotNull(clientProvider, "clientProvider must not be null");
         this.uri = uri;
         this.bucketName = uri.getHost();
         this.entropyInjectionKey = entropyInjectionKey;
@@ -550,9 +552,8 @@
                                                 "Native S3 FileSystem closed for bucket: {}",
                                                 bucketName))
                         .thenCompose(
-                                ignored -> {
-                                    if (clientProvider != null) {
-                                        return clientProvider
+                                ignored ->
+                                        clientProvider
                                                 .closeAsync()
                                                 .whenComplete(
                                                         (result, error) -> {
@@ -564,10 +565,7 @@
                                                                 LOG.debug(
                                                                         "S3 client provider closed");
                                                             }
-                                                        });
-                                    }
-                                    return CompletableFuture.completedFuture(null);
-                                })
+                                                        }))
                         .orTimeout(fsCloseTimeout.toSeconds(), TimeUnit.SECONDS)
                         .whenComplete(
                                 (result, error) -> {
diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
index 2e1062f..c54ed86 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
@@ -31,6 +31,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.net.URI;
 import java.time.Duration;
@@ -292,7 +294,7 @@
                                     + "When not set, the default chain is used: delegation tokens -> "
                                     + "static credentials (if configured) -> DefaultCredentialsProvider.");
 
-    private Configuration flinkConfig;
+    @Nullable private Configuration flinkConfig;
 
     @Override
     public String getScheme() {
diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3OutputStream.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3OutputStream.java
index ac3562f..d9a123d 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3OutputStream.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3OutputStream.java
@@ -19,13 +19,12 @@
 package org.apache.flink.fs.s3native;
 
 import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.util.Preconditions;
 
 import software.amazon.awssdk.core.sync.RequestBody;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 
-import javax.annotation.Nullable;
-
 import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -63,7 +62,7 @@
     public NativeS3OutputStream(
             S3Client s3Client, String bucketName, String key, String localTmpDir)
             throws IOException {
-        this(s3Client, bucketName, key, localTmpDir, null);
+        this(s3Client, bucketName, key, localTmpDir, S3EncryptionConfig.none());
     }
 
     public NativeS3OutputStream(
@@ -71,13 +70,13 @@
             String bucketName,
             String key,
             String localTmpDir,
-            @Nullable S3EncryptionConfig encryptionConfig)
+            S3EncryptionConfig encryptionConfig)
             throws IOException {
         this.s3Client = s3Client;
         this.bucketName = bucketName;
         this.key = key;
         this.encryptionConfig =
-                encryptionConfig != null ? encryptionConfig : S3EncryptionConfig.none();
+                Preconditions.checkNotNull(encryptionConfig, "encryptionConfig must not be null");
 
         File tmpDir = new File(localTmpDir);
         if (!tmpDir.exists()) {
diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
index 46d089e..8f327ff 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
@@ -22,6 +22,8 @@
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.fs.s3native.token.DynamicTemporaryAWSCredentialsProvider;
 import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,6 +39,7 @@
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.S3ClientBuilder;
 import software.amazon.awssdk.services.s3.S3Configuration;
@@ -54,7 +57,6 @@
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -91,7 +93,7 @@
     private S3ClientProvider(
             S3Client s3Client,
             S3TransferManager transferManager,
-            @Nullable S3EncryptionConfig encryptionConfig,
+            S3EncryptionConfig encryptionConfig,
             AwsCredentialsProvider credentialsProvider,
             @Nullable StsClient stsClient,
             Duration clientCloseTimeout,
@@ -103,22 +105,24 @@
             boolean checksumValidation,
             int maxConnections,
             int maxRetries) {
-        this.s3Client = Objects.requireNonNull(s3Client, "s3Client must not be null");
+        this.s3Client = Preconditions.checkNotNull(s3Client, "s3Client must not be null");
         this.transferManager =
-                Objects.requireNonNull(transferManager, "transferManager must not be null");
+                Preconditions.checkNotNull(transferManager, "transferManager must not be null");
         this.encryptionConfig =
-                encryptionConfig != null ? encryptionConfig : S3EncryptionConfig.none();
+                Preconditions.checkNotNull(encryptionConfig, "encryptionConfig must not be null");
         this.credentialsProvider =
-                Objects.requireNonNull(credentialsProvider, "credentialsProvider must not be null");
+                Preconditions.checkNotNull(
+                        credentialsProvider, "credentialsProvider must not be null");
         this.stsClient = stsClient;
         this.clientCloseTimeout =
-                Objects.requireNonNull(clientCloseTimeout, "clientCloseTimeout must not be null");
+                Preconditions.checkNotNull(
+                        clientCloseTimeout, "clientCloseTimeout must not be null");
         this.connectionTimeout =
-                Objects.requireNonNull(connectionTimeout, "connectionTimeout must not be null");
+                Preconditions.checkNotNull(connectionTimeout, "connectionTimeout must not be null");
         this.socketTimeout =
-                Objects.requireNonNull(socketTimeout, "socketTimeout must not be null");
+                Preconditions.checkNotNull(socketTimeout, "socketTimeout must not be null");
         this.connectionMaxIdleTime =
-                Objects.requireNonNull(
+                Preconditions.checkNotNull(
                         connectionMaxIdleTime, "connectionMaxIdleTime must not be null");
         this.pathStyleAccess = pathStyleAccess;
         this.chunkedEncoding = chunkedEncoding;
@@ -199,19 +203,15 @@
         }
         return CompletableFuture.runAsync(
                         () -> {
-                            if (transferManager != null) {
-                                try {
-                                    transferManager.close();
-                                } catch (Exception e) {
-                                    LOG.warn("Error closing S3 TransferManager", e);
-                                }
+                            try {
+                                transferManager.close();
+                            } catch (Exception e) {
+                                LOG.warn("Error closing S3 TransferManager", e);
                             }
-                            if (s3Client != null) {
-                                try {
-                                    s3Client.close();
-                                } catch (Exception e) {
-                                    LOG.warn("Error closing S3 sync client", e);
-                                }
+                            try {
+                                s3Client.close();
+                            } catch (Exception e) {
+                                LOG.warn("Error closing S3 sync client", e);
                             }
                             if (getCredentialsProvider() instanceof SdkAutoCloseable) {
                                 try {
@@ -268,7 +268,7 @@
         private int assumeRoleSessionDurationSeconds = 3600;
 
         // Encryption configuration
-        private S3EncryptionConfig encryptionConfig;
+        private S3EncryptionConfig encryptionConfig = S3EncryptionConfig.none();
 
         // Custom credentials provider class names (comma-separated)
         @Nullable private String credentialsProviderClasses;
@@ -348,20 +348,25 @@
             return this;
         }
 
-        public Builder assumeRoleSessionName(@Nullable String assumeRoleSessionName) {
-            if (assumeRoleSessionName != null) {
-                this.assumeRoleSessionName = assumeRoleSessionName;
-            }
+        public Builder assumeRoleSessionName(String assumeRoleSessionName) {
+            this.assumeRoleSessionName =
+                    Preconditions.checkNotNull(
+                            assumeRoleSessionName, "assumeRoleSessionName must not be null");
             return this;
         }
 
         public Builder assumeRoleSessionDurationSeconds(int assumeRoleSessionDurationSeconds) {
+            Preconditions.checkArgument(
+                    assumeRoleSessionDurationSeconds > 0,
+                    "assumeRoleSessionDurationSeconds must be greater than zero");
             this.assumeRoleSessionDurationSeconds = assumeRoleSessionDurationSeconds;
             return this;
         }
 
-        public Builder encryptionConfig(@Nullable S3EncryptionConfig encryptionConfig) {
-            this.encryptionConfig = encryptionConfig;
+        public Builder encryptionConfig(S3EncryptionConfig encryptionConfig) {
+            this.encryptionConfig =
+                    Preconditions.checkNotNull(
+                            encryptionConfig, "encryptionConfig must not be null");
             return this;
         }
 
@@ -370,7 +375,7 @@
             return this;
         }
 
-        public S3ClientProvider build() {
+        S3ClientProvider build() {
             if (endpoint == null) {
                 endpoint = System.getProperty("s3.endpoint");
             }
@@ -386,7 +391,7 @@
             AwsCredentialsProvider credentialsProvider;
 
             AwsCredentialsProvider baseProvider = buildBaseCredentialsProvider();
-            if (assumeRoleArn != null && !assumeRoleArn.isEmpty()) {
+            if (!StringUtils.isNullOrWhitespaceOnly(assumeRoleArn)) {
                 stsClient = buildStsClient(baseProvider, awsRegion);
                 credentialsProvider = buildAssumeRoleProvider(stsClient);
             } else {
@@ -424,24 +429,24 @@
                 clientBuilder.endpointOverride(endpointUri);
             }
             S3Client s3Client = clientBuilder.build();
+
+            S3AsyncClientBuilder asyncClientBuilder =
+                    S3AsyncClient.builder()
+                            .credentialsProvider(credentialsProvider)
+                            .region(awsRegion)
+                            .serviceConfiguration(s3Config)
+                            .httpClientBuilder(
+                                    NettyNioAsyncHttpClient.builder()
+                                            .maxConcurrency(maxConnections)
+                                            .connectionTimeout(connectionTimeout)
+                                            .readTimeout(socketTimeout)
+                                            .connectionAcquisitionTimeout(connectionTimeout))
+                            .overrideConfiguration(overrideConfig);
+            if (endpointUri != null) {
+                asyncClientBuilder.endpointOverride(endpointUri);
+            }
             S3TransferManager transferManager =
-                    S3TransferManager.builder()
-                            .s3Client(
-                                    S3AsyncClient.builder()
-                                            .credentialsProvider(credentialsProvider)
-                                            .region(awsRegion)
-                                            .serviceConfiguration(s3Config)
-                                            .httpClientBuilder(
-                                                    NettyNioAsyncHttpClient.builder()
-                                                            .maxConcurrency(maxConnections)
-                                                            .connectionTimeout(connectionTimeout)
-                                                            .readTimeout(socketTimeout)
-                                                            .connectionAcquisitionTimeout(
-                                                                    connectionTimeout))
-                                            .overrideConfiguration(overrideConfig)
-                                            .endpointOverride(endpointUri)
-                                            .build())
-                            .build();
+                    S3TransferManager.builder().s3Client(asyncClientBuilder.build()).build();
 
             return new S3ClientProvider(
                     s3Client,
@@ -463,8 +468,7 @@
         private AwsCredentialsProvider buildBaseCredentialsProvider() {
             List<AwsCredentialsProvider> chain = new ArrayList<>();
 
-            if (credentialsProviderClasses != null
-                    && !credentialsProviderClasses.trim().isEmpty()) {
+            if (!StringUtils.isNullOrWhitespaceOnly(credentialsProviderClasses)) {
                 for (String name : credentialsProviderClasses.split(",")) {
                     String trimmed = name.trim();
                     if (!trimmed.isEmpty()) {
@@ -547,7 +551,7 @@
                             .roleSessionName(assumeRoleSessionName)
                             .durationSeconds(assumeRoleSessionDurationSeconds);
 
-            if (assumeRoleExternalId != null && !assumeRoleExternalId.isEmpty()) {
+            if (!StringUtils.isNullOrWhitespaceOnly(assumeRoleExternalId)) {
                 requestBuilder.externalId(assumeRoleExternalId);
             }
 
@@ -558,7 +562,7 @@
         }
 
         private Region resolveRegion(@Nullable String explicitRegion) {
-            if (explicitRegion != null && !explicitRegion.isEmpty()) {
+            if (!StringUtils.isNullOrWhitespaceOnly(explicitRegion)) {
                 LOG.info("Using configured AWS region: {}", explicitRegion);
                 return Region.of(explicitRegion);
             }