[Pulsar SQL] Make the Pulsar SQL support query the uppercase topic (#9980)
### Motivation
When querying a topic whose name contains uppercase letters (e.g. `public/default/case_UPPER_topic`) with Pulsar SQL, the query fails with the error "topic not found". The cause is the Presto class `SchemaTableName`: inside Pulsar SQL we cannot get the exact table name (e.g. `Test`, `tEst`), only the lowercased table name (e.g. `test`). This is Presto behavior.
Refer to the presto class `SchemaTableName`.
```
@JsonCreator
public SchemaTableName(@JsonProperty("schema") String schemaName, @JsonProperty("table") String tableName) {
this.schemaName = SchemaUtil.checkNotEmpty(schemaName, "schemaName").toLowerCase(Locale.ENGLISH);
this.tableName = SchemaUtil.checkNotEmpty(tableName, "tableName").toLowerCase(Locale.ENGLISH);
}
```
If both topics `public/default/case_UPPER_topic` and `public/default/case_upper_topic` exist, the query `show tables in pulsar."public/default"` returns a single result, `public/default/case_upper_topic`; this behavior is determined by the Presto SPI.
There is a precondition for querying an uppercase topic: the topic name must be unique ignoring case. For example, if both topics `public/default/case_UPPER_topic` and `public/default/case_upper_topic` exist, the query `select * from pulsar."public/default"."case_upper_topic"` throws an error because no single topic can be matched: the exact topic cannot be determined from the lowercase table name `case_upper_topic`.
If there is a unique topic `public/default/case_UPPER_topic`, either of the queries `select * from pulsar."public/default"."case_UPPER_topic"` or `select * from pulsar."public/default"."case_upper_topic"` will query that unique topic.
topics | allow query | table name
--|--|--
test, Test | false | -
test | true | test, Test
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
index 0a7dfb8..5e3b80c 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java
@@ -27,6 +27,9 @@
import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertTableHandle;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
@@ -50,6 +53,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -75,9 +79,20 @@
private static final String INFORMATION_SCHEMA = "information_schema";
-
private static final Logger log = Logger.get(PulsarMetadata.class);
+ private final LoadingCache<SchemaTableName, TopicName> tableNameTopicNameCache =
+ CacheBuilder.newBuilder()
+ // Use a short-lived cache so that a single query does not have to match the topic many times,
+ // while still limiting how long a stale entry can be served after topics change in Pulsar.
+ .expireAfterWrite(30, TimeUnit.SECONDS)
+ .build(new CacheLoader<SchemaTableName, TopicName>() {
+ @Override
+ public TopicName load(SchemaTableName schemaTableName) throws Exception {
+ return getMatchedPulsarTopic(schemaTableName);
+ }
+ });
+
@Inject
public PulsarMetadata(PulsarConnectorId connectorId, PulsarConnectorConfig pulsarConnectorConfig,
PulsarDispatchingRowDecoderFactory decoderFactory) {
@@ -112,11 +127,12 @@
@Override
public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) {
+ TopicName topicName = getMatchedTopicName(tableName);
return new PulsarTableHandle(
this.connectorId,
tableName.getSchemaName(),
tableName.getTableName(),
- tableName.getTableName());
+ topicName.getLocalName());
}
@Override
@@ -261,40 +277,12 @@
if (schemaTableName.getSchemaName().equals(INFORMATION_SCHEMA)) {
return null;
}
- String namespace = restoreNamespaceDelimiterIfNeeded(schemaTableName.getSchemaName(), pulsarConnectorConfig);
- TopicName topicName = TopicName.get(
- String.format("%s/%s", namespace, schemaTableName.getTableName()));
-
- List<String> topics;
- try {
- if (!PulsarConnectorUtils.isPartitionedTopic(topicName, this.pulsarAdmin)) {
- topics = this.pulsarAdmin.topics().getList(namespace);
- } else {
- topics = this.pulsarAdmin.topics().getPartitionedTopicList(namespace);
- }
- } catch (PulsarAdminException e) {
- if (e.getStatusCode() == 404) {
- throw new PrestoException(NOT_FOUND, "Schema " + namespace + " does not exist");
- } else if (e.getStatusCode() == 401) {
- throw new PrestoException(QUERY_REJECTED,
- String.format("Failed to get topics in schema %s: Unauthorized", namespace));
- }
- throw new RuntimeException("Failed to get topics in schema " + namespace
- + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
- }
-
- if (!topics.contains(topicName.toString())) {
- log.error("Table %s not found",
- String.format("%s/%s", namespace,
- schemaTableName.getTableName()));
- throw new TableNotFoundException(schemaTableName);
- }
+ TopicName topicName = getMatchedTopicName(schemaTableName);
SchemaInfo schemaInfo;
try {
- schemaInfo = this.pulsarAdmin.schemas().getSchemaInfo(
- String.format("%s/%s", namespace, schemaTableName.getTableName()));
+ schemaInfo = this.pulsarAdmin.schemas().getSchemaInfo(topicName.getSchemaName());
} catch (PulsarAdminException e) {
if (e.getStatusCode() == 404) {
// use default schema because there is no schema
@@ -302,12 +290,11 @@
} else if (e.getStatusCode() == 401) {
throw new PrestoException(QUERY_REJECTED,
- String.format("Failed to get pulsar topic schema information for topic %s/%s: Unauthorized",
- namespace, schemaTableName.getTableName()));
+ String.format("Failed to get pulsar topic schema information for topic %s: Unauthorized",
+ topicName));
} else {
throw new RuntimeException("Failed to get pulsar topic schema information for topic "
- + String.format("%s/%s", namespace, schemaTableName.getTableName())
- + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
+ + topicName + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
}
}
List<ColumnMetadata> handles = getPulsarColumns(
@@ -372,4 +359,56 @@
return builder.build();
}
+ private TopicName getMatchedTopicName(SchemaTableName schemaTableName) {
+ TopicName topicName;
+ try {
+ topicName = tableNameTopicNameCache.get(schemaTableName);
+ } catch (Exception e) {
+ log.error(e, "Failed to get table handler for tableName " + schemaTableName);
+ if (e.getCause() != null && e.getCause() instanceof RuntimeException) {
+ throw (RuntimeException) e.getCause();
+ }
+ throw new TableNotFoundException(schemaTableName);
+ }
+ return topicName;
+ }
+
+ private TopicName getMatchedPulsarTopic(SchemaTableName schemaTableName) {
+ String namespace = restoreNamespaceDelimiterIfNeeded(schemaTableName.getSchemaName(), pulsarConnectorConfig);
+
+ Set<String> topicsSetWithoutPartition;
+ try {
+ List<String> allTopics = this.pulsarAdmin.topics().getList(namespace);
+ topicsSetWithoutPartition = allTopics.stream()
+ .map(t -> t.split(TopicName.PARTITIONED_TOPIC_SUFFIX)[0])
+ .collect(Collectors.toSet());
+ } catch (PulsarAdminException e) {
+ if (e.getStatusCode() == 404) {
+ throw new PrestoException(NOT_FOUND, "Schema " + namespace + " does not exist");
+ } else if (e.getStatusCode() == 401) {
+ throw new PrestoException(QUERY_REJECTED,
+ String.format("Failed to get topics in schema %s: Unauthorized", namespace));
+ }
+ throw new RuntimeException("Failed to get topics in schema " + namespace
+ + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
+ }
+
+ List<String> matchedTopics = topicsSetWithoutPartition.stream()
+ .filter(t -> TopicName.get(t).getLocalName().equalsIgnoreCase(schemaTableName.getTableName()))
+ .collect(Collectors.toList());
+
+ if (matchedTopics.size() == 0) {
+ log.error("Table %s not found", String.format("%s/%s", namespace, schemaTableName.getTableName()));
+ throw new TableNotFoundException(schemaTableName);
+ } else if (matchedTopics.size() != 1) {
+ String errMsg = String.format("There are multiple topics %s matched the table name %s",
+ matchedTopics.toString(),
+ String.format("%s/%s", namespace, schemaTableName.getTableName()));
+ log.error(errMsg);
+ throw new TableNotFoundException(schemaTableName, errMsg);
+ }
+ log.info("matched topic %s for table %s ", matchedTopics.get(0), schemaTableName);
+ return TopicName.get(matchedTopics.get(0));
+ }
+
}
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
index 4db7eb2..31c54ec 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java
@@ -102,24 +102,23 @@
TupleDomain<ColumnHandle> tupleDomain = layoutHandle.getTupleDomain();
String namespace = restoreNamespaceDelimiterIfNeeded(tableHandle.getSchemaName(), pulsarConnectorConfig);
- TopicName topicName = TopicName.get("persistent", NamespaceName.get(namespace),
- tableHandle.getTableName());
+ TopicName topicName = TopicName.get("persistent", NamespaceName.get(namespace), tableHandle.getTopicName());
SchemaInfo schemaInfo;
try {
schemaInfo = this.pulsarAdmin.schemas().getSchemaInfo(
- String.format("%s/%s", namespace, tableHandle.getTableName()));
+ String.format("%s/%s", namespace, tableHandle.getTopicName()));
} catch (PulsarAdminException e) {
if (e.getStatusCode() == 401) {
throw new PrestoException(QUERY_REJECTED,
String.format("Failed to get pulsar topic schema for topic %s/%s: Unauthorized",
- namespace, tableHandle.getTableName()));
+ namespace, tableHandle.getTopicName()));
} else if (e.getStatusCode() == 404) {
schemaInfo = PulsarSqlSchemaInfoProvider.defaultSchema();
} else {
throw new RuntimeException("Failed to get pulsar topic schema for topic "
- + String.format("%s/%s", namespace, tableHandle.getTableName())
+ + String.format("%s/%s", namespace, tableHandle.getTopicName())
+ ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
}
}
@@ -251,7 +250,7 @@
numSplits,
tableHandle,
schemaInfo,
- tableHandle.getTableName(),
+ topicName.getLocalName(),
tupleDomain,
offloadPolicies);
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
index 23cb0a6..6418cfa 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.tests.integration.presto;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.testng.Assert.assertTrue;
import java.nio.ByteBuffer;
import lombok.Cleanup;
@@ -35,6 +36,8 @@
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.tests.integration.docker.ContainerExecException;
+import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
@@ -103,6 +106,37 @@
pulsarSQLBasicTest(TopicName.get(topic), false, false, schema);
}
+ @Test
+ public void testForUppercaseTopic() throws Exception {
+ TopicName topicName = TopicName.get("public/default/case_UPPER_topic_" + randomName(5));
+ pulsarSQLBasicTest(topicName, false, false, JSONSchema.of(Stock.class));
+ }
+
+ @Test
+ public void testForDifferentCaseTopic() throws Exception {
+ String tableName = "diff_case_topic_" + randomName(5);
+
+ String topic1 = "public/default/" + tableName.toUpperCase();
+ TopicName topicName1 = TopicName.get(topic1);
+ prepareData(topicName1, false, false, JSONSchema.of(Stock.class));
+
+ String topic2 = "public/default/" + tableName;
+ TopicName topicName2 = TopicName.get(topic2);
+ prepareData(topicName2, false, false, JSONSchema.of(Stock.class));
+
+ try {
+ String query = "select * from pulsar.\"public/default\".\"" + tableName + "\"";
+ execQuery(query);
+ Assert.fail("The testForDifferentCaseTopic query [" + query + "] should be failed.");
+ } catch (ContainerExecException e) {
+ log.warn("Expected exception. result stderr: {}", e.getResult().getStderr(), e);
+ assertTrue(e.getResult().getStderr().contains("There are multiple topics"));
+ assertTrue(e.getResult().getStderr().contains(topic1));
+ assertTrue(e.getResult().getStderr().contains(topic2));
+ assertTrue(e.getResult().getStderr().contains("matched the table name public/default/" + tableName));
+ }
+ }
+
@Override
protected int prepareData(TopicName topicName,
boolean isBatch,
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
index 91186f4..575e57d 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java
@@ -133,7 +133,8 @@
ContainerExecResult r = execQuery(
String.format("show tables in pulsar.\"%s\";", topicName.getNamespace()));
assertThat(r.getExitCode()).isEqualTo(0);
- assertThat(r.getStdout()).contains(topicName.getLocalName());
+ // the show tables query returns lowercase table names, so ignore case
+ assertThat(r.getStdout()).containsIgnoringCase(topicName.getLocalName());
}
);
}