[FLINK-39607][s3] Add missing Nullable annotation and precondition for config
diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
index cb5026d..0ec9fb2 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
@@ -31,6 +31,7 @@
import org.apache.flink.fs.s3native.writer.NativeS3ObjectOperations;
import org.apache.flink.fs.s3native.writer.NativeS3RecoverableWriter;
import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.concurrent.FutureUtils;
@@ -127,7 +128,8 @@
boolean useAsyncOperations,
int readBufferSize,
Duration fsCloseTimeout) {
- this.clientProvider = clientProvider;
+ this.clientProvider =
+ Preconditions.checkNotNull(clientProvider, "clientProvider must not be null");
this.uri = uri;
this.bucketName = uri.getHost();
this.entropyInjectionKey = entropyInjectionKey;
@@ -550,9 +552,8 @@
"Native S3 FileSystem closed for bucket: {}",
bucketName))
.thenCompose(
- ignored -> {
- if (clientProvider != null) {
- return clientProvider
+ ignored ->
+ clientProvider
.closeAsync()
.whenComplete(
(result, error) -> {
@@ -564,10 +565,7 @@
LOG.debug(
"S3 client provider closed");
}
- });
- }
- return CompletableFuture.completedFuture(null);
- })
+ }))
.orTimeout(fsCloseTimeout.toSeconds(), TimeUnit.SECONDS)
.whenComplete(
(result, error) -> {
diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
index 2e1062f..c54ed86 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
@@ -31,6 +31,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
@@ -292,7 +294,7 @@
+ "When not set, the default chain is used: delegation tokens -> "
+ "static credentials (if configured) -> DefaultCredentialsProvider.");
- private Configuration flinkConfig;
+ @Nullable private Configuration flinkConfig;
@Override
public String getScheme() {
diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3OutputStream.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3OutputStream.java
index ac3562f..d9a123d 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3OutputStream.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3OutputStream.java
@@ -19,13 +19,12 @@
package org.apache.flink.fs.s3native;
import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.util.Preconditions;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
-import javax.annotation.Nullable;
-
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
@@ -63,7 +62,7 @@
public NativeS3OutputStream(
S3Client s3Client, String bucketName, String key, String localTmpDir)
throws IOException {
- this(s3Client, bucketName, key, localTmpDir, null);
+ this(s3Client, bucketName, key, localTmpDir, S3EncryptionConfig.none());
}
public NativeS3OutputStream(
@@ -71,13 +70,13 @@
String bucketName,
String key,
String localTmpDir,
- @Nullable S3EncryptionConfig encryptionConfig)
+ S3EncryptionConfig encryptionConfig)
throws IOException {
this.s3Client = s3Client;
this.bucketName = bucketName;
this.key = key;
this.encryptionConfig =
- encryptionConfig != null ? encryptionConfig : S3EncryptionConfig.none();
+ Preconditions.checkNotNull(encryptionConfig, "encryptionConfig must not be null");
File tmpDir = new File(localTmpDir);
if (!tmpDir.exists()) {
diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
index 46d089e..8f327ff 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
@@ -22,6 +22,8 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.fs.s3native.token.DynamicTemporaryAWSCredentialsProvider;
import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,6 +39,7 @@
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain;
import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.S3Configuration;
@@ -54,7 +57,6 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
-import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -91,7 +93,7 @@
private S3ClientProvider(
S3Client s3Client,
S3TransferManager transferManager,
- @Nullable S3EncryptionConfig encryptionConfig,
+ S3EncryptionConfig encryptionConfig,
AwsCredentialsProvider credentialsProvider,
@Nullable StsClient stsClient,
Duration clientCloseTimeout,
@@ -103,22 +105,24 @@
boolean checksumValidation,
int maxConnections,
int maxRetries) {
- this.s3Client = Objects.requireNonNull(s3Client, "s3Client must not be null");
+ this.s3Client = Preconditions.checkNotNull(s3Client, "s3Client must not be null");
this.transferManager =
- Objects.requireNonNull(transferManager, "transferManager must not be null");
+ Preconditions.checkNotNull(transferManager, "transferManager must not be null");
this.encryptionConfig =
- encryptionConfig != null ? encryptionConfig : S3EncryptionConfig.none();
+ Preconditions.checkNotNull(encryptionConfig, "encryptionConfig must not be null");
this.credentialsProvider =
- Objects.requireNonNull(credentialsProvider, "credentialsProvider must not be null");
+ Preconditions.checkNotNull(
+ credentialsProvider, "credentialsProvider must not be null");
this.stsClient = stsClient;
this.clientCloseTimeout =
- Objects.requireNonNull(clientCloseTimeout, "clientCloseTimeout must not be null");
+ Preconditions.checkNotNull(
+ clientCloseTimeout, "clientCloseTimeout must not be null");
this.connectionTimeout =
- Objects.requireNonNull(connectionTimeout, "connectionTimeout must not be null");
+ Preconditions.checkNotNull(connectionTimeout, "connectionTimeout must not be null");
this.socketTimeout =
- Objects.requireNonNull(socketTimeout, "socketTimeout must not be null");
+ Preconditions.checkNotNull(socketTimeout, "socketTimeout must not be null");
this.connectionMaxIdleTime =
- Objects.requireNonNull(
+ Preconditions.checkNotNull(
connectionMaxIdleTime, "connectionMaxIdleTime must not be null");
this.pathStyleAccess = pathStyleAccess;
this.chunkedEncoding = chunkedEncoding;
@@ -199,19 +203,15 @@
}
return CompletableFuture.runAsync(
() -> {
- if (transferManager != null) {
- try {
- transferManager.close();
- } catch (Exception e) {
- LOG.warn("Error closing S3 TransferManager", e);
- }
+ try {
+ transferManager.close();
+ } catch (Exception e) {
+ LOG.warn("Error closing S3 TransferManager", e);
}
- if (s3Client != null) {
- try {
- s3Client.close();
- } catch (Exception e) {
- LOG.warn("Error closing S3 sync client", e);
- }
+ try {
+ s3Client.close();
+ } catch (Exception e) {
+ LOG.warn("Error closing S3 sync client", e);
}
if (getCredentialsProvider() instanceof SdkAutoCloseable) {
try {
@@ -268,7 +268,7 @@
private int assumeRoleSessionDurationSeconds = 3600;
// Encryption configuration
- private S3EncryptionConfig encryptionConfig;
+ private S3EncryptionConfig encryptionConfig = S3EncryptionConfig.none();
// Custom credentials provider class names (comma-separated)
@Nullable private String credentialsProviderClasses;
@@ -348,20 +348,25 @@
return this;
}
- public Builder assumeRoleSessionName(@Nullable String assumeRoleSessionName) {
- if (assumeRoleSessionName != null) {
- this.assumeRoleSessionName = assumeRoleSessionName;
- }
+ public Builder assumeRoleSessionName(String assumeRoleSessionName) {
+ this.assumeRoleSessionName =
+ Preconditions.checkNotNull(
+ assumeRoleSessionName, "assumeRoleSessionName must not be null");
return this;
}
public Builder assumeRoleSessionDurationSeconds(int assumeRoleSessionDurationSeconds) {
+ Preconditions.checkArgument(
+ assumeRoleSessionDurationSeconds > 0,
+ "assumeRoleSessionDurationSeconds must be greater than zero");
this.assumeRoleSessionDurationSeconds = assumeRoleSessionDurationSeconds;
return this;
}
- public Builder encryptionConfig(@Nullable S3EncryptionConfig encryptionConfig) {
- this.encryptionConfig = encryptionConfig;
+ public Builder encryptionConfig(S3EncryptionConfig encryptionConfig) {
+ this.encryptionConfig =
+ Preconditions.checkNotNull(
+ encryptionConfig, "encryptionConfig must not be null");
return this;
}
@@ -370,7 +375,7 @@
return this;
}
- public S3ClientProvider build() {
+ S3ClientProvider build() {
if (endpoint == null) {
endpoint = System.getProperty("s3.endpoint");
}
@@ -386,7 +391,7 @@
AwsCredentialsProvider credentialsProvider;
AwsCredentialsProvider baseProvider = buildBaseCredentialsProvider();
- if (assumeRoleArn != null && !assumeRoleArn.isEmpty()) {
+ if (!StringUtils.isNullOrWhitespaceOnly(assumeRoleArn)) {
stsClient = buildStsClient(baseProvider, awsRegion);
credentialsProvider = buildAssumeRoleProvider(stsClient);
} else {
@@ -424,24 +429,24 @@
clientBuilder.endpointOverride(endpointUri);
}
S3Client s3Client = clientBuilder.build();
+
+ S3AsyncClientBuilder asyncClientBuilder =
+ S3AsyncClient.builder()
+ .credentialsProvider(credentialsProvider)
+ .region(awsRegion)
+ .serviceConfiguration(s3Config)
+ .httpClientBuilder(
+ NettyNioAsyncHttpClient.builder()
+ .maxConcurrency(maxConnections)
+ .connectionTimeout(connectionTimeout)
+ .readTimeout(socketTimeout)
+ .connectionAcquisitionTimeout(connectionTimeout))
+ .overrideConfiguration(overrideConfig);
+ if (endpointUri != null) {
+ asyncClientBuilder.endpointOverride(endpointUri);
+ }
S3TransferManager transferManager =
- S3TransferManager.builder()
- .s3Client(
- S3AsyncClient.builder()
- .credentialsProvider(credentialsProvider)
- .region(awsRegion)
- .serviceConfiguration(s3Config)
- .httpClientBuilder(
- NettyNioAsyncHttpClient.builder()
- .maxConcurrency(maxConnections)
- .connectionTimeout(connectionTimeout)
- .readTimeout(socketTimeout)
- .connectionAcquisitionTimeout(
- connectionTimeout))
- .overrideConfiguration(overrideConfig)
- .endpointOverride(endpointUri)
- .build())
- .build();
+ S3TransferManager.builder().s3Client(asyncClientBuilder.build()).build();
return new S3ClientProvider(
s3Client,
@@ -463,8 +468,7 @@
private AwsCredentialsProvider buildBaseCredentialsProvider() {
List<AwsCredentialsProvider> chain = new ArrayList<>();
- if (credentialsProviderClasses != null
- && !credentialsProviderClasses.trim().isEmpty()) {
+ if (!StringUtils.isNullOrWhitespaceOnly(credentialsProviderClasses)) {
for (String name : credentialsProviderClasses.split(",")) {
String trimmed = name.trim();
if (!trimmed.isEmpty()) {
@@ -547,7 +551,7 @@
.roleSessionName(assumeRoleSessionName)
.durationSeconds(assumeRoleSessionDurationSeconds);
- if (assumeRoleExternalId != null && !assumeRoleExternalId.isEmpty()) {
+ if (!StringUtils.isNullOrWhitespaceOnly(assumeRoleExternalId)) {
requestBuilder.externalId(assumeRoleExternalId);
}
@@ -558,7 +562,7 @@
}
private Region resolveRegion(@Nullable String explicitRegion) {
- if (explicitRegion != null && !explicitRegion.isEmpty()) {
+ if (!StringUtils.isNullOrWhitespaceOnly(explicitRegion)) {
LOG.info("Using configured AWS region: {}", explicitRegion);
return Region.of(explicitRegion);
}