blob: 5e3b80c638be9a0713aee1d65138835f9236069e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.sql.presto;
import static io.prestosql.spi.StandardErrorCode.NOT_FOUND;
import static io.prestosql.spi.StandardErrorCode.QUERY_REJECTED;
import static java.util.Objects.requireNonNull;
import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded;
import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded;
import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertColumnHandle;
import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertTableHandle;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.connector.ColumnHandle;
import io.prestosql.spi.connector.ColumnMetadata;
import io.prestosql.spi.connector.ConnectorMetadata;
import io.prestosql.spi.connector.ConnectorSession;
import io.prestosql.spi.connector.ConnectorTableHandle;
import io.prestosql.spi.connector.ConnectorTableLayout;
import io.prestosql.spi.connector.ConnectorTableLayoutHandle;
import io.prestosql.spi.connector.ConnectorTableLayoutResult;
import io.prestosql.spi.connector.ConnectorTableMetadata;
import io.prestosql.spi.connector.Constraint;
import io.prestosql.spi.connector.SchemaTableName;
import io.prestosql.spi.connector.SchemaTablePrefix;
import io.prestosql.spi.connector.TableNotFoundException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
/**
* This connector helps to work with metadata.
*/
public class PulsarMetadata implements ConnectorMetadata {
private final String connectorId;
private final PulsarAdmin pulsarAdmin;
private final PulsarConnectorConfig pulsarConnectorConfig;
private final PulsarDispatchingRowDecoderFactory decoderFactory;
private static final String INFORMATION_SCHEMA = "information_schema";
private static final Logger log = Logger.get(PulsarMetadata.class);
private final LoadingCache<SchemaTableName, TopicName> tableNameTopicNameCache =
CacheBuilder.newBuilder()
// use a short live cache to make sure one query not get matched the topic many times and
// prevent get the wrong cache due to the topic changes in the Pulsar.
.expireAfterWrite(30, TimeUnit.SECONDS)
.build(new CacheLoader<SchemaTableName, TopicName>() {
@Override
public TopicName load(SchemaTableName schemaTableName) throws Exception {
return getMatchedPulsarTopic(schemaTableName);
}
});
@Inject
public PulsarMetadata(PulsarConnectorId connectorId, PulsarConnectorConfig pulsarConnectorConfig,
PulsarDispatchingRowDecoderFactory decoderFactory) {
this.decoderFactory = decoderFactory;
this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
this.pulsarConnectorConfig = pulsarConnectorConfig;
try {
this.pulsarAdmin = pulsarConnectorConfig.getPulsarAdmin();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}
@Override
public List<String> listSchemaNames(ConnectorSession session) {
List<String> prestoSchemas = new LinkedList<>();
try {
List<String> tenants = pulsarAdmin.tenants().getTenants();
for (String tenant : tenants) {
prestoSchemas.addAll(pulsarAdmin.namespaces().getNamespaces(tenant).stream().map(namespace ->
rewriteNamespaceDelimiterIfNeeded(namespace, pulsarConnectorConfig)).collect(Collectors.toList()));
}
} catch (PulsarAdminException e) {
if (e.getStatusCode() == 401) {
throw new PrestoException(QUERY_REJECTED, "Failed to get schemas from pulsar: Unauthorized");
}
throw new RuntimeException("Failed to get schemas from pulsar: "
+ ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
}
return prestoSchemas;
}
@Override
public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) {
TopicName topicName = getMatchedTopicName(tableName);
return new PulsarTableHandle(
this.connectorId,
tableName.getSchemaName(),
tableName.getTableName(),
topicName.getLocalName());
}
@Override
public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table,
Constraint constraint,
Optional<Set<ColumnHandle>> desiredColumns) {
PulsarTableHandle handle = convertTableHandle(table);
ConnectorTableLayout layout = new ConnectorTableLayout(
new PulsarTableLayoutHandle(handle, constraint.getSummary()));
return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
}
@Override
public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) {
return new ConnectorTableLayout(handle);
}
@Override
public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) {
ConnectorTableMetadata connectorTableMetadata;
SchemaTableName schemaTableName = convertTableHandle(table).toSchemaTableName();
connectorTableMetadata = getTableMetadata(schemaTableName, true);
if (connectorTableMetadata == null) {
ImmutableList.Builder<ColumnMetadata> builder = ImmutableList.builder();
connectorTableMetadata = new ConnectorTableMetadata(schemaTableName, builder.build());
}
return connectorTableMetadata;
}
@Override
public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> schemaName) {
ImmutableList.Builder<SchemaTableName> builder = ImmutableList.builder();
if (schemaName.isPresent()) {
String schemaNameOrNull = schemaName.get();
if (schemaNameOrNull.equals(INFORMATION_SCHEMA)) {
// no-op for now but add pulsar connector specific system tables here
} else {
List<String> pulsarTopicList = null;
try {
pulsarTopicList = this.pulsarAdmin.topics()
.getList(restoreNamespaceDelimiterIfNeeded(schemaNameOrNull, pulsarConnectorConfig));
} catch (PulsarAdminException e) {
if (e.getStatusCode() == 404) {
log.warn("Schema " + schemaNameOrNull + " does not exsit");
return builder.build();
} else if (e.getStatusCode() == 401) {
throw new PrestoException(QUERY_REJECTED,
String.format("Failed to get tables/topics in %s: Unauthorized", schemaNameOrNull));
}
throw new RuntimeException("Failed to get tables/topics in " + schemaNameOrNull + ": "
+ ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
}
if (pulsarTopicList != null) {
pulsarTopicList.stream()
.map(topic -> TopicName.get(topic).getPartitionedTopicName())
.distinct()
.forEach(topic -> builder.add(new SchemaTableName(schemaNameOrNull,
TopicName.get(topic).getLocalName())));
}
}
}
return builder.build();
}
@Override
public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) {
PulsarTableHandle pulsarTableHandle = convertTableHandle(tableHandle);
ConnectorTableMetadata tableMetaData = getTableMetadata(pulsarTableHandle.toSchemaTableName(), false);
if (tableMetaData == null) {
return new HashMap<>();
}
ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
tableMetaData.getColumns().forEach(columnMetadata -> {
PulsarColumnMetadata pulsarColumnMetadata = (PulsarColumnMetadata) columnMetadata;
PulsarColumnHandle pulsarColumnHandle = new PulsarColumnHandle(
connectorId,
pulsarColumnMetadata.getNameWithCase(),
pulsarColumnMetadata.getType(),
pulsarColumnMetadata.isHidden(),
pulsarColumnMetadata.isInternal(),
pulsarColumnMetadata.getDecoderExtraInfo().getMapping(),
pulsarColumnMetadata.getDecoderExtraInfo().getDataFormat(),
pulsarColumnMetadata.getDecoderExtraInfo().getFormatHint(),
pulsarColumnMetadata.getHandleKeyValueType());
columnHandles.put(
columnMetadata.getName(),
pulsarColumnHandle);
});
PulsarInternalColumn.getInternalFields().forEach(pulsarInternalColumn -> {
PulsarColumnHandle pulsarColumnHandle = pulsarInternalColumn.getColumnHandle(connectorId, false);
columnHandles.put(pulsarColumnHandle.getName(), pulsarColumnHandle);
});
return columnHandles.build();
}
@Override
public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle
columnHandle) {
convertTableHandle(tableHandle);
return convertColumnHandle(columnHandle).getColumnMetadata();
}
@Override
public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix
prefix) {
requireNonNull(prefix, "prefix is null");
ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = ImmutableMap.builder();
List<SchemaTableName> tableNames;
if (!prefix.getTable().isPresent()) {
tableNames = listTables(session, prefix.getSchema());
} else {
tableNames = ImmutableList.of(new SchemaTableName(prefix.getSchema().get(), prefix.getTable().get()));
}
for (SchemaTableName tableName : tableNames) {
ConnectorTableMetadata connectorTableMetadata = getTableMetadata(tableName, true);
if (connectorTableMetadata != null) {
columns.put(tableName, connectorTableMetadata.getColumns());
}
}
return columns.build();
}
private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName, boolean withInternalColumns) {
if (schemaTableName.getSchemaName().equals(INFORMATION_SCHEMA)) {
return null;
}
TopicName topicName = getMatchedTopicName(schemaTableName);
SchemaInfo schemaInfo;
try {
schemaInfo = this.pulsarAdmin.schemas().getSchemaInfo(topicName.getSchemaName());
} catch (PulsarAdminException e) {
if (e.getStatusCode() == 404) {
// use default schema because there is no schema
schemaInfo = PulsarSqlSchemaInfoProvider.defaultSchema();
} else if (e.getStatusCode() == 401) {
throw new PrestoException(QUERY_REJECTED,
String.format("Failed to get pulsar topic schema information for topic %s: Unauthorized",
topicName));
} else {
throw new RuntimeException("Failed to get pulsar topic schema information for topic "
+ topicName + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
}
}
List<ColumnMetadata> handles = getPulsarColumns(
topicName, schemaInfo, withInternalColumns, PulsarColumnHandle.HandleKeyValueType.NONE
);
return new ConnectorTableMetadata(schemaTableName, handles);
}
/**
* Convert pulsar schema into presto table metadata.
*/
@VisibleForTesting
public List<ColumnMetadata> getPulsarColumns(TopicName topicName,
SchemaInfo schemaInfo,
boolean withInternalColumns,
PulsarColumnHandle.HandleKeyValueType handleKeyValueType) {
SchemaType schemaType = schemaInfo.getType();
if (schemaType.isStruct() || schemaType.isPrimitive()) {
return getPulsarColumnsFromSchema(topicName, schemaInfo, withInternalColumns, handleKeyValueType);
} else if (schemaType.equals(SchemaType.KEY_VALUE)) {
return getPulsarColumnsFromKeyValueSchema(topicName, schemaInfo, withInternalColumns);
} else {
throw new IllegalArgumentException("Unsupported schema : " + schemaInfo);
}
}
List<ColumnMetadata> getPulsarColumnsFromSchema(TopicName topicName,
SchemaInfo schemaInfo,
boolean withInternalColumns,
PulsarColumnHandle.HandleKeyValueType handleKeyValueType) {
ImmutableList.Builder<ColumnMetadata> builder = ImmutableList.builder();
builder.addAll(decoderFactory.extractColumnMetadata(topicName, schemaInfo, handleKeyValueType));
if (withInternalColumns) {
PulsarInternalColumn.getInternalFields()
.stream()
.forEach(pulsarInternalColumn -> builder.add(pulsarInternalColumn.getColumnMetadata(false)));
}
return builder.build();
}
List<ColumnMetadata> getPulsarColumnsFromKeyValueSchema(TopicName topicName,
SchemaInfo schemaInfo,
boolean withInternalColumns) {
ImmutableList.Builder<ColumnMetadata> builder = ImmutableList.builder();
KeyValue<SchemaInfo, SchemaInfo> kvSchemaInfo = KeyValueSchemaInfo.decodeKeyValueSchemaInfo(schemaInfo);
SchemaInfo keySchemaInfo = kvSchemaInfo.getKey();
List<ColumnMetadata> keyColumnMetadataList = getPulsarColumns(topicName, keySchemaInfo, false,
PulsarColumnHandle.HandleKeyValueType.KEY);
builder.addAll(keyColumnMetadataList);
SchemaInfo valueSchemaInfo = kvSchemaInfo.getValue();
List<ColumnMetadata> valueColumnMetadataList = getPulsarColumns(topicName, valueSchemaInfo, false,
PulsarColumnHandle.HandleKeyValueType.VALUE);
builder.addAll(valueColumnMetadataList);
if (withInternalColumns) {
PulsarInternalColumn.getInternalFields()
.forEach(pulsarInternalColumn -> builder.add(pulsarInternalColumn.getColumnMetadata(false)));
}
return builder.build();
}
private TopicName getMatchedTopicName(SchemaTableName schemaTableName) {
TopicName topicName;
try {
topicName = tableNameTopicNameCache.get(schemaTableName);
} catch (Exception e) {
log.error(e, "Failed to get table handler for tableName " + schemaTableName);
if (e.getCause() != null && e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
}
throw new TableNotFoundException(schemaTableName);
}
return topicName;
}
private TopicName getMatchedPulsarTopic(SchemaTableName schemaTableName) {
String namespace = restoreNamespaceDelimiterIfNeeded(schemaTableName.getSchemaName(), pulsarConnectorConfig);
Set<String> topicsSetWithoutPartition;
try {
List<String> allTopics = this.pulsarAdmin.topics().getList(namespace);
topicsSetWithoutPartition = allTopics.stream()
.map(t -> t.split(TopicName.PARTITIONED_TOPIC_SUFFIX)[0])
.collect(Collectors.toSet());
} catch (PulsarAdminException e) {
if (e.getStatusCode() == 404) {
throw new PrestoException(NOT_FOUND, "Schema " + namespace + " does not exist");
} else if (e.getStatusCode() == 401) {
throw new PrestoException(QUERY_REJECTED,
String.format("Failed to get topics in schema %s: Unauthorized", namespace));
}
throw new RuntimeException("Failed to get topics in schema " + namespace
+ ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
}
List<String> matchedTopics = topicsSetWithoutPartition.stream()
.filter(t -> TopicName.get(t).getLocalName().equalsIgnoreCase(schemaTableName.getTableName()))
.collect(Collectors.toList());
if (matchedTopics.size() == 0) {
log.error("Table %s not found", String.format("%s/%s", namespace, schemaTableName.getTableName()));
throw new TableNotFoundException(schemaTableName);
} else if (matchedTopics.size() != 1) {
String errMsg = String.format("There are multiple topics %s matched the table name %s",
matchedTopics.toString(),
String.format("%s/%s", namespace, schemaTableName.getTableName()));
log.error(errMsg);
throw new TableNotFoundException(schemaTableName, errMsg);
}
log.info("matched topic %s for table %s ", matchedTopics.get(0), schemaTableName);
return TopicName.get(matchedTopics.get(0));
}
}