[rest] Support external Paimon tables in REST catalog (#6446)

diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 03e4719..97434af 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -475,7 +475,8 @@
                 this::loadTableMetadata,
                 lockFactory().orElse(null),
                 lockContext().orElse(null),
-                context);
+                context,
+                false);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index d26642e..e9510e8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -215,7 +215,8 @@
             TableMetadata.Loader metadataLoader,
             @Nullable CatalogLockFactory lockFactory,
             @Nullable CatalogLockContext lockContext,
-            @Nullable CatalogContext catalogContext)
+            @Nullable CatalogContext catalogContext,
+            boolean isRestCatalog)
             throws Catalog.TableNotExistException {
         if (SYSTEM_DATABASE_NAME.equals(identifier.getDatabaseName())) {
             return CatalogUtils.createGlobalSystemTable(identifier.getTableName(), catalog);
@@ -256,9 +257,9 @@
                 new CatalogEnvironment(
                         tableIdentifier,
                         metadata.uuid(),
-                        catalog.catalogLoader(),
-                        lockFactory,
-                        lockContext,
+                        isRestCatalog && metadata.isExternal() ? null : catalog.catalogLoader(),
+                        isRestCatalog ? null : lockFactory,
+                        isRestCatalog ? null : lockContext,
                         catalogContext,
                         catalog.supportsVersionManagement());
         Path path = new Path(schema.options().get(PATH.key()));
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 85aa310..0d34e17 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -278,7 +278,8 @@
                 this::loadTableMetadata,
                 null,
                 null,
-                context);
+                context,
+                true);
     }
 
     @Override
@@ -427,7 +428,8 @@
                     i -> toTableMetadata(db, response),
                     null,
                     null,
-                    context);
+                    context,
+                    true);
         } catch (TableNotExistException e) {
             throw new RuntimeException(e);
         }
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 6ce5894..51d1218 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -24,7 +24,6 @@
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.Database;
-import org.apache.paimon.catalog.FileSystemCatalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.catalog.PropertyChange;
 import org.apache.paimon.catalog.RenamingSnapshotCommit;
@@ -162,7 +161,7 @@
     private final String databaseUri;
 
     private final CatalogContext catalogContext;
-    private final FileSystemCatalog catalog;
+    private final RESTFileSystemCatalog catalog;
     private final MockWebServer server;
 
     private final Map<String, Database> databaseStore = new HashMap<>();
@@ -202,7 +201,7 @@
         } catch (IOException e) {
             throw new UncheckedIOException(e);
         }
-        this.catalog = new FileSystemCatalog(fileIO, warehousePath, catalogContext);
+        this.catalog = new RESTFileSystemCatalog(fileIO, warehousePath, catalogContext);
         Dispatcher dispatcher = initDispatcher(authProvider);
         MockWebServer mockWebServer = new MockWebServer();
         mockWebServer.setDispatcher(dispatcher);
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index ce37536..5d76946 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -22,6 +22,7 @@
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.TableType;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogTestBase;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.catalog.PropertyChange;
@@ -29,6 +30,7 @@
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.data.serializer.InternalSerializers;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.function.Function;
@@ -44,6 +46,7 @@
 import org.apache.paimon.rest.responses.ConfigResponse;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableSnapshot;
@@ -2566,6 +2569,185 @@
     @Test
     public void testTableUUID() {}
 
