| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.sql.presto; |
| |
| import static io.prestosql.spi.StandardErrorCode.NOT_FOUND; |
| import static io.prestosql.spi.StandardErrorCode.QUERY_REJECTED; |
| import static java.util.Objects.requireNonNull; |
| import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded; |
| import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded; |
| import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertColumnHandle; |
| import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertTableHandle; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.cache.CacheBuilder; |
| import com.google.common.cache.CacheLoader; |
| import com.google.common.cache.LoadingCache; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import io.airlift.log.Logger; |
| import io.prestosql.spi.PrestoException; |
| import io.prestosql.spi.connector.ColumnHandle; |
| import io.prestosql.spi.connector.ColumnMetadata; |
| import io.prestosql.spi.connector.ConnectorMetadata; |
| import io.prestosql.spi.connector.ConnectorSession; |
| import io.prestosql.spi.connector.ConnectorTableHandle; |
| import io.prestosql.spi.connector.ConnectorTableLayout; |
| import io.prestosql.spi.connector.ConnectorTableLayoutHandle; |
| import io.prestosql.spi.connector.ConnectorTableLayoutResult; |
| import io.prestosql.spi.connector.ConnectorTableMetadata; |
| import io.prestosql.spi.connector.Constraint; |
| import io.prestosql.spi.connector.SchemaTableName; |
| import io.prestosql.spi.connector.SchemaTablePrefix; |
| import io.prestosql.spi.connector.TableNotFoundException; |
| import java.util.HashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| import java.util.stream.Collectors; |
| import javax.inject.Inject; |
| import org.apache.commons.lang3.exception.ExceptionUtils; |
| import org.apache.pulsar.client.admin.PulsarAdmin; |
| import org.apache.pulsar.client.admin.PulsarAdminException; |
| import org.apache.pulsar.client.api.PulsarClientException; |
| import org.apache.pulsar.client.impl.schema.KeyValueSchemaInfo; |
| import org.apache.pulsar.common.naming.TopicName; |
| import org.apache.pulsar.common.schema.KeyValue; |
| import org.apache.pulsar.common.schema.SchemaInfo; |
| import org.apache.pulsar.common.schema.SchemaType; |
| |
| /** |
| * This connector helps to work with metadata. |
| */ |
| public class PulsarMetadata implements ConnectorMetadata { |
| |
| private final String connectorId; |
| private final PulsarAdmin pulsarAdmin; |
| private final PulsarConnectorConfig pulsarConnectorConfig; |
| |
| private final PulsarDispatchingRowDecoderFactory decoderFactory; |
| |
| private static final String INFORMATION_SCHEMA = "information_schema"; |
| |
| private static final Logger log = Logger.get(PulsarMetadata.class); |
| |
| private final LoadingCache<SchemaTableName, TopicName> tableNameTopicNameCache = |
| CacheBuilder.newBuilder() |
| // use a short live cache to make sure one query not get matched the topic many times and |
| // prevent get the wrong cache due to the topic changes in the Pulsar. |
| .expireAfterWrite(30, TimeUnit.SECONDS) |
| .build(new CacheLoader<SchemaTableName, TopicName>() { |
| @Override |
| public TopicName load(SchemaTableName schemaTableName) throws Exception { |
| return getMatchedPulsarTopic(schemaTableName); |
| } |
| }); |
| |
| @Inject |
| public PulsarMetadata(PulsarConnectorId connectorId, PulsarConnectorConfig pulsarConnectorConfig, |
| PulsarDispatchingRowDecoderFactory decoderFactory) { |
| this.decoderFactory = decoderFactory; |
| this.connectorId = requireNonNull(connectorId, "connectorId is null").toString(); |
| this.pulsarConnectorConfig = pulsarConnectorConfig; |
| try { |
| this.pulsarAdmin = pulsarConnectorConfig.getPulsarAdmin(); |
| } catch (PulsarClientException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| public List<String> listSchemaNames(ConnectorSession session) { |
| List<String> prestoSchemas = new LinkedList<>(); |
| try { |
| List<String> tenants = pulsarAdmin.tenants().getTenants(); |
| for (String tenant : tenants) { |
| prestoSchemas.addAll(pulsarAdmin.namespaces().getNamespaces(tenant).stream().map(namespace -> |
| rewriteNamespaceDelimiterIfNeeded(namespace, pulsarConnectorConfig)).collect(Collectors.toList())); |
| } |
| } catch (PulsarAdminException e) { |
| if (e.getStatusCode() == 401) { |
| throw new PrestoException(QUERY_REJECTED, "Failed to get schemas from pulsar: Unauthorized"); |
| } |
| throw new RuntimeException("Failed to get schemas from pulsar: " |
| + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e); |
| } |
| return prestoSchemas; |
| } |
| |
| @Override |
| public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) { |
| TopicName topicName = getMatchedTopicName(tableName); |
| return new PulsarTableHandle( |
| this.connectorId, |
| tableName.getSchemaName(), |
| tableName.getTableName(), |
| topicName.getLocalName()); |
| } |
| |
| @Override |
| public List<ConnectorTableLayoutResult> getTableLayouts(ConnectorSession session, ConnectorTableHandle table, |
| Constraint constraint, |
| Optional<Set<ColumnHandle>> desiredColumns) { |
| |
| PulsarTableHandle handle = convertTableHandle(table); |
| ConnectorTableLayout layout = new ConnectorTableLayout( |
| new PulsarTableLayoutHandle(handle, constraint.getSummary())); |
| return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary())); |
| } |
| |
| @Override |
| public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) { |
| return new ConnectorTableLayout(handle); |
| } |
| |
| @Override |
| public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) { |
| ConnectorTableMetadata connectorTableMetadata; |
| SchemaTableName schemaTableName = convertTableHandle(table).toSchemaTableName(); |
| connectorTableMetadata = getTableMetadata(schemaTableName, true); |
| if (connectorTableMetadata == null) { |
| ImmutableList.Builder<ColumnMetadata> builder = ImmutableList.builder(); |
| connectorTableMetadata = new ConnectorTableMetadata(schemaTableName, builder.build()); |
| } |
| return connectorTableMetadata; |
| } |
| |
| @Override |
| public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> schemaName) { |
| ImmutableList.Builder<SchemaTableName> builder = ImmutableList.builder(); |
| |
| if (schemaName.isPresent()) { |
| String schemaNameOrNull = schemaName.get(); |
| |
| if (schemaNameOrNull.equals(INFORMATION_SCHEMA)) { |
| // no-op for now but add pulsar connector specific system tables here |
| } else { |
| List<String> pulsarTopicList = null; |
| try { |
| pulsarTopicList = this.pulsarAdmin.topics() |
| .getList(restoreNamespaceDelimiterIfNeeded(schemaNameOrNull, pulsarConnectorConfig)); |
| } catch (PulsarAdminException e) { |
| if (e.getStatusCode() == 404) { |
| log.warn("Schema " + schemaNameOrNull + " does not exsit"); |
| return builder.build(); |
| } else if (e.getStatusCode() == 401) { |
| throw new PrestoException(QUERY_REJECTED, |
| String.format("Failed to get tables/topics in %s: Unauthorized", schemaNameOrNull)); |
| } |
| throw new RuntimeException("Failed to get tables/topics in " + schemaNameOrNull + ": " |
| + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e); |
| } |
| if (pulsarTopicList != null) { |
| pulsarTopicList.stream() |
| .map(topic -> TopicName.get(topic).getPartitionedTopicName()) |
| .distinct() |
| .forEach(topic -> builder.add(new SchemaTableName(schemaNameOrNull, |
| TopicName.get(topic).getLocalName()))); |
| } |
| } |
| } |
| return builder.build(); |
| } |
| |
| @Override |
| public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) { |
| PulsarTableHandle pulsarTableHandle = convertTableHandle(tableHandle); |
| |
| ConnectorTableMetadata tableMetaData = getTableMetadata(pulsarTableHandle.toSchemaTableName(), false); |
| if (tableMetaData == null) { |
| return new HashMap<>(); |
| } |
| |
| ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder(); |
| |
| tableMetaData.getColumns().forEach(columnMetadata -> { |
| |
| PulsarColumnMetadata pulsarColumnMetadata = (PulsarColumnMetadata) columnMetadata; |
| |
| PulsarColumnHandle pulsarColumnHandle = new PulsarColumnHandle( |
| connectorId, |
| pulsarColumnMetadata.getNameWithCase(), |
| pulsarColumnMetadata.getType(), |
| pulsarColumnMetadata.isHidden(), |
| pulsarColumnMetadata.isInternal(), |
| pulsarColumnMetadata.getDecoderExtraInfo().getMapping(), |
| pulsarColumnMetadata.getDecoderExtraInfo().getDataFormat(), |
| pulsarColumnMetadata.getDecoderExtraInfo().getFormatHint(), |
| pulsarColumnMetadata.getHandleKeyValueType()); |
| |
| columnHandles.put( |
| columnMetadata.getName(), |
| pulsarColumnHandle); |
| }); |
| |
| PulsarInternalColumn.getInternalFields().forEach(pulsarInternalColumn -> { |
| PulsarColumnHandle pulsarColumnHandle = pulsarInternalColumn.getColumnHandle(connectorId, false); |
| columnHandles.put(pulsarColumnHandle.getName(), pulsarColumnHandle); |
| }); |
| |
| return columnHandles.build(); |
| } |
| |
| @Override |
| public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle |
| columnHandle) { |
| |
| convertTableHandle(tableHandle); |
| return convertColumnHandle(columnHandle).getColumnMetadata(); |
| } |
| |
| @Override |
| public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix |
| prefix) { |
| |
| requireNonNull(prefix, "prefix is null"); |
| |
| ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = ImmutableMap.builder(); |
| |
| List<SchemaTableName> tableNames; |
| if (!prefix.getTable().isPresent()) { |
| tableNames = listTables(session, prefix.getSchema()); |
| } else { |
| tableNames = ImmutableList.of(new SchemaTableName(prefix.getSchema().get(), prefix.getTable().get())); |
| } |
| |
| for (SchemaTableName tableName : tableNames) { |
| ConnectorTableMetadata connectorTableMetadata = getTableMetadata(tableName, true); |
| if (connectorTableMetadata != null) { |
| columns.put(tableName, connectorTableMetadata.getColumns()); |
| } |
| } |
| |
| return columns.build(); |
| } |
| |
| private ConnectorTableMetadata getTableMetadata(SchemaTableName schemaTableName, boolean withInternalColumns) { |
| |
| if (schemaTableName.getSchemaName().equals(INFORMATION_SCHEMA)) { |
| return null; |
| } |
| |
| TopicName topicName = getMatchedTopicName(schemaTableName); |
| |
| SchemaInfo schemaInfo; |
| try { |
| schemaInfo = this.pulsarAdmin.schemas().getSchemaInfo(topicName.getSchemaName()); |
| } catch (PulsarAdminException e) { |
| if (e.getStatusCode() == 404) { |
| // use default schema because there is no schema |
| schemaInfo = PulsarSqlSchemaInfoProvider.defaultSchema(); |
| |
| } else if (e.getStatusCode() == 401) { |
| throw new PrestoException(QUERY_REJECTED, |
| String.format("Failed to get pulsar topic schema information for topic %s: Unauthorized", |
| topicName)); |
| } else { |
| throw new RuntimeException("Failed to get pulsar topic schema information for topic " |
| + topicName + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e); |
| } |
| } |
| List<ColumnMetadata> handles = getPulsarColumns( |
| topicName, schemaInfo, withInternalColumns, PulsarColumnHandle.HandleKeyValueType.NONE |
| ); |
| |
| |
| return new ConnectorTableMetadata(schemaTableName, handles); |
| } |
| |
| /** |
| * Convert pulsar schema into presto table metadata. |
| */ |
| @VisibleForTesting |
| public List<ColumnMetadata> getPulsarColumns(TopicName topicName, |
| SchemaInfo schemaInfo, |
| boolean withInternalColumns, |
| PulsarColumnHandle.HandleKeyValueType handleKeyValueType) { |
| SchemaType schemaType = schemaInfo.getType(); |
| if (schemaType.isStruct() || schemaType.isPrimitive()) { |
| return getPulsarColumnsFromSchema(topicName, schemaInfo, withInternalColumns, handleKeyValueType); |
| } else if (schemaType.equals(SchemaType.KEY_VALUE)) { |
| return getPulsarColumnsFromKeyValueSchema(topicName, schemaInfo, withInternalColumns); |
| } else { |
| throw new IllegalArgumentException("Unsupported schema : " + schemaInfo); |
| } |
| } |
| |
| List<ColumnMetadata> getPulsarColumnsFromSchema(TopicName topicName, |
| SchemaInfo schemaInfo, |
| boolean withInternalColumns, |
| PulsarColumnHandle.HandleKeyValueType handleKeyValueType) { |
| ImmutableList.Builder<ColumnMetadata> builder = ImmutableList.builder(); |
| builder.addAll(decoderFactory.extractColumnMetadata(topicName, schemaInfo, handleKeyValueType)); |
| if (withInternalColumns) { |
| PulsarInternalColumn.getInternalFields() |
| .stream() |
| .forEach(pulsarInternalColumn -> builder.add(pulsarInternalColumn.getColumnMetadata(false))); |
| } |
| return builder.build(); |
| } |
| |
| List<ColumnMetadata> getPulsarColumnsFromKeyValueSchema(TopicName topicName, |
| SchemaInfo schemaInfo, |
| boolean withInternalColumns) { |
| ImmutableList.Builder<ColumnMetadata> builder = ImmutableList.builder(); |
| KeyValue<SchemaInfo, SchemaInfo> kvSchemaInfo = KeyValueSchemaInfo.decodeKeyValueSchemaInfo(schemaInfo); |
| SchemaInfo keySchemaInfo = kvSchemaInfo.getKey(); |
| List<ColumnMetadata> keyColumnMetadataList = getPulsarColumns(topicName, keySchemaInfo, false, |
| PulsarColumnHandle.HandleKeyValueType.KEY); |
| builder.addAll(keyColumnMetadataList); |
| |
| SchemaInfo valueSchemaInfo = kvSchemaInfo.getValue(); |
| List<ColumnMetadata> valueColumnMetadataList = getPulsarColumns(topicName, valueSchemaInfo, false, |
| PulsarColumnHandle.HandleKeyValueType.VALUE); |
| builder.addAll(valueColumnMetadataList); |
| |
| if (withInternalColumns) { |
| PulsarInternalColumn.getInternalFields() |
| .forEach(pulsarInternalColumn -> builder.add(pulsarInternalColumn.getColumnMetadata(false))); |
| } |
| return builder.build(); |
| } |
| |
| private TopicName getMatchedTopicName(SchemaTableName schemaTableName) { |
| TopicName topicName; |
| try { |
| topicName = tableNameTopicNameCache.get(schemaTableName); |
| } catch (Exception e) { |
| log.error(e, "Failed to get table handler for tableName " + schemaTableName); |
| if (e.getCause() != null && e.getCause() instanceof RuntimeException) { |
| throw (RuntimeException) e.getCause(); |
| } |
| throw new TableNotFoundException(schemaTableName); |
| } |
| return topicName; |
| } |
| |
| private TopicName getMatchedPulsarTopic(SchemaTableName schemaTableName) { |
| String namespace = restoreNamespaceDelimiterIfNeeded(schemaTableName.getSchemaName(), pulsarConnectorConfig); |
| |
| Set<String> topicsSetWithoutPartition; |
| try { |
| List<String> allTopics = this.pulsarAdmin.topics().getList(namespace); |
| topicsSetWithoutPartition = allTopics.stream() |
| .map(t -> t.split(TopicName.PARTITIONED_TOPIC_SUFFIX)[0]) |
| .collect(Collectors.toSet()); |
| } catch (PulsarAdminException e) { |
| if (e.getStatusCode() == 404) { |
| throw new PrestoException(NOT_FOUND, "Schema " + namespace + " does not exist"); |
| } else if (e.getStatusCode() == 401) { |
| throw new PrestoException(QUERY_REJECTED, |
| String.format("Failed to get topics in schema %s: Unauthorized", namespace)); |
| } |
| throw new RuntimeException("Failed to get topics in schema " + namespace |
| + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e); |
| } |
| |
| List<String> matchedTopics = topicsSetWithoutPartition.stream() |
| .filter(t -> TopicName.get(t).getLocalName().equalsIgnoreCase(schemaTableName.getTableName())) |
| .collect(Collectors.toList()); |
| |
| if (matchedTopics.size() == 0) { |
| log.error("Table %s not found", String.format("%s/%s", namespace, schemaTableName.getTableName())); |
| throw new TableNotFoundException(schemaTableName); |
| } else if (matchedTopics.size() != 1) { |
| String errMsg = String.format("There are multiple topics %s matched the table name %s", |
| matchedTopics.toString(), |
| String.format("%s/%s", namespace, schemaTableName.getTableName())); |
| log.error(errMsg); |
| throw new TableNotFoundException(schemaTableName, errMsg); |
| } |
| log.info("matched topic %s for table %s ", matchedTopics.get(0), schemaTableName); |
| return TopicName.get(matchedTopics.get(0)); |
| } |
| |
| } |