[Pulsar SQL] Make the Pulsar SQL support query the uppercase topic (#9980)

### Motivation

When querying the uppercase topic (e.g. `public/default/case_UPPER_topic`) by the Pulsar SQL, it will throw the error "topic not found" because of the class `SchemaTableName`, in the Pulsar SQL, we couldn't get the exact table name (e.g. `Test`, `tEst`) but only lowercase table name (e.g. `test`), this is the presto behavior.

Refer to the presto class `SchemaTableName`.
```
@JsonCreator
public SchemaTableName(@JsonProperty("schema") String schemaName, @JsonProperty("table") String tableName) {
    this.schemaName = SchemaUtil.checkNotEmpty(schemaName, "schemaName").toLowerCase(Locale.ENGLISH);
    this.tableName = SchemaUtil.checkNotEmpty(tableName, "tableName").toLowerCase(Locale.ENGLISH);
}
```

If there are topics `public/default/case_UPPER_topic`, `public/default/case_upper_topic`, the query `show tables in pulsar."public/default"` will return one result `public/default/case_upper_topic`, this behavior is determined by the presto spi.

There is a precondition for querying the uppercase topic, the topic name must be unique. For example, if there are topics `public/default/case_UPPER_topic`, `public/default/case_upper_topic`, the query `select * from pulsar."public/default"."case_upper_topic" will throw error no matched topic, because of that we could get the exact topic by the lowercase table name `case_upper_topic`.

If there is a unique topic `public/default/case_UPPER_topic`, the query `select * from pulsar."public/default"."case_UPPER_topic` or `select * from pulsar."public/default"."case_upper_topic` will query that unique topic.

topics | allow query | table name
--|--|--
test, Test | false | -
test | true | test, Test
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
index 0a7dfb8..5e3b80c 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
@@ -27,6 +27,9 @@
 import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertTableHandle;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import io.airlift.log.Logger;
@@ -50,6 +53,7 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import javax.inject.Inject;
 import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -75,9 +79,20 @@
 
     private static final String INFORMATION_SCHEMA = "information_schema";
 
-
     private static final Logger log = Logger.get(PulsarMetadata.class);
 
+    private final LoadingCache<SchemaTableName, TopicName> tableNameTopicNameCache =
+            CacheBuilder.newBuilder()
+                    // use a short live cache to make sure one query not get matched the topic many times and
+                    // prevent get the wrong cache due to the topic changes in the Pulsar.
+                    .expireAfterWrite(30, TimeUnit.SECONDS)
+                    .build(new CacheLoader<SchemaTableName, TopicName>() {
+                        @Override
+                        public TopicName load(SchemaTableName schemaTableName) throws Exception {
+                            return getMatchedPulsarTopic(schemaTableName);
+                        }
+                    });
+
     @Inject
     public PulsarMetadata(PulsarConnectorId connectorId, PulsarConnectorConfig pulsarConnectorConfig,
                           PulsarDispatchingRowDecoderFactory decoderFactory) {
@@ -112,11 +127,12 @@
 
     @Override
     public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) {
+        TopicName topicName = getMatchedTopicName(tableName);
         return new PulsarTableHandle(
                 this.connectorId,
                 tableName.getSchemaName(),
                 tableName.getTableName(),
-                tableName.getTableName());
+                topicName.getLocalName());
     }
 
     @Override
@@ -261,40 +277,12 @@
         if (schemaTableName.getSchemaName().equals(INFORMATION_SCHEMA)) {
             return null;
         }
-        String namespace = restoreNamespaceDelimiterIfNeeded(schemaTableName.getSchemaName(), pulsarConnectorConfig);
 
-        TopicName topicName = TopicName.get(
-                String.format("%s/%s", namespace, schemaTableName.getTableName()));
-
-        List<String> topics;
-        try {
-            if (!PulsarConnectorUtils.isPartitionedTopic(topicName, this.pulsarAdmin)) {
-                topics = this.pulsarAdmin.topics().getList(namespace);
-            } else {
-                topics = this.pulsarAdmin.topics().getPartitionedTopicList(namespace);
-            }
-        } catch (PulsarAdminException e) {
-            if (e.getStatusCode() == 404) {
-                throw new PrestoException(NOT_FOUND, "Schema " + namespace + " does not exist");
-            } else if (e.getStatusCode() == 401) {
-                throw new PrestoException(QUERY_REJECTED,
-                        String.format("Failed to get topics in schema %s: Unauthorized", namespace));
-            }
-            throw new RuntimeException("Failed to get topics in schema " + namespace
-                    + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
-        }
-
-        if (!topics.contains(topicName.toString())) {
-            log.error("Table %s not found",
-                    String.format("%s/%s", namespace,
-                            schemaTableName.getTableName()));
-            throw new TableNotFoundException(schemaTableName);
-        }
+        TopicName topicName = getMatchedTopicName(schemaTableName);
 
         SchemaInfo schemaInfo;
         try {
-            schemaInfo = this.pulsarAdmin.schemas().getSchemaInfo(
-                    String.format("%s/%s", namespace, schemaTableName.getTableName()));
+            schemaInfo = this.pulsarAdmin.schemas().getSchemaInfo(topicName.getSchemaName());
         } catch (PulsarAdminException e) {
             if (e.getStatusCode() == 404) {
                 // use default schema because there is no schema
@@ -302,12 +290,11 @@
 
             } else if (e.getStatusCode() == 401) {
                 throw new PrestoException(QUERY_REJECTED,
-                        String.format("Failed to get pulsar topic schema information for topic %s/%s: Unauthorized",
-                                namespace, schemaTableName.getTableName()));
+                        String.format("Failed to get pulsar topic schema information for topic %s: Unauthorized",
+                                topicName));
             } else {
                 throw new RuntimeException("Failed to get pulsar topic schema information for topic "
-                        + String.format("%s/%s", namespace, schemaTableName.getTableName())
-                        + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
+                        + topicName + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
             }
         }
         List<ColumnMetadata> handles = getPulsarColumns(
@@ -372,4 +359,56 @@
         return builder.build();
     }
 
+    private TopicName getMatchedTopicName(SchemaTableName schemaTableName) {
+        TopicName topicName;
+        try {
+            topicName = tableNameTopicNameCache.get(schemaTableName);
+        } catch (Exception e) {
+            log.error(e, "Failed to get table handler for tableName " + schemaTableName);
+            if (e.getCause() != null && e.getCause() instanceof RuntimeException) {
+                throw (RuntimeException) e.getCause();
+            }
+            throw new TableNotFoundException(schemaTableName);
+        }
+        return topicName;
+    }
+
+    private TopicName getMatchedPulsarTopic(SchemaTableName schemaTableName) {
+        String namespace = restoreNamespaceDelimiterIfNeeded(schemaTableName.getSchemaName(), pulsarConnectorConfig);
+
+        Set<String> topicsSetWithoutPartition;
+        try {
+            List<String> allTopics = this.pulsarAdmin.topics().getList(namespace);
+            topicsSetWithoutPartition = allTopics.stream()
+                    .map(t -> t.split(TopicName.PARTITIONED_TOPIC_SUFFIX)[0])
+                    .collect(Collectors.toSet());
+        } catch (PulsarAdminException e) {
+            if (e.getStatusCode() == 404) {
+                throw new PrestoException(NOT_FOUND, "Schema " + namespace + " does not exist");
+            } else if (e.getStatusCode() == 401) {
+                throw new PrestoException(QUERY_REJECTED,
+                        String.format("Failed to get topics in schema %s: Unauthorized", namespace));
+            }
+            throw new RuntimeException("Failed to get topics in schema " + namespace
+                    + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
+        }
+
+        List<String> matchedTopics = topicsSetWithoutPartition.stream()
+                .filter(t -> TopicName.get(t).getLocalName().equalsIgnoreCase(schemaTableName.getTableName()))
+                .collect(Collectors.toList());
+
+        if (matchedTopics.size() == 0) {
+            log.error("Table %s not found", String.format("%s/%s", namespace, schemaTableName.getTableName()));
+            throw new TableNotFoundException(schemaTableName);
+        } else if (matchedTopics.size() != 1) {
+            String errMsg = String.format("There are multiple topics %s matched the table name %s",
+                    matchedTopics.toString(),
+                    String.format("%s/%s", namespace, schemaTableName.getTableName()));
+            log.error(errMsg);
+            throw new TableNotFoundException(schemaTableName, errMsg);
+        }
+        log.info("matched topic %s for table %s ", matchedTopics.get(0), schemaTableName);
+        return TopicName.get(matchedTopics.get(0));
+    }
+
 }
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
index 4db7eb2..31c54ec 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
@@ -102,24 +102,23 @@
         TupleDomain<ColumnHandle> tupleDomain = layoutHandle.getTupleDomain();
 
         String namespace = restoreNamespaceDelimiterIfNeeded(tableHandle.getSchemaName(), pulsarConnectorConfig);
-        TopicName topicName = TopicName.get("persistent", NamespaceName.get(namespace),
-                tableHandle.getTableName());
+        TopicName topicName = TopicName.get("persistent", NamespaceName.get(namespace), tableHandle.getTopicName());
 
         SchemaInfo schemaInfo;
 
         try {
             schemaInfo = this.pulsarAdmin.schemas().getSchemaInfo(
-                    String.format("%s/%s", namespace, tableHandle.getTableName()));
+                    String.format("%s/%s", namespace, tableHandle.getTopicName()));
         } catch (PulsarAdminException e) {
             if (e.getStatusCode() == 401) {
                 throw new PrestoException(QUERY_REJECTED,
                         String.format("Failed to get pulsar topic schema for topic %s/%s: Unauthorized",
-                                namespace, tableHandle.getTableName()));
+                                namespace, tableHandle.getTopicName()));
             } else if (e.getStatusCode() == 404) {
                 schemaInfo = PulsarSqlSchemaInfoProvider.defaultSchema();
             } else {
                 throw new RuntimeException("Failed to get pulsar topic schema for topic "
-                        + String.format("%s/%s", namespace, tableHandle.getTableName())
+                        + String.format("%s/%s", namespace, tableHandle.getTopicName())
                         + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
             }
         }
@@ -251,7 +250,7 @@
                 numSplits,
                 tableHandle,
                 schemaInfo,
-                tableHandle.getTableName(),
+                topicName.getLocalName(),
                 tupleDomain,
                 offloadPolicies);
     }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
index 23cb0a6..6418cfa 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.tests.integration.presto;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.testng.Assert.assertTrue;
 
 import java.nio.ByteBuffer;
 import lombok.Cleanup;
@@ -35,6 +36,8 @@
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
+import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
@@ -103,6 +106,37 @@
         pulsarSQLBasicTest(TopicName.get(topic), false, false, schema);
     }
 
+    @Test
+    public void testForUppercaseTopic() throws Exception {
+        TopicName topicName = TopicName.get("public/default/case_UPPER_topic_" + randomName(5));
+        pulsarSQLBasicTest(topicName, false, false, JSONSchema.of(Stock.class));
+    }
+
+    @Test
+    public void testForDifferentCaseTopic() throws Exception {
+        String tableName = "diff_case_topic_" + randomName(5);
+
+        String topic1 = "public/default/" + tableName.toUpperCase();
+        TopicName topicName1 = TopicName.get(topic1);
+        prepareData(topicName1, false, false, JSONSchema.of(Stock.class));
+
+        String topic2 = "public/default/" + tableName;
+        TopicName topicName2 = TopicName.get(topic2);
+        prepareData(topicName2, false, false, JSONSchema.of(Stock.class));
+
+        try {
+            String query = "select * from pulsar.\"public/default\".\"" + tableName + "\"";
+            execQuery(query);
+            Assert.fail("The testForDifferentCaseTopic query [" + query + "] should be failed.");
+        } catch (ContainerExecException e) {
+            log.warn("Expected exception. result stderr: {}", e.getResult().getStderr(), e);
+            assertTrue(e.getResult().getStderr().contains("There are multiple topics"));
+            assertTrue(e.getResult().getStderr().contains(topic1));
+            assertTrue(e.getResult().getStderr().contains(topic2));
+            assertTrue(e.getResult().getStderr().contains("matched the table name public/default/" + tableName));
+        }
+    }
+
     @Override
     protected int prepareData(TopicName topicName,
                               boolean isBatch,
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
index 91186f4..575e57d 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
@@ -133,7 +133,8 @@
                     ContainerExecResult r = execQuery(
                             String.format("show tables in pulsar.\"%s\";", topicName.getNamespace()));
                     assertThat(r.getExitCode()).isEqualTo(0);
-                    assertThat(r.getStdout()).contains(topicName.getLocalName());
+                    // the show tables query return lowercase table names, so ignore case
+                    assertThat(r.getStdout()).containsIgnoringCase(topicName.getLocalName());
                 }
         );
     }