[rest] Support external paimon table in rest catalog (#6446)
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 03e4719..97434af 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -475,7 +475,8 @@
this::loadTableMetadata,
lockFactory().orElse(null),
lockContext().orElse(null),
- context);
+ context,
+ false);
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index d26642e..e9510e8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -215,7 +215,8 @@
TableMetadata.Loader metadataLoader,
@Nullable CatalogLockFactory lockFactory,
@Nullable CatalogLockContext lockContext,
- @Nullable CatalogContext catalogContext)
+ @Nullable CatalogContext catalogContext,
+ boolean isRestCatalog)
throws Catalog.TableNotExistException {
if (SYSTEM_DATABASE_NAME.equals(identifier.getDatabaseName())) {
return CatalogUtils.createGlobalSystemTable(identifier.getTableName(), catalog);
@@ -256,9 +257,9 @@
new CatalogEnvironment(
tableIdentifier,
metadata.uuid(),
- catalog.catalogLoader(),
- lockFactory,
- lockContext,
+ isRestCatalog && metadata.isExternal() ? null : catalog.catalogLoader(),
+ isRestCatalog ? null : lockFactory,
+ isRestCatalog ? null : lockContext,
catalogContext,
catalog.supportsVersionManagement());
Path path = new Path(schema.options().get(PATH.key()));
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 85aa310..0d34e17 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -278,7 +278,8 @@
this::loadTableMetadata,
null,
null,
- context);
+ context,
+ true);
}
@Override
@@ -427,7 +428,8 @@
i -> toTableMetadata(db, response),
null,
null,
- context);
+ context,
+ true);
} catch (TableNotExistException e) {
throw new RuntimeException(e);
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 6ce5894..51d1218 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -24,7 +24,6 @@
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.Database;
-import org.apache.paimon.catalog.FileSystemCatalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PropertyChange;
import org.apache.paimon.catalog.RenamingSnapshotCommit;
@@ -162,7 +161,7 @@
private final String databaseUri;
private final CatalogContext catalogContext;
- private final FileSystemCatalog catalog;
+ private final RESTFileSystemCatalog catalog;
private final MockWebServer server;
private final Map<String, Database> databaseStore = new HashMap<>();
@@ -202,7 +201,7 @@
} catch (IOException e) {
throw new UncheckedIOException(e);
}
- this.catalog = new FileSystemCatalog(fileIO, warehousePath, catalogContext);
+ this.catalog = new RESTFileSystemCatalog(fileIO, warehousePath, catalogContext);
Dispatcher dispatcher = initDispatcher(authProvider);
MockWebServer mockWebServer = new MockWebServer();
mockWebServer.setDispatcher(dispatcher);
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index ce37536..5d76946 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -22,6 +22,7 @@
import org.apache.paimon.Snapshot;
import org.apache.paimon.TableType;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogTestBase;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.PropertyChange;
@@ -29,6 +30,7 @@
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.data.serializer.InternalSerializers;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.function.Function;
@@ -44,6 +46,7 @@
import org.apache.paimon.rest.responses.ConfigResponse;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableSnapshot;
@@ -2566,6 +2569,185 @@
@Test
public void testTableUUID() {}
+    @Test
+    public void testCreateExternalTable(@TempDir java.nio.file.Path path) throws Exception {
+        // Create an external table whose location is given explicitly via the "path" option.
+        Path externalTablePath = new Path(path.toString(), "external_table_location");
+
+        Map<String, String> options = new HashMap<>();
+        options.put("type", TableType.TABLE.toString());
+        options.put("path", externalTablePath.toString());
+
+        Schema externalTableSchema =
+                new Schema(
+                        Lists.newArrayList(
+                                new DataField(0, "id", DataTypes.INT()),
+                                new DataField(1, "name", DataTypes.STRING()),
+                                new DataField(2, "age", DataTypes.INT())),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        options,
+                        "External table for testing");
+
+        // Create database and external table
+        restCatalog.createDatabase("test_external_table_db", true);
+        Identifier identifier = Identifier.create("test_external_table_db", "external_test_table");
+
+        // ignoreIfNotExists=true already tolerates a missing table; any other failure should
+        // surface instead of being swallowed.
+        catalog.dropTable(identifier, true);
+
+        // Pre-create external table directory and schema files (simulating existing external table)
+        createExternalTableDirectory(externalTablePath, externalTableSchema);
+
+        catalog.createTable(identifier, externalTableSchema, false);
+
+        // Verify table exists
+        Table table = catalog.getTable(identifier);
+        assertThat(table).isNotNull();
+
+        // Verify the external location exists on the file system
+        FileIO fileIO = table.fileIO();
+        assertTrue(fileIO.exists(externalTablePath), "External table path should exist");
+
+        // Verify table metadata
+        assertThat(table.comment()).isEqualTo(Optional.of("External table for testing"));
+        assertThat(table.rowType().getFieldCount()).isEqualTo(3);
+        assertThat(table.rowType().getFieldNames()).containsExactly("id", "name", "age");
+
+        // Write two rows; try-with-resources closes writer and committer even if a step fails.
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        try (BatchTableWrite write = writeBuilder.newWrite();
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            write.write(GenericRow.of(100, BinaryString.fromString("Alice"), 25));
+            write.write(GenericRow.of(200, BinaryString.fromString("Bob"), 30));
+            List<CommitMessage> commitMessages = write.prepareCommit();
+            commit.commit(commitMessages);
+        }
+
+        // Read everything back, copying each row in case the reader reuses row instances.
+        InternalRowSerializer serializer = InternalSerializers.create(table.rowType());
+        ReadBuilder readBuilder = table.newReadBuilder();
+        TableRead read = readBuilder.newRead();
+        List<Split> splits = readBuilder.newScan().plan().splits();
+
+        List<InternalRow> results = new ArrayList<>();
+        for (Split split : splits) {
+            try (RecordReader<InternalRow> reader = read.createReader(split)) {
+                reader.forEachRemaining(row -> results.add(serializer.copy(row)));
+            }
+        }
+
+        // The location lives in a per-test @TempDir holding only the schema, so exactly the two
+        // rows written above must come back.
+        List<String> readRows = new ArrayList<>();
+        for (InternalRow row : results) {
+            readRows.add(row.getInt(0) + "," + row.getString(1) + "," + row.getInt(2));
+        }
+        assertThat(readRows).containsExactlyInAnyOrder("100,Alice,25", "200,Bob,30");
+
+        // Test snapshot reading functionality - should read from client side, not server side
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+        SnapshotManager snapshotManager = fileStoreTable.snapshotManager();
+
+        // Verify that snapshot manager can read latest snapshot ID from file system
+        Long latestSnapshotId = snapshotManager.latestSnapshotId();
+        assertThat(latestSnapshotId).isNotNull();
+        assertThat(latestSnapshotId).isPositive();
+
+        // Verify that snapshot manager can read the latest snapshot from file system
+        Snapshot latestSnapshot = snapshotManager.latestSnapshot();
+        assertThat(latestSnapshot).isNotNull();
+        assertThat(latestSnapshot.id()).isEqualTo(latestSnapshotId);
+
+        // Verify that snapshot manager can read specific snapshot from file system
+        Snapshot specificSnapshot = snapshotManager.snapshot(latestSnapshotId);
+        assertThat(specificSnapshot).isNotNull();
+        assertThat(specificSnapshot.id()).isEqualTo(latestSnapshotId);
+
+        // Verify snapshot contains our committed data
+        assertThat(latestSnapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+
+        // Test that external table can be listed in catalog
+        List<String> tables = catalog.listTables("test_external_table_db");
+        assertThat(tables).contains("external_test_table");
+
+        // Test that external table can be accessed again after operations
+        Table tableAgain = catalog.getTable(identifier);
+        assertThat(tableAgain).isNotNull();
+        assertThat(tableAgain.comment()).isEqualTo(Optional.of("External table for testing"));
+
+        testReadSystemTables();
+
+        // Verify external table path still exists after operations
+        assertTrue(
+                fileIO.exists(externalTablePath),
+                "External table path should still exist after operations");
+
+        // Dropping an external table must leave its location in place
+        catalog.dropTable(identifier, false);
+
+        assertTrue(
+                fileIO.exists(externalTablePath),
+                "External table path should still exist after drop");
+
+        // Best-effort cleanup; @TempDir removes the directory after the test anyway.
+        fileIO.deleteQuietly(externalTablePath);
+    }
+
+    /**
+     * Reads the global {@code sys.tables} system table and checks that the external table created
+     * by {@link #testCreateExternalTable} is listed with table type {@code "table"}.
+     */
+    private void testReadSystemTables() throws IOException, Catalog.TableNotExistException {
+        Identifier allTablesIdentifier = Identifier.create("sys", "tables");
+        Table allTablesTable = catalog.getTable(allTablesIdentifier);
+        // Fail loudly instead of silently skipping every check below.
+        assertThat(allTablesTable).isNotNull();
+
+        ReadBuilder allTablesReadBuilder = allTablesTable.newReadBuilder();
+        TableRead allTablesRead = allTablesReadBuilder.newRead();
+        List<Split> allTablesSplits = allTablesReadBuilder.newScan().plan().splits();
+        // Copy each row in case the reader reuses row instances.
+        InternalRowSerializer serializer = InternalSerializers.create(allTablesTable.rowType());
+
+        List<InternalRow> allTablesResults = new ArrayList<>();
+        for (Split split : allTablesSplits) {
+            try (RecordReader<InternalRow> reader = allTablesRead.createReader(split)) {
+                reader.forEachRemaining(row -> allTablesResults.add(serializer.copy(row)));
+            }
+        }
+
+        // Verify that our external table appears in sys.tables
+        assertThat(allTablesResults).isNotEmpty();
+
+        boolean foundExternalTable = false;
+        for (InternalRow row : allTablesResults) {
+            String databaseName = row.getString(0).toString(); // database_name column
+            String tableName = row.getString(1).toString(); // table_name column
+            if ("test_external_table_db".equals(databaseName)
+                    && "external_test_table".equals(tableName)) {
+                foundExternalTable = true;
+                // table_type column: an external table is expected to report plain "table"
+                String tableType = row.getString(2).toString();
+                assertThat(tableType).isEqualTo("table");
+                break;
+            }
+        }
+        assertThat(foundExternalTable).isTrue();
+    }
+
protected void createTable(
Identifier identifier, Map<String, String> options, List<String> partitionKeys)
throws Exception {
@@ -2642,4 +2824,19 @@
String tokenStr = RESTApi.toJson(token);
FileUtils.writeStringToFile(tokenFile, tokenStr);
}
+
+    /**
+     * Simulates a pre-existing external table: makes sure the target directory exists and writes
+     * the initial schema into it through a {@link SchemaManager}.
+     *
+     * @param externalTablePath location of the external table, outside the catalog warehouse
+     * @param schema schema to persist at that location
+     */
+    private void createExternalTableDirectory(Path externalTablePath, Schema schema)
+            throws Exception {
+        FileIO externalFileIO =
+                FileIO.get(externalTablePath, CatalogContext.create(new Options()));
+        boolean directoryPresent = externalFileIO.exists(externalTablePath);
+        if (!directoryPresent) {
+            externalFileIO.mkdirs(externalTablePath);
+        }
+        // The second argument flags the schema as belonging to an external table.
+        new SchemaManager(externalFileIO, externalTablePath).createTable(schema, true);
+    }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTFileSystemCatalog.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTFileSystemCatalog.java
new file mode 100644
index 0000000..26a6315
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTFileSystemCatalog.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+
+/**
+ * Test-only {@link FileSystemCatalog} used behind the mock REST catalog server. It overrides
+ * {@link #allowCustomTablePath()} to return {@code true}, so tables may be created at a
+ * caller-specified {@code path}; this is what lets the REST catalog create external tables.
+ */
+public class RESTFileSystemCatalog extends FileSystemCatalog {
+
+    /** Builds the catalog on top of {@code warehousePath} with the given file IO and context. */
+    public RESTFileSystemCatalog(FileIO io, Path warehousePath, CatalogContext catalogContext) {
+        super(io, warehousePath, catalogContext);
+    }
+
+    /** Always permits a user-specified table location. */
+    @Override
+    protected boolean allowCustomTablePath() {
+        return true;
+    }
+}