+    /** Exercises create, write, read, snapshot lookup and drop on an external table. */
+    @Test
+    public void testCreateExternalTable(@TempDir java.nio.file.Path path) throws Exception {
+        // Create external table with specified location
+        Path externalTablePath = new Path(path.toString(), "external_table_location");
+
+        Map<String, String> options = new HashMap<>();
+        options.put("type", TableType.TABLE.toString());
+        options.put("path", externalTablePath.toString());
+
+        Schema externalTableSchema =
+                new Schema(
+                        Lists.newArrayList(
+                                new DataField(0, "id", DataTypes.INT()),
+                                new DataField(1, "name", DataTypes.STRING()),
+                                new DataField(2, "age", DataTypes.INT())),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        options,
+                        "External table for testing");
+
+        // Create database and external table
+        restCatalog.createDatabase("test_external_table_db", true);
+        Identifier identifier = Identifier.create("test_external_table_db", "external_test_table");
+
+        try {
+            catalog.dropTable(identifier, true);
+        } catch (Exception ignored) {
+            // Best-effort pre-clean: the table usually does not exist yet
+        }
+
+        // Pre-create external table directory and schema files (simulating existing external table)
+        createExternalTableDirectory(externalTablePath, externalTableSchema);
+
+        catalog.createTable(identifier, externalTableSchema, false);
+
+        // Verify table exists
+        Table table = catalog.getTable(identifier);
+        assertThat(table).isNotNull();
+
+        // Verify table is external (path should be the specified external path)
+        FileIO fileIO = table.fileIO();
+        assertTrue(fileIO.exists(externalTablePath), "External table path should exist");
+
+        // Verify table metadata
+        assertThat(table.comment()).isEqualTo(Optional.of("External table for testing"));
+        assertThat(table.rowType().getFieldCount()).isEqualTo(3);
+        assertThat(table.rowType().getFieldNames()).containsExactly("id", "name", "age");
+
+        // Test writing data to external table
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+
+        // Used below to copy each row as it is read; a reader may reuse its row instance
+        InternalRowSerializer serializer = InternalSerializers.create(table.rowType());
+        InternalRow row1 = GenericRow.of(100, BinaryString.fromString("Alice"), 25);
+        InternalRow row2 = GenericRow.of(200, BinaryString.fromString("Bob"), 30);
+
+        // try-with-resources closes the writer and the committer even when the commit fails
+        try (BatchTableWrite write = writeBuilder.newWrite();
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            write.write(row1);
+            write.write(row2);
+            List<CommitMessage> commitMessages = write.prepareCommit();
+            commit.commit(commitMessages);
+        }
+
+        // Verify data can be read from external table
+        ReadBuilder readBuilder = table.newReadBuilder();
+        TableRead read = readBuilder.newRead();
+        List<Split> splits = readBuilder.newScan().plan().splits();
+
+        List<InternalRow> results = new ArrayList<>();
+        for (Split split : splits) {
+            try (RecordReader<InternalRow> reader = read.createReader(split)) {
+                reader.forEachRemaining(row -> results.add(serializer.copy(row)));
+            }
+        }
+
+        // Exactly the two committed rows are expected: the table directory is a fresh @TempDir
+        assertThat(results).hasSize(2);
+
+        // Verify the data structure is correct
+        for (InternalRow row : results) {
+            assertThat(row.getInt(0)).isGreaterThan(0); // id should be positive
+            assertThat(row.getString(1).toString()).isNotEmpty(); // name should not be empty
+            assertThat(row.getInt(2)).isGreaterThan(0); // age should be positive
+        }
+
+        // Test snapshot reading functionality - should read from client side, not server side
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+        SnapshotManager snapshotManager = fileStoreTable.snapshotManager();
+
+        // Verify that snapshot manager can read latest snapshot ID from file system
+        Long latestSnapshotId = snapshotManager.latestSnapshotId();
+        assertThat(latestSnapshotId).isNotNull().isPositive();
+
+        // Verify that snapshot manager can read the latest snapshot from file system
+        Snapshot latestSnapshot = snapshotManager.latestSnapshot();
+        assertThat(latestSnapshot).isNotNull();
+        assertThat(latestSnapshot.id()).isEqualTo(latestSnapshotId);
+
+        // Verify that snapshot manager can read specific snapshot from file system
+        Snapshot specificSnapshot = snapshotManager.snapshot(latestSnapshotId);
+        assertThat(specificSnapshot).isNotNull();
+        assertThat(specificSnapshot.id()).isEqualTo(latestSnapshotId);
+
+        // Verify snapshot contains our committed data
+        assertThat(latestSnapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+
+        // Test that external table can be listed in catalog
+        List<String> tables = catalog.listTables("test_external_table_db");
+        assertThat(tables).contains("external_test_table");
+
+        // Test that external table can be accessed again after operations
+        Table tableAgain = catalog.getTable(identifier);
+        assertThat(tableAgain).isNotNull();
+        assertThat(tableAgain.comment()).isEqualTo(Optional.of("External table for testing"));
+
+        testReadSystemTables();
+
+        // Verify external table path still exists after operations
+        assertTrue(
+                fileIO.exists(externalTablePath),
+                "External table path should still exist after operations");
+
+        // Test dropping external table - data should remain
+        catalog.dropTable(identifier, false);
+
+        // Verify external table path still exists after drop (external table behavior)
+        assertTrue(
+                fileIO.exists(externalTablePath),
+                "External table path should still exist after drop");
+
+        // Clean up
+        try {
+            fileIO.deleteQuietly(externalTablePath);
+        } catch (Exception ignored) {
+            // Ignore cleanup errors
+        }
+    }
+
+    /** Checks that {@code sys.tables} lists the external test table as type "table". */
+    private void testReadSystemTables() throws IOException, Catalog.TableNotExistException {
+        Identifier allTablesIdentifier = Identifier.create("sys", "tables");
+        Table allTablesTable = catalog.getTable(allTablesIdentifier);
+        // Fail loudly on a null table instead of silently skipping every check below
+        assertThat(allTablesTable).isNotNull();
+
+        ReadBuilder allTablesReadBuilder = allTablesTable.newReadBuilder();
+        TableRead allTablesRead = allTablesReadBuilder.newRead();
+        List<Split> allTablesSplits = allTablesReadBuilder.newScan().plan().splits();
+
+        List<InternalRow> allTablesResults = new ArrayList<>();
+        for (Split split : allTablesSplits) {
+            try (RecordReader<InternalRow> reader = allTablesRead.createReader(split)) {
+                reader.forEachRemaining(allTablesResults::add);
+            }
+        }
+
+        // Verify that our external table appears in ALL_TABLES
+        assertThat(allTablesResults).isNotEmpty();
+
+        // Find our external table in the results
+        boolean foundExternalTable = false;
+        for (InternalRow row : allTablesResults) {
+            String tableName = row.getString(1).toString(); // table_name column
+            String databaseName = row.getString(0).toString(); // database_name column
+            if ("external_test_table".equals(tableName)
+                    && "test_external_table_db".equals(databaseName)) {
+                foundExternalTable = true;
+                // Verify table properties
+                String tableType = row.getString(2).toString(); // table_type column
+                assertThat(tableType).isEqualTo("table"); // plain table type is expected
+                break;
+            }
+        }
+        assertThat(foundExternalTable).isTrue();
+    }
+
     protected void createTable(
             Identifier identifier, Map<String, String> options, List<String> partitionKeys)
             throws Exception {
@@ -2642,4 +2824,19 @@
         String tokenStr = RESTApi.toJson(token);
         FileUtils.writeStringToFile(tokenFile, tokenStr);
     }
+
+    /** Simulates a pre-existing external table by writing its schema under the given path. */
+    private void createExternalTableDirectory(Path externalTablePath, Schema schema)
+            throws Exception {
+        FileIO tableFileIO = FileIO.get(externalTablePath, CatalogContext.create(new Options()));
+
+        // The table directory has to be present before the schema is written into it
+        boolean directoryMissing = !tableFileIO.exists(externalTablePath);
+        if (directoryMissing) {
+            tableFileIO.mkdirs(externalTablePath);
+        }
+
+        // Write the schema file; the trailing true indicates an external table
+        new SchemaManager(tableFileIO, externalTablePath).createTable(schema, true);
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTFileSystemCatalog.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTFileSystemCatalog.java
new file mode 100644
index 0000000..26a6315
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTFileSystemCatalog.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+
+/**
+ * A {@link FileSystemCatalog} for the REST catalog test server that allows custom table paths, so
+ * that external tables can be created at a caller-specified location.
+ */
+public class RESTFileSystemCatalog extends FileSystemCatalog {
+
+    public RESTFileSystemCatalog(FileIO fileIO, Path warehouse, CatalogContext context) {
+        super(fileIO, warehouse, context);
+    }
+
+    @Override
+    protected boolean allowCustomTablePath() {
+        return true;
+    }
+}