| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog; |
| |
| import static java.util.stream.Collectors.toMap; |
| |
| import com.google.cloud.datacatalog.DataCatalogGrpc; |
| import com.google.cloud.datacatalog.DataCatalogGrpc.DataCatalogBlockingStub; |
| import com.google.cloud.datacatalog.LookupEntryRequest; |
| import io.grpc.ManagedChannelBuilder; |
| import io.grpc.Status; |
| import io.grpc.StatusRuntimeException; |
| import io.grpc.auth.MoreCallCredentials; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.stream.Stream; |
| import javax.annotation.Nullable; |
| import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; |
| import org.apache.beam.sdk.extensions.sql.impl.TableName; |
| import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable; |
| import org.apache.beam.sdk.extensions.sql.meta.Table; |
| import org.apache.beam.sdk.extensions.sql.meta.provider.FullNameTableProvider; |
| import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider; |
| import org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BigQueryTableProvider; |
| import org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubJsonTableProvider; |
| import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider; |
| import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList; |
| import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap; |
| |
| /** Uses DataCatalog to get the source type and schema for a table. */ |
| public class DataCatalogTableProvider extends FullNameTableProvider { |
| |
| private Map<String, TableProvider> delegateProviders; |
| private Map<String, Table> tableCache; |
| private DataCatalogBlockingStub dataCatalog; |
| |
| private DataCatalogTableProvider( |
| Map<String, TableProvider> delegateProviders, DataCatalogBlockingStub dataCatalog) { |
| |
| this.tableCache = new HashMap<>(); |
| this.delegateProviders = ImmutableMap.copyOf(delegateProviders); |
| this.dataCatalog = dataCatalog; |
| } |
| |
| public static DataCatalogTableProvider create(DataCatalogPipelineOptions options) { |
| return new DataCatalogTableProvider(getSupportedProviders(), createDataCatalogClient(options)); |
| } |
| |
| private static DataCatalogBlockingStub createDataCatalogClient( |
| DataCatalogPipelineOptions options) { |
| return DataCatalogGrpc.newBlockingStub( |
| ManagedChannelBuilder.forTarget(options.getDataCatalogEndpoint()).build()) |
| .withCallCredentials( |
| MoreCallCredentials.from(options.as(GcpOptions.class).getGcpCredential())); |
| } |
| |
| private static Map<String, TableProvider> getSupportedProviders() { |
| return Stream.of( |
| new PubsubJsonTableProvider(), new BigQueryTableProvider(), new TextTableProvider()) |
| .collect(toMap(TableProvider::getTableType, p -> p)); |
| } |
| |
| @Override |
| public String getTableType() { |
| return "google.cloud.datacatalog"; |
| } |
| |
| @Override |
| public void createTable(Table table) { |
| throw new UnsupportedOperationException( |
| "Creating tables is not supported with DataCatalog table provider."); |
| } |
| |
| @Override |
| public void dropTable(String tableName) { |
| throw new UnsupportedOperationException( |
| "Dropping tables is not supported with DataCatalog table provider"); |
| } |
| |
| @Override |
| public Map<String, Table> getTables() { |
| throw new UnsupportedOperationException("Loading all tables from DataCatalog is not supported"); |
| } |
| |
| @Override |
| public @Nullable Table getTable(String tableNamePart) { |
| throw new UnsupportedOperationException( |
| "Loading a table by partial name '" + tableNamePart + "' is unsupported"); |
| } |
| |
| @Override |
| public @Nullable Table getTableByFullName(TableName fullTableName) { |
| |
| ImmutableList<String> allNameParts = |
| ImmutableList.<String>builder() |
| .addAll(fullTableName.getPath()) |
| .add(fullTableName.getTableName()) |
| .build(); |
| |
| String fullEscapedTableName = ZetaSqlIdUtils.escapeAndJoin(allNameParts); |
| |
| return loadTable(fullEscapedTableName); |
| } |
| |
| private @Nullable Table loadTable(String tableName) { |
| if (!tableCache.containsKey(tableName)) { |
| tableCache.put(tableName, loadTableFromDC(tableName)); |
| } |
| |
| return tableCache.get(tableName); |
| } |
| |
| private Table loadTableFromDC(String tableName) { |
| try { |
| return TableUtils.toBeamTable( |
| tableName, |
| dataCatalog.lookupEntry( |
| LookupEntryRequest.newBuilder().setSqlResource(tableName).build())); |
| } catch (StatusRuntimeException e) { |
| if (e.getStatus().equals(Status.INVALID_ARGUMENT)) { |
| return null; |
| } |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| public BeamSqlTable buildBeamSqlTable(Table table) { |
| return delegateProviders.get(table.getType()).buildBeamSqlTable(table); |
| } |
| } |