Core: Allow passing identity object through RESTSessionCatalog (#7088)
diff --git a/api/src/main/java/org/apache/iceberg/catalog/SessionCatalog.java b/api/src/main/java/org/apache/iceberg/catalog/SessionCatalog.java
index 5e0c0e2..157a499 100644
--- a/api/src/main/java/org/apache/iceberg/catalog/SessionCatalog.java
+++ b/api/src/main/java/org/apache/iceberg/catalog/SessionCatalog.java
@@ -38,6 +38,7 @@
private final String identity;
private final Map<String, String> credentials;
private final Map<String, String> properties;
+ private final Object wrappedIdentity;
public static SessionContext createEmpty() {
return new SessionContext(UUID.randomUUID().toString(), null, null, ImmutableMap.of());
@@ -48,10 +49,20 @@
String identity,
Map<String, String> credentials,
Map<String, String> properties) {
+ this(sessionId, identity, credentials, properties, null);
+ }
+
+ public SessionContext(
+ String sessionId,
+ String identity,
+ Map<String, String> credentials,
+ Map<String, String> properties,
+ Object wrappedIdentity) {
this.sessionId = sessionId;
this.identity = identity;
this.credentials = credentials;
this.properties = properties;
+ this.wrappedIdentity = wrappedIdentity;
}
/**
@@ -95,6 +106,15 @@
public Map<String, String> properties() {
return properties;
}
+
+ /**
+ * Returns the opaque wrapped identity object.
+ *
+ * @return the wrapped identity
+ */
+ public Object wrappedIdentity() {
+ return wrappedIdentity;
+ }
}
/**
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
index 066b62c..e512f26 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
@@ -58,7 +58,7 @@
public RESTCatalog(
SessionCatalog.SessionContext context,
Function<Map<String, String>, RESTClient> clientBuilder) {
- this.sessionCatalog = new RESTSessionCatalog(clientBuilder);
+ this.sessionCatalog = new RESTSessionCatalog(clientBuilder, null);
this.delegate = sessionCatalog.asCatalog(context);
this.nsDelegate = (SupportsNamespaces) delegate;
}
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
index 25c0d2d..2b36375 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
@@ -32,6 +32,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
@@ -98,7 +99,7 @@
OAuth2Properties.SAML1_TOKEN_TYPE);
private final Function<Map<String, String>, RESTClient> clientBuilder;
- private Function<Map<String, String>, FileIO> ioBuilder = null;
+ private final BiFunction<SessionContext, Map<String, String>, FileIO> ioBuilder;
private Cache<String, AuthSession> sessions = null;
private AuthSession catalogAuth = null;
private boolean keepTokenRefreshed = true;
@@ -123,11 +124,15 @@
}
public RESTSessionCatalog() {
- this(config -> HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build());
+ this(config -> HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build(), null);
}
- RESTSessionCatalog(Function<Map<String, String>, RESTClient> clientBuilder) {
+ public RESTSessionCatalog(
+ Function<Map<String, String>, RESTClient> clientBuilder,
+ BiFunction<SessionContext, Map<String, String>, FileIO> ioBuilder) {
+ Preconditions.checkNotNull(clientBuilder, "Invalid client builder: null");
this.clientBuilder = clientBuilder;
+ this.ioBuilder = ioBuilder;
}
@Override
@@ -188,7 +193,7 @@
client, tokenRefreshExecutor(), token, expiresAtMillis(mergedProps), catalogAuth);
}
- this.io = newFileIO(mergedProps);
+ this.io = newFileIO(SessionContext.createEmpty(), mergedProps);
this.snapshotMode =
SnapshotMode.valueOf(
@@ -203,11 +208,6 @@
super.initialize(name, mergedProps);
}
- public void setFileIOBuilder(Function<Map<String, String>, FileIO> newIOBuilder) {
- Preconditions.checkState(null == io, "Cannot set IO builder after calling initialize");
- this.ioBuilder = newIOBuilder;
- }
-
private AuthSession session(SessionContext context) {
AuthSession session =
sessions.get(
@@ -350,7 +350,7 @@
client,
paths.table(loadedIdent),
session::headers,
- tableFileIO(response.config()),
+ tableFileIO(context, response.config()),
tableMetadata);
TableIdentifier tableIdentifier = loadedIdent;
@@ -605,7 +605,7 @@
client,
paths.table(ident),
session::headers,
- tableFileIO(response.config()),
+ tableFileIO(context, response.config()),
response.tableMetadata());
return new BaseTable(ops, fullTableName(ident));
@@ -624,7 +624,7 @@
client,
paths.table(ident),
session::headers,
- tableFileIO(response.config()),
+ tableFileIO(context, response.config()),
RESTTableOperations.UpdateType.CREATE,
createChanges(meta),
meta);
@@ -675,7 +675,7 @@
client,
paths.table(ident),
session::headers,
- tableFileIO(response.config()),
+ tableFileIO(context, response.config()),
RESTTableOperations.UpdateType.REPLACE,
changes.build(),
base);
@@ -765,9 +765,9 @@
return String.format("%s.%s", name(), ident);
}
- private FileIO newFileIO(Map<String, String> properties) {
+ private FileIO newFileIO(SessionContext context, Map<String, String> properties) {
if (null != ioBuilder) {
- return ioBuilder.apply(properties);
+ return ioBuilder.apply(context, properties);
} else {
String ioImpl =
properties.getOrDefault(CatalogProperties.FILE_IO_IMPL, ResolvingFileIO.class.getName());
@@ -775,14 +775,14 @@
}
}
- private FileIO tableFileIO(Map<String, String> config) {
- if (config.isEmpty()) {
+ private FileIO tableFileIO(SessionContext context, Map<String, String> config) {
+ if (config.isEmpty() && ioBuilder == null) {
return io; // reuse client and io since config is the same
}
Map<String, String> fullConf = RESTUtil.merge(properties(), config);
- return newFileIO(fullConf);
+ return newFileIO(context, fullConf);
}
private AuthSession tableSession(Map<String, String> tableConf, AuthSession parent) {