[FLINK-34485] Add URI/Configuration constructor to DynamicTemporaryAWSCredentialsProvider
Required such that this provider can be used with the presto S3 filesystem.
diff --git a/docs/content.zh/docs/deployment/filesystems/s3.md b/docs/content.zh/docs/deployment/filesystems/s3.md
index f409de2..12f6637 100644
--- a/docs/content.zh/docs/deployment/filesystems/s3.md
+++ b/docs/content.zh/docs/deployment/filesystems/s3.md
@@ -106,6 +106,15 @@
s3.secret-key: your-secret-key
```
+You can limit this configuration to JobManagers by using [Flink configuration file]({{< ref "docs/deployment/security/security-delegation-token" >}}).
+
+```yaml
+# flink-s3-fs-hadoop
+fs.s3a.aws.credentials.provider: org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider
+# flink-s3-fs-presto
+presto.s3.credential-provider: org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider
+```
+
## 配置非 S3 访问点
S3 文件系统还支持兼容 S3 的对象存储服务,如 [IBM's Cloud Object Storage](https://www.ibm.com/cloud/object-storage) 和 [Minio](https://min.io/)。可在 [Flink 配置文件]({{< ref "docs/deployment/config#flink-配置文件" >}}) 中配置使用的访问点:
diff --git a/docs/content/docs/deployment/filesystems/s3.md b/docs/content/docs/deployment/filesystems/s3.md
index 1796cbd..389b562 100644
--- a/docs/content/docs/deployment/filesystems/s3.md
+++ b/docs/content/docs/deployment/filesystems/s3.md
@@ -111,6 +111,15 @@
s3.secret-key: your-secret-key
```
+You can limit this configuration to JobManagers by using [Flink configuration file]({{< ref "docs/deployment/security/security-delegation-token" >}}).
+
+```yaml
+# flink-s3-fs-hadoop
+fs.s3a.aws.credentials.provider: org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider
+# flink-s3-fs-presto
+presto.s3.credential-provider: org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider
+```
+
## Configure Non-S3 Endpoint
The S3 Filesystems also support using S3 compliant object stores such as [IBM's Cloud Object Storage](https://www.ibm.com/cloud/object-storage) and [MinIO](https://min.io/).
diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProvider.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProvider.java
index 8b04ce1..3639844 100644
--- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProvider.java
+++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/token/DynamicTemporaryAWSCredentialsProvider.java
@@ -25,10 +25,13 @@
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicSessionCredentials;
import com.amazonaws.services.securitytoken.model.Credentials;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.auth.NoAwsCredentialsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.URI;
+
/**
* Support dynamic session credentials for authenticating with AWS. Please note that users may
* reference this class name from configuration property fs.s3a.aws.credentials.provider. Therefore,
@@ -45,6 +48,10 @@
private static final Logger LOG =
LoggerFactory.getLogger(DynamicTemporaryAWSCredentialsProvider.class);
+ public DynamicTemporaryAWSCredentialsProvider() {}
+
+ public DynamicTemporaryAWSCredentialsProvider(URI uri, Configuration conf) {}
+
@Override
public AWSCredentials getCredentials() throws SdkBaseException {
Credentials credentials = AbstractS3DelegationTokenReceiver.getCredentials();
diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
index 52d08d6..c660958 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java
@@ -21,6 +21,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.fs.s3.common.FlinkS3FileSystem;
+import org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider;
import org.apache.flink.runtime.util.HadoopConfigLoader;
import com.amazonaws.auth.AWSCredentialsProvider;
@@ -32,6 +33,7 @@
import java.lang.reflect.Field;
import java.net.URI;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -54,6 +56,21 @@
}
@Test
+ public void testDynamicConfigProvider() throws Exception {
+ final Configuration conf = new Configuration();
+
+ conf.setString(
+ "presto.s3.credentials-provider",
+ DynamicTemporaryAWSCredentialsProvider.class.getName());
+
+ FileSystem.initialize(conf);
+
+ FileSystem fs = FileSystem.get(new URI("s3://test"));
+ assertThat(getAwsCredentialsProvider(getPrestoFileSystem(fs)))
+ .isInstanceOf(DynamicTemporaryAWSCredentialsProvider.class);
+ }
+
+ @Test
public void testConfigPropagationWithPrestoPrefix() throws Exception {
final Configuration conf = new Configuration();
conf.setString("presto.s3.access-key", "test_access_key_id");
@@ -98,15 +115,18 @@
// ------------------------------------------------------------------------
private static void validateBasicCredentials(FileSystem fs) throws Exception {
+ try (PrestoS3FileSystem prestoFs = getPrestoFileSystem(fs)) {
+ AWSCredentialsProvider provider = getAwsCredentialsProvider(prestoFs);
+ assertTrue(provider instanceof AWSStaticCredentialsProvider);
+ }
+ }
+
+ private static PrestoS3FileSystem getPrestoFileSystem(FileSystem fs) {
assertTrue(fs instanceof FlinkS3FileSystem);
org.apache.hadoop.fs.FileSystem hadoopFs = ((FlinkS3FileSystem) fs).getHadoopFileSystem();
assertTrue(hadoopFs instanceof PrestoS3FileSystem);
-
- try (PrestoS3FileSystem prestoFs = (PrestoS3FileSystem) hadoopFs) {
- AWSCredentialsProvider provider = getAwsCredentialsProvider(prestoFs);
- assertTrue(provider instanceof AWSStaticCredentialsProvider);
- }
+ return (PrestoS3FileSystem) hadoopFs;
}
private static AWSCredentialsProvider getAwsCredentialsProvider(PrestoS3FileSystem fs)