blob: ac9bcd803e00939a319d1633a68bb7cfe93dbe93 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.rest;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.catalog.CatalogTests;
import org.apache.iceberg.catalog.SessionCatalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.jdbc.JdbcCatalog;
import org.apache.iceberg.metrics.MetricsReport;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.rest.RESTCatalogAdapter.HTTPMethod;
import org.apache.iceberg.rest.RESTSessionCatalog.SnapshotMode;
import org.apache.iceberg.rest.auth.AuthSessionUtil;
import org.apache.iceberg.rest.auth.OAuth2Properties;
import org.apache.iceberg.rest.auth.OAuth2Util;
import org.apache.iceberg.rest.responses.ConfigResponse;
import org.apache.iceberg.rest.responses.ErrorResponse;
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.rest.responses.OAuthTokenResponse;
import org.apache.iceberg.types.Types;
import org.awaitility.Awaitility;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.gzip.GzipHandler;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.Assert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
public class TestRESTCatalog extends CatalogTests<RESTCatalog> {
private static final ObjectMapper MAPPER = RESTObjectMapper.mapper();
@TempDir public Path temp;
private RESTCatalog restCatalog;
private JdbcCatalog backendCatalog;
private Server httpServer;
@BeforeEach
public void createCatalog() throws Exception {
File warehouse = temp.toFile();
Configuration conf = new Configuration();
this.backendCatalog = new JdbcCatalog();
backendCatalog.setConf(conf);
Map<String, String> backendCatalogProperties =
ImmutableMap.of(
CatalogProperties.WAREHOUSE_LOCATION,
warehouse.getAbsolutePath(),
CatalogProperties.URI,
"jdbc:sqlite:file::memory:?ic" + UUID.randomUUID().toString().replace("-", ""),
JdbcCatalog.PROPERTY_PREFIX + "username",
"user",
JdbcCatalog.PROPERTY_PREFIX + "password",
"password");
backendCatalog.initialize("backend", backendCatalogProperties);
Map<String, String> catalogHeaders =
ImmutableMap.of("Authorization", "Bearer client-credentials-token:sub=catalog");
Map<String, String> contextHeaders =
ImmutableMap.of("Authorization", "Bearer client-credentials-token:sub=user");
RESTCatalogAdapter adaptor =
new RESTCatalogAdapter(backendCatalog) {
@Override
public <T extends RESTResponse> T execute(
RESTCatalogAdapter.HTTPMethod method,
String path,
Map<String, String> queryParams,
Object body,
Class<T> responseType,
Map<String, String> headers,
Consumer<ErrorResponse> errorHandler) {
// this doesn't use a Mockito spy because this is used for catalog tests, which have
// different method calls
if (!"v1/oauth/tokens".equals(path)) {
if ("v1/config".equals(path)) {
assertThat(headers).containsAllEntriesOf(catalogHeaders);
} else {
assertThat(headers).containsAllEntriesOf(contextHeaders);
}
}
Object request = roundTripSerialize(body, "request");
T response =
super.execute(
method, path, queryParams, request, responseType, headers, errorHandler);
T responseAfterSerialization = roundTripSerialize(response, "response");
return responseAfterSerialization;
}
};
RESTCatalogServlet servlet = new RESTCatalogServlet(adaptor);
ServletContextHandler servletContext =
new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
servletContext.setContextPath("/");
ServletHolder servletHolder = new ServletHolder(servlet);
servletHolder.setInitParameter("javax.ws.rs.Application", "ServiceListPublic");
servletContext.addServlet(servletHolder, "/*");
servletContext.setVirtualHosts(null);
servletContext.setGzipHandler(new GzipHandler());
this.httpServer = new Server(0);
httpServer.setHandler(servletContext);
httpServer.start();
SessionCatalog.SessionContext context =
new SessionCatalog.SessionContext(
UUID.randomUUID().toString(),
"user",
ImmutableMap.of("credential", "user:12345"),
ImmutableMap.of());
this.restCatalog =
new RESTCatalog(
context,
(config) -> HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build());
restCatalog.setConf(conf);
restCatalog.initialize(
"prod",
ImmutableMap.of(
CatalogProperties.URI, httpServer.getURI().toString(), "credential", "catalog:12345"));
}
@SuppressWarnings("unchecked")
public static <T> T roundTripSerialize(T payload, String description) {
if (payload != null) {
try {
if (payload instanceof RESTMessage) {
return (T) MAPPER.readValue(MAPPER.writeValueAsString(payload), payload.getClass());
} else {
// use Map so that Jackson doesn't try to instantiate ImmutableMap from payload.getClass()
return (T) MAPPER.readValue(MAPPER.writeValueAsString(payload), Map.class);
}
} catch (JsonProcessingException e) {
throw new RuntimeException(
String.format("Failed to serialize and deserialize %s: %s", description, payload), e);
}
}
return null;
}
@AfterEach
public void closeCatalog() throws Exception {
if (restCatalog != null) {
restCatalog.close();
}
if (backendCatalog != null) {
backendCatalog.close();
}
if (httpServer != null) {
httpServer.stop();
httpServer.join();
}
}
@Override
protected RESTCatalog catalog() {
return restCatalog;
}
@Override
protected boolean supportsNamespaceProperties() {
return true;
}
@Override
protected boolean supportsServerSideRetry() {
return true;
}
@Override
protected boolean supportsNestedNamespaces() {
return true;
}
/* RESTCatalog specific tests */
@Test
public void testConfigRoute() throws IOException {
RESTClient testClient =
new RESTCatalogAdapter(backendCatalog) {
@Override
public <T extends RESTResponse> T get(
String path,
Map<String, String> queryParams,
Class<T> responseType,
Map<String, String> headers,
Consumer<ErrorResponse> errorHandler) {
if ("v1/config".equals(path)) {
return castResponse(
responseType,
ConfigResponse.builder()
.withDefaults(ImmutableMap.of(CatalogProperties.CLIENT_POOL_SIZE, "1"))
.withOverrides(
ImmutableMap.of(
CatalogProperties.CACHE_ENABLED,
"false",
CatalogProperties.WAREHOUSE_LOCATION,
queryParams.get(CatalogProperties.WAREHOUSE_LOCATION) + "warehouse"))
.build());
}
return super.get(path, queryParams, responseType, headers, errorHandler);
}
};
RESTCatalog restCat = new RESTCatalog((config) -> testClient);
Map<String, String> initialConfig =
ImmutableMap.of(
CatalogProperties.URI, "http://localhost:8080",
CatalogProperties.CACHE_ENABLED, "true",
CatalogProperties.WAREHOUSE_LOCATION, "s3://bucket/");
restCat.setConf(new Configuration());
restCat.initialize("prod", initialConfig);
Assert.assertEquals(
"Catalog properties after initialize should use the server's override properties",
"false",
restCat.properties().get(CatalogProperties.CACHE_ENABLED));
Assert.assertEquals(
"Catalog after initialize should use the server's default properties if not specified",
"1",
restCat.properties().get(CatalogProperties.CLIENT_POOL_SIZE));
Assert.assertEquals(
"Catalog should return final warehouse location",
"s3://bucket/warehouse",
restCat.properties().get(CatalogProperties.WAREHOUSE_LOCATION));
restCat.close();
}
@Test
public void testInitializeWithBadArguments() throws IOException {
RESTCatalog restCat = new RESTCatalog();
org.assertj.core.api.Assertions.assertThatThrownBy(() -> restCat.initialize("prod", null))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Invalid configuration: null");
org.assertj.core.api.Assertions.assertThatThrownBy(
() -> restCat.initialize("prod", ImmutableMap.of()))
.isInstanceOf(NullPointerException.class)
.hasMessage("Invalid uri for http client: null");
restCat.close();
}
@Test
public void testCatalogBasicBearerToken() {
Map<String, String> catalogHeaders = ImmutableMap.of("Authorization", "Bearer bearer-token");
RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
RESTCatalog catalog =
new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter);
catalog.initialize(
"prod", ImmutableMap.of(CatalogProperties.URI, "ignored", "token", "bearer-token"));
Assertions.assertFalse(catalog.tableExists(TableIdentifier.of("ns", "table")));
// the bearer token should be used for all interactions
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/config"),
any(),
any(),
eq(ConfigResponse.class),
eq(catalogHeaders),
any());
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/namespaces/ns/tables/table"),
any(),
any(),
eq(LoadTableResponse.class),
eq(catalogHeaders),
any());
}
@Test
public void testCatalogCredential() {
Map<String, String> emptyHeaders = ImmutableMap.of();
Map<String, String> catalogHeaders =
ImmutableMap.of("Authorization", "Bearer client-credentials-token:sub=catalog");
RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
RESTCatalog catalog =
new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter);
catalog.initialize(
"prod", ImmutableMap.of(CatalogProperties.URI, "ignored", "credential", "catalog:secret"));
Assertions.assertFalse(catalog.tableExists(TableIdentifier.of("ns", "table")));
// no token or credential for catalog token exchange
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
any(),
eq(OAuthTokenResponse.class),
eq(emptyHeaders),
any());
// no token or credential for config
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/config"),
any(),
any(),
eq(ConfigResponse.class),
eq(catalogHeaders),
any());
// use the catalog token for all interactions
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/namespaces/ns/tables/table"),
any(),
any(),
eq(LoadTableResponse.class),
eq(catalogHeaders),
any());
}
@Test
public void testCatalogBearerTokenWithClientCredential() {
Map<String, String> contextHeaders =
ImmutableMap.of("Authorization", "Bearer client-credentials-token:sub=user");
Map<String, String> catalogHeaders = ImmutableMap.of("Authorization", "Bearer bearer-token");
RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
SessionCatalog.SessionContext context =
new SessionCatalog.SessionContext(
UUID.randomUUID().toString(),
"user",
ImmutableMap.of("credential", "user:secret"),
ImmutableMap.of());
RESTCatalog catalog = new RESTCatalog(context, (config) -> adapter);
catalog.initialize(
"prod", ImmutableMap.of(CatalogProperties.URI, "ignored", "token", "bearer-token"));
Assertions.assertFalse(catalog.tableExists(TableIdentifier.of("ns", "table")));
// use the bearer token for config
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/config"),
any(),
any(),
eq(ConfigResponse.class),
eq(catalogHeaders),
any());
// use the bearer token to fetch the context token
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
any(),
eq(OAuthTokenResponse.class),
eq(catalogHeaders),
any());
// use the context token for table load
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/namespaces/ns/tables/table"),
any(),
any(),
eq(LoadTableResponse.class),
eq(contextHeaders),
any());
}
@Test
public void testCatalogCredentialWithClientCredential() {
Map<String, String> emptyHeaders = ImmutableMap.of();
Map<String, String> contextHeaders =
ImmutableMap.of("Authorization", "Bearer client-credentials-token:sub=user");
Map<String, String> catalogHeaders =
ImmutableMap.of("Authorization", "Bearer client-credentials-token:sub=catalog");
RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
SessionCatalog.SessionContext context =
new SessionCatalog.SessionContext(
UUID.randomUUID().toString(),
"user",
ImmutableMap.of("credential", "user:secret"),
ImmutableMap.of());
RESTCatalog catalog = new RESTCatalog(context, (config) -> adapter);
catalog.initialize(
"prod", ImmutableMap.of(CatalogProperties.URI, "ignored", "credential", "catalog:secret"));
Assertions.assertFalse(catalog.tableExists(TableIdentifier.of("ns", "table")));
// call client credentials with no initial auth
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
any(),
eq(OAuthTokenResponse.class),
eq(emptyHeaders),
any());
// use the client credential token for config
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/config"),
any(),
any(),
eq(ConfigResponse.class),
eq(catalogHeaders),
any());
// use the client credential to fetch the context token
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
any(),
eq(OAuthTokenResponse.class),
eq(catalogHeaders),
any());
// use the context token for table load
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/namespaces/ns/tables/table"),
any(),
any(),
eq(LoadTableResponse.class),
eq(contextHeaders),
any());
}
@Test
public void testCatalogBearerTokenAndCredentialWithClientCredential() {
Map<String, String> contextHeaders =
ImmutableMap.of("Authorization", "Bearer client-credentials-token:sub=user");
Map<String, String> initHeaders = ImmutableMap.of("Authorization", "Bearer bearer-token");
Map<String, String> catalogHeaders =
ImmutableMap.of("Authorization", "Bearer client-credentials-token:sub=catalog");
RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
SessionCatalog.SessionContext context =
new SessionCatalog.SessionContext(
UUID.randomUUID().toString(),
"user",
ImmutableMap.of("credential", "user:secret"),
ImmutableMap.of());
RESTCatalog catalog = new RESTCatalog(context, (config) -> adapter);
catalog.initialize(
"prod",
ImmutableMap.of(
CatalogProperties.URI,
"ignored",
"credential",
"catalog:secret",
"token",
"bearer-token"));
Assertions.assertFalse(catalog.tableExists(TableIdentifier.of("ns", "table")));
// use the bearer token for client credentials
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
any(),
eq(OAuthTokenResponse.class),
eq(initHeaders),
any());
// use the client credential token for config
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/config"),
any(),
any(),
eq(ConfigResponse.class),
eq(catalogHeaders),
any());
// use the client credential to fetch the context token
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
any(),
eq(OAuthTokenResponse.class),
eq(catalogHeaders),
any());
// use the context token for table load
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/namespaces/ns/tables/table"),
any(),
any(),
eq(LoadTableResponse.class),
eq(contextHeaders),
any());
}
@Test
public void testClientBearerToken() {
testClientAuth(
"bearer-token",
ImmutableMap.of(
"token", "client-bearer-token",
"credential", "user:secret",
"urn:ietf:params:oauth:token-type:id_token", "id-token",
"urn:ietf:params:oauth:token-type:access_token", "access-token",
"urn:ietf:params:oauth:token-type:jwt", "jwt-token",
"urn:ietf:params:oauth:token-type:saml2", "saml2-token",
"urn:ietf:params:oauth:token-type:saml1", "saml1-token"),
ImmutableMap.of("Authorization", "Bearer client-bearer-token"));
}
@Test
public void testClientCredential() {
testClientAuth(
"bearer-token",
ImmutableMap.of(
"credential", "user:secret",
"urn:ietf:params:oauth:token-type:id_token", "id-token",
"urn:ietf:params:oauth:token-type:access_token", "access-token",
"urn:ietf:params:oauth:token-type:jwt", "jwt-token",
"urn:ietf:params:oauth:token-type:saml2", "saml2-token",
"urn:ietf:params:oauth:token-type:saml1", "saml1-token"),
ImmutableMap.of("Authorization", "Bearer client-credentials-token:sub=user"));
}
@Test
public void testClientIDToken() {
testClientAuth(
"bearer-token",
ImmutableMap.of(
"urn:ietf:params:oauth:token-type:id_token", "id-token",
"urn:ietf:params:oauth:token-type:access_token", "access-token",
"urn:ietf:params:oauth:token-type:jwt", "jwt-token",
"urn:ietf:params:oauth:token-type:saml2", "saml2-token",
"urn:ietf:params:oauth:token-type:saml1", "saml1-token"),
ImmutableMap.of(
"Authorization", "Bearer token-exchange-token:sub=id-token,act=bearer-token"));
}
@Test
public void testClientAccessToken() {
testClientAuth(
"bearer-token",
ImmutableMap.of(
"urn:ietf:params:oauth:token-type:access_token", "access-token",
"urn:ietf:params:oauth:token-type:jwt", "jwt-token",
"urn:ietf:params:oauth:token-type:saml2", "saml2-token",
"urn:ietf:params:oauth:token-type:saml1", "saml1-token"),
ImmutableMap.of(
"Authorization", "Bearer token-exchange-token:sub=access-token,act=bearer-token"));
}
@Test
public void testClientJWTToken() {
testClientAuth(
"bearer-token",
ImmutableMap.of(
"urn:ietf:params:oauth:token-type:jwt", "jwt-token",
"urn:ietf:params:oauth:token-type:saml2", "saml2-token",
"urn:ietf:params:oauth:token-type:saml1", "saml1-token"),
ImmutableMap.of(
"Authorization", "Bearer token-exchange-token:sub=jwt-token,act=bearer-token"));
}
@Test
public void testClientSAML2Token() {
testClientAuth(
"bearer-token",
ImmutableMap.of(
"urn:ietf:params:oauth:token-type:saml2", "saml2-token",
"urn:ietf:params:oauth:token-type:saml1", "saml1-token"),
ImmutableMap.of(
"Authorization", "Bearer token-exchange-token:sub=saml2-token,act=bearer-token"));
}
@Test
public void testClientSAML1Token() {
testClientAuth(
"bearer-token",
ImmutableMap.of("urn:ietf:params:oauth:token-type:saml1", "saml1-token"),
ImmutableMap.of(
"Authorization", "Bearer token-exchange-token:sub=saml1-token,act=bearer-token"));
}
private void testClientAuth(
String catalogToken, Map<String, String> credentials, Map<String, String> expectedHeaders) {
Map<String, String> catalogHeaders = ImmutableMap.of("Authorization", "Bearer " + catalogToken);
RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
SessionCatalog.SessionContext context =
new SessionCatalog.SessionContext(
UUID.randomUUID().toString(), "user", credentials, ImmutableMap.of());
RESTCatalog catalog = new RESTCatalog(context, (config) -> adapter);
catalog.initialize(
"prod", ImmutableMap.of(CatalogProperties.URI, "ignored", "token", catalogToken));
Assertions.assertFalse(catalog.tableExists(TableIdentifier.of("ns", "table")));
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/config"),
any(),
any(),
eq(ConfigResponse.class),
eq(catalogHeaders),
any());
// token passes a static token. otherwise, validate a client credentials or token exchange
// request
if (!credentials.containsKey("token")) {
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
any(),
eq(OAuthTokenResponse.class),
eq(catalogHeaders),
any());
}
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/namespaces/ns/tables/table"),
any(),
any(),
eq(LoadTableResponse.class),
eq(expectedHeaders),
any());
}
@Test
public void testTableBearerToken() {
testTableAuth(
"catalog",
ImmutableMap.of("urn:ietf:params:oauth:token-type:id_token", "id-token"),
ImmutableMap.of("token", "table-bearer-token"),
ImmutableMap.of("Authorization", "Bearer token-exchange-token:sub=id-token,act=catalog"),
ImmutableMap.of("Authorization", "Bearer table-bearer-token"));
}
@Test
public void testTableIDToken() {
testTableAuth(
"catalog",
ImmutableMap.of("urn:ietf:params:oauth:token-type:id_token", "id-token"),
ImmutableMap.of("urn:ietf:params:oauth:token-type:id_token", "table-id-token"),
ImmutableMap.of("Authorization", "Bearer token-exchange-token:sub=id-token,act=catalog"),
ImmutableMap.of(
"Authorization",
"Bearer token-exchange-token:sub=table-id-token,act=token-exchange-token:sub=id-token,act=catalog"));
}
@Test
public void testTableCredential() {
testTableAuth(
"catalog",
ImmutableMap.of("urn:ietf:params:oauth:token-type:id_token", "id-token"),
ImmutableMap.of("credential", "table-user:secret"),
ImmutableMap.of("Authorization", "Bearer token-exchange-token:sub=id-token,act=catalog"),
ImmutableMap.of("Authorization", "Bearer client-credentials-token:sub=table-user"));
}
@Test
public void testSnapshotParams() {
assertThat(SnapshotMode.ALL.params()).isEqualTo(ImmutableMap.of("snapshots", "all"));
assertThat(SnapshotMode.REFS.params()).isEqualTo(ImmutableMap.of("snapshots", "refs"));
}
@Test
public void testTableSnapshotLoading() {
RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
RESTCatalog catalog =
new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter);
catalog.initialize(
"test",
ImmutableMap.of(
CatalogProperties.URI,
"ignored",
CatalogProperties.FILE_IO_IMPL,
"org.apache.iceberg.inmemory.InMemoryFileIO",
// default loading to refs only
"snapshot-loading-mode",
"refs"));
// Create a table with multiple snapshots
Table table = catalog.createTable(TABLE, SCHEMA);
table
.newFastAppend()
.appendFile(
DataFiles.builder(PartitionSpec.unpartitioned())
.withPath("/path/to/data-a.parquet")
.withFileSizeInBytes(10)
.withRecordCount(2)
.build())
.commit();
table
.newFastAppend()
.appendFile(
DataFiles.builder(PartitionSpec.unpartitioned())
.withPath("/path/to/data-b.parquet")
.withFileSizeInBytes(10)
.withRecordCount(2)
.build())
.commit();
ResourcePaths paths = ResourcePaths.forCatalogProperties(Maps.newHashMap());
// Respond with only referenced snapshots
Answer<?> refsAnswer =
invocation -> {
LoadTableResponse originalResponse = (LoadTableResponse) invocation.callRealMethod();
TableMetadata fullTableMetadata = originalResponse.tableMetadata();
Set<Long> referencedSnapshotIds =
fullTableMetadata.refs().values().stream()
.map(SnapshotRef::snapshotId)
.collect(Collectors.toSet());
TableMetadata refsMetadata =
fullTableMetadata.removeSnapshotsIf(
s -> !referencedSnapshotIds.contains(s.snapshotId()));
return LoadTableResponse.builder()
.withTableMetadata(refsMetadata)
.addAllConfig(originalResponse.config())
.build();
};
Mockito.doAnswer(refsAnswer)
.when(adapter)
.execute(
eq(HTTPMethod.GET),
eq(paths.table(TABLE)),
eq(ImmutableMap.of("snapshots", "refs")),
any(),
eq(LoadTableResponse.class),
any(),
any());
Table refsTables = catalog.loadTable(TABLE);
assertThat(refsTables.currentSnapshot()).isEqualTo(table.currentSnapshot());
// verify that the table was loaded with the refs argument
verify(adapter, times(1))
.execute(
eq(HTTPMethod.GET),
eq(paths.table(TABLE)),
eq(ImmutableMap.of("snapshots", "refs")),
any(),
eq(LoadTableResponse.class),
any(),
any());
// verify that all snapshots are loaded when referenced
assertThat(refsTables.snapshots()).containsExactlyInAnyOrderElementsOf(table.snapshots());
verify(adapter, times(1))
.execute(
eq(HTTPMethod.GET),
eq(paths.table(TABLE)),
eq(ImmutableMap.of("snapshots", "all")),
any(),
eq(LoadTableResponse.class),
any(),
any());
}
@ParameterizedTest
@ValueSource(strings = {"1", "2"})
public void testTableSnapshotLoadingWithDivergedBranches(String formatVersion) {
RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
RESTCatalog catalog =
new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter);
catalog.initialize(
"test",
ImmutableMap.of(
CatalogProperties.URI,
"ignored",
CatalogProperties.FILE_IO_IMPL,
"org.apache.iceberg.inmemory.InMemoryFileIO",
"snapshot-loading-mode",
"refs"));
Table table =
catalog.createTable(
TABLE,
SCHEMA,
PartitionSpec.unpartitioned(),
ImmutableMap.of("format-version", formatVersion));
table
.newFastAppend()
.appendFile(
DataFiles.builder(PartitionSpec.unpartitioned())
.withPath("/path/to/data-a.parquet")
.withFileSizeInBytes(10)
.withRecordCount(2)
.build())
.commit();
String branch = "divergedBranch";
table.manageSnapshots().createBranch(branch, table.currentSnapshot().snapshotId()).commit();
// branch and main are diverged now
table
.newFastAppend()
.appendFile(
DataFiles.builder(PartitionSpec.unpartitioned())
.withPath("/path/to/data-b.parquet")
.withFileSizeInBytes(10)
.withRecordCount(2)
.build())
.toBranch(branch)
.commit();
ResourcePaths paths = ResourcePaths.forCatalogProperties(Maps.newHashMap());
// Respond with only referenced snapshots
Answer<?> refsAnswer =
invocation -> {
LoadTableResponse originalResponse = (LoadTableResponse) invocation.callRealMethod();
TableMetadata fullTableMetadata = originalResponse.tableMetadata();
Set<Long> referencedSnapshotIds =
fullTableMetadata.refs().values().stream()
.map(SnapshotRef::snapshotId)
.collect(Collectors.toSet());
TableMetadata refsMetadata =
fullTableMetadata.removeSnapshotsIf(
s -> !referencedSnapshotIds.contains(s.snapshotId()));
return LoadTableResponse.builder()
.withTableMetadata(refsMetadata)
.addAllConfig(originalResponse.config())
.build();
};
Mockito.doAnswer(refsAnswer)
.when(adapter)
.execute(
eq(HTTPMethod.GET),
eq(paths.table(TABLE)),
eq(ImmutableMap.of("snapshots", "refs")),
any(),
eq(LoadTableResponse.class),
any(),
any());
Table refsTables = catalog.loadTable(TABLE);
assertThat(refsTables.currentSnapshot()).isEqualTo(table.currentSnapshot());
// verify that the table was loaded with the refs argument
verify(adapter, times(1))
.execute(
eq(HTTPMethod.GET),
eq(paths.table(TABLE)),
eq(ImmutableMap.of("snapshots", "refs")),
any(),
eq(LoadTableResponse.class),
any(),
any());
// verify that all snapshots are loaded when referenced
assertThat(catalog.loadTable(TABLE).snapshots())
.containsExactlyInAnyOrderElementsOf(table.snapshots());
verify(adapter, times(1))
.execute(
eq(HTTPMethod.GET),
eq(paths.table(TABLE)),
eq(ImmutableMap.of("snapshots", "all")),
any(),
eq(LoadTableResponse.class),
any(),
any());
// verify that committing to branch is possible
catalog
.loadTable(TABLE)
.newAppend()
.appendFile(
DataFiles.builder(PartitionSpec.unpartitioned())
.withPath("/path/to/data-c.parquet")
.withFileSizeInBytes(10)
.withRecordCount(2)
.build())
.toBranch(branch)
.commit();
assertThat(catalog.loadTable(TABLE).snapshots())
.hasSizeGreaterThan(Lists.newArrayList(table.snapshots()).size());
}
public void testTableAuth(
String catalogToken,
Map<String, String> credentials,
Map<String, String> tableConfig,
Map<String, String> expectedContextHeaders,
Map<String, String> expectedTableHeaders) {
TableIdentifier ident = TableIdentifier.of("ns", "table");
Map<String, String> catalogHeaders = ImmutableMap.of("Authorization", "Bearer " + catalogToken);
RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
// inject the expected table config
Answer<LoadTableResponse> addTableConfig =
invocation -> {
LoadTableResponse loadTable = (LoadTableResponse) invocation.callRealMethod();
return LoadTableResponse.builder()
.withTableMetadata(loadTable.tableMetadata())
.addAllConfig(loadTable.config())
.addAllConfig(tableConfig)
.build();
};
Mockito.doAnswer(addTableConfig)
.when(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/namespaces/ns/tables"),
any(),
any(),
eq(LoadTableResponse.class),
eq(expectedContextHeaders),
any());
Mockito.doAnswer(addTableConfig)
.when(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/namespaces/ns/tables/table"),
any(),
any(),
eq(LoadTableResponse.class),
eq(expectedContextHeaders),
any());
SessionCatalog.SessionContext context =
new SessionCatalog.SessionContext(
UUID.randomUUID().toString(), "user", credentials, ImmutableMap.of());
RESTCatalog catalog = new RESTCatalog(context, (config) -> adapter);
catalog.initialize(
"prod", ImmutableMap.of(CatalogProperties.URI, "ignored", "token", catalogToken));
Schema expectedSchema =
new Schema(
required(1, "id", Types.IntegerType.get(), "unique ID"),
required(2, "data", Types.StringType.get()));
Table table = catalog.createTable(ident, expectedSchema);
Assertions.assertEquals(
expectedSchema.asStruct(), table.schema().asStruct(), "Schema should match");
Table loaded = catalog.loadTable(ident); // the first load will send the token
Assertions.assertEquals(
expectedSchema.asStruct(), loaded.schema().asStruct(), "Schema should match");
loaded.refresh(); // refresh to force reload
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/config"),
any(),
any(),
eq(ConfigResponse.class),
eq(catalogHeaders),
any());
// session client credentials flow
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
any(),
eq(OAuthTokenResponse.class),
eq(catalogHeaders),
any());
// create table request
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/namespaces/ns/tables"),
any(),
any(),
eq(LoadTableResponse.class),
eq(expectedContextHeaders),
any());
// if the table returned a bearer token, there will be no token request
if (!tableConfig.containsKey("token")) {
// client credentials or token exchange to get a table token
Mockito.verify(adapter, times(2))
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
any(),
eq(OAuthTokenResponse.class),
eq(expectedContextHeaders),
any());
}
// automatic refresh when metadata is accessed after commit
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/namespaces/ns/tables/table"),
any(),
any(),
eq(LoadTableResponse.class),
eq(expectedTableHeaders),
any());
// load table from catalog
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/namespaces/ns/tables/table"),
any(),
any(),
eq(LoadTableResponse.class),
eq(expectedContextHeaders),
any());
// refresh loaded table
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/namespaces/ns/tables/table"),
any(),
any(),
eq(LoadTableResponse.class),
eq(expectedTableHeaders),
any());
}
@Test
public void testCatalogTokenRefresh() {
Map<String, String> emptyHeaders = ImmutableMap.of();
Map<String, String> catalogHeaders =
ImmutableMap.of("Authorization", "Bearer client-credentials-token:sub=catalog");
RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
Answer<OAuthTokenResponse> addOneSecondExpiration =
invocation -> {
OAuthTokenResponse response = (OAuthTokenResponse) invocation.callRealMethod();
return OAuthTokenResponse.builder()
.withToken(response.token())
.withTokenType(response.tokenType())
.withIssuedTokenType(response.issuedTokenType())
.addScopes(response.scopes())
.setExpirationInSeconds(1)
.build();
};
Mockito.doAnswer(addOneSecondExpiration)
.when(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
any(),
eq(OAuthTokenResponse.class),
any(),
any());
Map<String, String> contextCredentials = ImmutableMap.of();
SessionCatalog.SessionContext context =
new SessionCatalog.SessionContext(
UUID.randomUUID().toString(), "user", contextCredentials, ImmutableMap.of());
RESTCatalog catalog = new RESTCatalog(context, (config) -> adapter);
catalog.initialize(
"prod", ImmutableMap.of(CatalogProperties.URI, "ignored", "credential", "catalog:secret"));
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(
() -> {
// call client credentials with no initial auth
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
any(),
eq(OAuthTokenResponse.class),
eq(emptyHeaders),
any());
// use the client credential token for config
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/config"),
any(),
any(),
eq(ConfigResponse.class),
eq(catalogHeaders),
any());
// verify the first token exchange
Map<String, String> firstRefreshRequest =
ImmutableMap.of(
"grant_type", "urn:ietf:params:oauth:grant-type:token-exchange",
"subject_token", "client-credentials-token:sub=catalog",
"subject_token_type", "urn:ietf:params:oauth:token-type:access_token",
"scope", "catalog");
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
Mockito.argThat(firstRefreshRequest::equals),
eq(OAuthTokenResponse.class),
eq(catalogHeaders),
any());
// verify that a second exchange occurs
Map<String, String> secondRefreshRequest =
ImmutableMap.of(
"grant_type", "urn:ietf:params:oauth:grant-type:token-exchange",
"subject_token",
"token-exchange-token:sub=client-credentials-token:sub=catalog",
"subject_token_type", "urn:ietf:params:oauth:token-type:access_token",
"scope", "catalog");
Map<String, String> secondRefreshHeaders =
ImmutableMap.of(
"Authorization",
"Bearer token-exchange-token:sub=client-credentials-token:sub=catalog");
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
Mockito.argThat(secondRefreshRequest::equals),
eq(OAuthTokenResponse.class),
eq(secondRefreshHeaders),
any());
});
}
@Test
public void testCatalogRefreshedTokenIsUsed() {
Map<String, String> emptyHeaders = ImmutableMap.of();
Map<String, String> catalogHeaders =
ImmutableMap.of("Authorization", "Bearer client-credentials-token:sub=catalog");
RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
Answer<OAuthTokenResponse> addOneSecondExpiration =
invocation -> {
OAuthTokenResponse response = (OAuthTokenResponse) invocation.callRealMethod();
return OAuthTokenResponse.builder()
.withToken(response.token())
.withTokenType(response.tokenType())
.withIssuedTokenType(response.issuedTokenType())
.addScopes(response.scopes())
.setExpirationInSeconds(1)
.build();
};
Mockito.doAnswer(addOneSecondExpiration)
.when(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
any(),
eq(OAuthTokenResponse.class),
any(),
any());
Map<String, String> contextCredentials = ImmutableMap.of();
SessionCatalog.SessionContext context =
new SessionCatalog.SessionContext(
UUID.randomUUID().toString(), "user", contextCredentials, ImmutableMap.of());
RESTCatalog catalog = new RESTCatalog(context, (config) -> adapter);
catalog.initialize(
"prod", ImmutableMap.of(CatalogProperties.URI, "ignored", "credential", "catalog:secret"));
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(
() -> {
// use the exchanged catalog token
Assertions.assertFalse(catalog.tableExists(TableIdentifier.of("ns", "table")));
// call client credentials with no initial auth
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
any(),
eq(OAuthTokenResponse.class),
eq(emptyHeaders),
any());
// use the client credential token for config
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/config"),
any(),
any(),
eq(ConfigResponse.class),
eq(catalogHeaders),
any());
// verify the first token exchange
Map<String, String> firstRefreshRequest =
ImmutableMap.of(
"grant_type", "urn:ietf:params:oauth:grant-type:token-exchange",
"subject_token", "client-credentials-token:sub=catalog",
"subject_token_type", "urn:ietf:params:oauth:token-type:access_token",
"scope", "catalog");
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
Mockito.argThat(firstRefreshRequest::equals),
eq(OAuthTokenResponse.class),
eq(catalogHeaders),
any());
// use the refreshed context token for table load
Map<String, String> refreshedCatalogHeader =
ImmutableMap.of(
"Authorization",
"Bearer token-exchange-token:sub=client-credentials-token:sub=catalog");
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/namespaces/ns/tables/table"),
any(),
any(),
eq(LoadTableResponse.class),
eq(refreshedCatalogHeader),
any());
});
}
@Test
public void testCatalogWithCustomMetricsReporter() throws IOException {
this.restCatalog =
new RESTCatalog(
new SessionCatalog.SessionContext(
UUID.randomUUID().toString(),
"user",
ImmutableMap.of("credential", "user:12345"),
ImmutableMap.of()),
(config) -> HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build());
restCatalog.setConf(new Configuration());
restCatalog.initialize(
"prod",
ImmutableMap.of(
CatalogProperties.URI,
httpServer.getURI().toString(),
"credential",
"catalog:12345",
CatalogProperties.METRICS_REPORTER_IMPL,
CustomMetricsReporter.class.getName()));
restCatalog.buildTable(TABLE, SCHEMA).create();
Table table = restCatalog.loadTable(TABLE);
table
.newFastAppend()
.appendFile(
DataFiles.builder(PartitionSpec.unpartitioned())
.withPath("/path/to/data-a.parquet")
.withFileSizeInBytes(10)
.withRecordCount(2)
.build())
.commit();
try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
assertThat(tasks.iterator()).hasNext();
}
// counter of custom metrics reporter should have been increased
// 1x for commit metrics / 1x for scan metrics
assertThat(CustomMetricsReporter.COUNTER.get()).isEqualTo(2);
}
public static class CustomMetricsReporter implements MetricsReporter {
static final AtomicInteger COUNTER = new AtomicInteger(0);
@Override
public void report(MetricsReport report) {
COUNTER.incrementAndGet();
}
}
@Test
public void testCatalogExpiredBearerTokenRefreshWithoutCredential() {
// expires at epoch second = 1
String token =
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyLCJleHAiOjF9.gQADTbdEv-rpDWKSkGLbmafyB5UUjTdm9B_1izpuZ6E";
RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
Map<String, String> contextCredentials = ImmutableMap.of("token", token);
SessionCatalog.SessionContext context =
new SessionCatalog.SessionContext(
UUID.randomUUID().toString(), "user", contextCredentials, ImmutableMap.of());
RESTCatalog catalog = new RESTCatalog(context, (config) -> adapter);
catalog.initialize("prod", ImmutableMap.of(CatalogProperties.URI, "ignored", "token", token));
}
@Test
public void testCatalogExpiredBearerTokenIsRefreshedWithCredential() {
// expires at epoch second = 1
String token =
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyLCJleHAiOjF9.gQADTbdEv-rpDWKSkGLbmafyB5UUjTdm9B_1izpuZ6E";
Map<String, String> emptyHeaders = ImmutableMap.of();
Map<String, String> catalogHeaders =
ImmutableMap.of("Authorization", "Bearer client-credentials-token:sub=catalog");
RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
String credential = "catalog:12345";
Map<String, String> contextCredentials = ImmutableMap.of("token", token);
SessionCatalog.SessionContext context =
new SessionCatalog.SessionContext(
UUID.randomUUID().toString(), "user", contextCredentials, ImmutableMap.of());
RESTCatalog catalog = new RESTCatalog(context, (config) -> adapter);
// the init token at the catalog level is a valid token
catalog.initialize(
"prod", ImmutableMap.of(CatalogProperties.URI, "ignored", "credential", credential));
Assertions.assertFalse(catalog.tableExists(TableIdentifier.of("ns", "table")));
// call client credentials with no initial auth
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
any(),
eq(OAuthTokenResponse.class),
eq(emptyHeaders),
any());
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/config"),
any(),
any(),
eq(ConfigResponse.class),
eq(catalogHeaders),
any());
Map<String, String> firstRefreshRequest =
ImmutableMap.of(
"grant_type", "urn:ietf:params:oauth:grant-type:token-exchange",
"subject_token", token,
"subject_token_type", "urn:ietf:params:oauth:token-type:access_token",
"scope", "catalog");
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
Mockito.argThat(firstRefreshRequest::equals),
eq(OAuthTokenResponse.class),
eq(OAuth2Util.basicAuthHeaders(credential)),
any());
// verify that a second exchange occurs
Map<String, String> secondRefreshRequest =
ImmutableMap.of(
"grant_type", "urn:ietf:params:oauth:grant-type:token-exchange",
"subject_token", token,
"subject_token_type", "urn:ietf:params:oauth:token-type:access_token",
"scope", "catalog");
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
Mockito.argThat(secondRefreshRequest::equals),
eq(OAuthTokenResponse.class),
eq(OAuth2Util.basicAuthHeaders(credential)),
any());
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/namespaces/ns/tables/table"),
any(),
any(),
eq(LoadTableResponse.class),
eq(ImmutableMap.of("Authorization", "Bearer token-exchange-token:sub=" + token)),
any());
}
@Test
public void testCatalogValidBearerTokenIsNotRefreshed() {
// expires at epoch second = 19999999999
String token =
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyLCJleHAiOjE5OTk5OTk5OTk5fQ._3k92KJi2NTyTG6V1s2mzJ__GiQtL36DnzsZSkBdYPw";
Map<String, String> catalogHeaders = ImmutableMap.of("Authorization", "Bearer " + token);
RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
String credential = "catalog:12345";
Map<String, String> contextCredentials =
ImmutableMap.of("token", token, "credential", credential);
SessionCatalog.SessionContext context =
new SessionCatalog.SessionContext(
UUID.randomUUID().toString(), "user", contextCredentials, ImmutableMap.of());
RESTCatalog catalog = new RESTCatalog(context, (config) -> adapter);
catalog.initialize("prod", ImmutableMap.of(CatalogProperties.URI, "ignored", "token", token));
Assertions.assertFalse(catalog.tableExists(TableIdentifier.of("ns", "table")));
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/config"),
any(),
any(),
eq(ConfigResponse.class),
eq(catalogHeaders),
any());
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/namespaces/ns/tables/table"),
any(),
any(),
eq(LoadTableResponse.class),
eq(OAuth2Util.authHeaders(token)),
any());
}
@Test
public void testCatalogTokenRefreshFailsAndUsesCredentialForRefresh() {
Map<String, String> emptyHeaders = ImmutableMap.of();
Map<String, String> catalogHeaders =
ImmutableMap.of("Authorization", "Bearer client-credentials-token:sub=catalog");
String credential = "catalog:secret";
Map<String, String> basicHeaders = OAuth2Util.basicAuthHeaders(credential);
RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
Answer<OAuthTokenResponse> addOneSecondExpiration =
invocation -> {
OAuthTokenResponse response = (OAuthTokenResponse) invocation.callRealMethod();
return OAuthTokenResponse.builder()
.withToken(response.token())
.withTokenType(response.tokenType())
.withIssuedTokenType(response.issuedTokenType())
.addScopes(response.scopes())
.setExpirationInSeconds(1)
.build();
};
Mockito.doAnswer(addOneSecondExpiration)
.when(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
any(),
eq(OAuthTokenResponse.class),
any(),
any());
Map<String, String> firstRefreshRequest =
ImmutableMap.of(
"grant_type", "urn:ietf:params:oauth:grant-type:token-exchange",
"subject_token", "client-credentials-token:sub=catalog",
"subject_token_type", "urn:ietf:params:oauth:token-type:access_token",
"scope", "catalog");
// simulate that the token expired when it was about to be refreshed
Mockito.doThrow(new RuntimeException("token expired"))
.when(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
Mockito.argThat(firstRefreshRequest::equals),
eq(OAuthTokenResponse.class),
eq(catalogHeaders),
any());
Map<String, String> contextCredentials = ImmutableMap.of();
SessionCatalog.SessionContext context =
new SessionCatalog.SessionContext(
UUID.randomUUID().toString(), "user", contextCredentials, ImmutableMap.of());
// lower retries
AuthSessionUtil.setTokenRefreshNumRetries(1);
RESTCatalog catalog = new RESTCatalog(context, (config) -> adapter);
catalog.initialize(
"prod", ImmutableMap.of(CatalogProperties.URI, "ignored", "credential", credential));
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(
() -> {
// use the exchanged catalog token
Assertions.assertFalse(catalog.tableExists(TableIdentifier.of("ns", "table")));
// call client credentials with no initial auth
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
any(),
eq(OAuthTokenResponse.class),
eq(emptyHeaders),
any());
// use the client credential token for config
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/config"),
any(),
any(),
eq(ConfigResponse.class),
eq(catalogHeaders),
any());
// verify the first token exchange - since an exception is thrown, we're performing
// retries
Mockito.verify(adapter, times(2))
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
Mockito.argThat(firstRefreshRequest::equals),
eq(OAuthTokenResponse.class),
eq(catalogHeaders),
any());
// here we make sure that the basic auth header is used after token refresh retries
// failed
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
Mockito.argThat(firstRefreshRequest::equals),
eq(OAuthTokenResponse.class),
eq(basicHeaders),
any());
// use the refreshed context token for table load
Map<String, String> refreshedCatalogHeader =
ImmutableMap.of(
"Authorization",
"Bearer token-exchange-token:sub=client-credentials-token:sub=catalog");
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/namespaces/ns/tables/table"),
any(),
any(),
eq(LoadTableResponse.class),
eq(refreshedCatalogHeader),
any());
});
}
@Test
public void testCatalogWithCustomTokenScope() {
Map<String, String> emptyHeaders = ImmutableMap.of();
Map<String, String> catalogHeaders =
ImmutableMap.of("Authorization", "Bearer client-credentials-token:sub=catalog");
RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
Answer<OAuthTokenResponse> addOneSecondExpiration =
invocation -> {
OAuthTokenResponse response = (OAuthTokenResponse) invocation.callRealMethod();
return OAuthTokenResponse.builder()
.withToken(response.token())
.withTokenType(response.tokenType())
.withIssuedTokenType(response.issuedTokenType())
.addScopes(response.scopes())
.setExpirationInSeconds(1)
.build();
};
Mockito.doAnswer(addOneSecondExpiration)
.when(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
any(),
eq(OAuthTokenResponse.class),
any(),
any());
Map<String, String> contextCredentials = ImmutableMap.of();
SessionCatalog.SessionContext context =
new SessionCatalog.SessionContext(
UUID.randomUUID().toString(), "user", contextCredentials, ImmutableMap.of());
RESTCatalog catalog = new RESTCatalog(context, (config) -> adapter);
String scope = "custom_catalog_scope";
catalog.initialize(
"prod",
ImmutableMap.of(
CatalogProperties.URI,
"ignored",
"credential",
"catalog:secret",
OAuth2Properties.SCOPE,
scope));
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(
() -> {
// call client credentials with no initial auth
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
any(),
eq(OAuthTokenResponse.class),
eq(emptyHeaders),
any());
// use the client credential token for config
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/config"),
any(),
any(),
eq(ConfigResponse.class),
eq(catalogHeaders),
any());
// verify the token exchange uses the right scope
Map<String, String> firstRefreshRequest =
ImmutableMap.of(
"grant_type", "urn:ietf:params:oauth:grant-type:token-exchange",
"subject_token", "client-credentials-token:sub=catalog",
"subject_token_type", "urn:ietf:params:oauth:token-type:access_token",
"scope", scope);
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
Mockito.argThat(firstRefreshRequest::equals),
eq(OAuthTokenResponse.class),
eq(catalogHeaders),
any());
});
}
public void testCatalogTokenRefreshDisabledWithToken() {
String token = "some-token";
Map<String, String> catalogHeaders = ImmutableMap.of("Authorization", "Bearer " + token);
RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
Answer<OAuthTokenResponse> addOneSecondExpiration =
invocation -> {
OAuthTokenResponse response = (OAuthTokenResponse) invocation.callRealMethod();
return OAuthTokenResponse.builder()
.withToken(response.token())
.withTokenType(response.tokenType())
.withIssuedTokenType(response.issuedTokenType())
.addScopes(response.scopes())
.setExpirationInSeconds(1)
.build();
};
Mockito.doAnswer(addOneSecondExpiration)
.when(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
any(),
eq(OAuthTokenResponse.class),
any(),
any());
Map<String, String> contextCredentials = ImmutableMap.of();
SessionCatalog.SessionContext context =
new SessionCatalog.SessionContext(
UUID.randomUUID().toString(), "user", contextCredentials, ImmutableMap.of());
RESTCatalog catalog = new RESTCatalog(context, (config) -> adapter);
catalog.initialize(
"prod",
ImmutableMap.of(
CatalogProperties.URI,
"ignored",
OAuth2Properties.TOKEN,
token,
OAuth2Properties.TOKEN_REFRESH_ENABLED,
"false"));
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/config"),
any(),
any(),
eq(ConfigResponse.class),
eq(catalogHeaders),
any());
}
@Test
public void testCatalogTokenRefreshDisabledWithCredential() {
Map<String, String> catalogHeaders =
ImmutableMap.of("Authorization", "Bearer client-credentials-token:sub=catalog");
RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog));
SessionCatalog.SessionContext context =
new SessionCatalog.SessionContext(
UUID.randomUUID().toString(), "user", ImmutableMap.of(), ImmutableMap.of());
RESTCatalog catalog = new RESTCatalog(context, (config) -> adapter);
catalog.initialize(
"prod",
ImmutableMap.of(
CatalogProperties.URI,
"ignored",
OAuth2Properties.CREDENTIAL,
"catalog:12345",
OAuth2Properties.TOKEN_REFRESH_ENABLED,
"false"));
// fetch token from client credential
Map<String, String> fetchTokenFromCredential =
ImmutableMap.of(
"grant_type",
"client_credentials",
"client_id",
"catalog",
"client_secret",
"12345",
"scope",
"catalog");
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.POST),
eq("v1/oauth/tokens"),
any(),
Mockito.argThat(fetchTokenFromCredential::equals),
eq(OAuthTokenResponse.class),
eq(ImmutableMap.of()),
any());
Mockito.verify(adapter)
.execute(
eq(HTTPMethod.GET),
eq("v1/config"),
any(),
any(),
eq(ConfigResponse.class),
eq(catalogHeaders),
any());
}
}