| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.extensions.sql.impl; |
| |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.ServiceLoader; |
| import java.util.Set; |
| import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider; |
| import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider; |
| import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore; |
| import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore; |
| import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap; |
| import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.jdbc.CalciteConnection; |
| import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.linq4j.tree.Expression; |
| import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelProtoDataType; |
| import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.Function; |
| import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.Schema; |
| import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.SchemaFactory; |
| import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.SchemaPlus; |
| import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.SchemaVersion; |
| import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.Table; |
| |
| /** |
| * Factory classes that Calcite uses to create initial schema for JDBC connection. |
| * |
| * <p>The name of the factory class is passed in the connection properties into the JDBC driver |
| * {@code connect()} method. Calcite then creates the factory using the default constructor. It |
| * doesn't allow to hook into the process to pass in additional parameters (e.g. connection), so we |
| * have to add the indirection layer. |
| * |
| * <p>{@link AllProviders} is the default factory that is used by {@link JdbcDriver} unless an |
| * override is specified. It greedily finds and loads all available table providers. This is used in |
| * a normal JDBC path, e.g. when CLI connects to {@link JdbcDriver} (without any extra connection |
| * properties). |
| * |
| * <p>{@link Empty} is an override used in {@link JdbcDriver#connect(TableProvider, |
| * org.apache.beam.sdk.options.PipelineOptions)} to avoid loading all available table providers. |
| */ |
| class BeamCalciteSchemaFactory { |
| |
| /** |
| * Called by {@link JdbcConnection} when initializing to convert the initial empty schema to |
| * actual {@link BeamCalciteSchema}. |
| */ |
| static TableProvider fromInitialEmptySchema(JdbcConnection jdbcConnection) { |
| InitialEmptySchema initialEmptySchema = jdbcConnection.getCurrentBeamSchema(); |
| return initialEmptySchema.getTableProvider(); |
| } |
| |
| /** |
| * Loads all table providers using service loader. This is the default configured in {@link |
| * JdbcDriver#connect(String, Properties)}. |
| */ |
| public static class AllProviders extends InitialEmptySchema implements SchemaFactory { |
| |
| /** |
| * We call this in {@link #fromInitialEmptySchema(JdbcConnection)} to convert the schema created |
| * by Calcite to a configured table provider. At this point we have a connection open and can |
| * use it to configure Beam schemas, e.g. with pipeline options. |
| * |
| * <p><i>Note:</i> this loads ALL available table providers marked with |
| * {@code @AutoService(TableProvider.class)} |
| */ |
| @Override |
| public TableProvider getTableProvider() { |
| MetaStore metaStore = new InMemoryMetaStore(); |
| for (TableProvider provider : |
| ServiceLoader.load(TableProvider.class, getClass().getClassLoader())) { |
| metaStore.registerProvider(provider); |
| } |
| return metaStore; |
| } |
| |
| /** This is what Calcite calls to create an instance of the default top level schema. */ |
| @Override |
| public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) { |
| return this; |
| } |
| } |
| |
| /** |
| * This is the override to create an empty schema, used in {@link JdbcDriver#connect(TableProvider |
| * , org.apache.beam.sdk.options.PipelineOptions)} to avoid loading all table providers. This |
| * schema is expected to be replaced by an actual functional schema by the same code that |
| * specified this override in the first place. |
| */ |
| public static class Empty extends InitialEmptySchema implements SchemaFactory { |
| |
| private static final TableProvider READ_ONLY_TABLE_PROVIDER = |
| new ReadOnlyTableProvider("empty", ImmutableMap.of()); |
| |
| /** |
| * We call this in {@link #fromInitialEmptySchema(JdbcConnection)} to convert the schema created |
| * by Calcite to a configured table provider. This specific instance is an empty readonly |
| * provider that is supposed to be replaced by the code that specified this empty schema in the |
| * connection properties to the driver. |
| */ |
| @Override |
| public TableProvider getTableProvider() { |
| return READ_ONLY_TABLE_PROVIDER; |
| } |
| |
| /** This is what Calcite calls to create an instance of the top level schema. */ |
| @Override |
| public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> operand) { |
| return this; |
| } |
| } |
| |
| /** |
| * Empty schema that {@link CalciteConnection} is initialized with by Calcite using the factories |
| * above. It is not flexible enough to allow us to configure the schema during the initialization. |
| * |
| * <p>This solution with overrides and conversions allows us to handle the initialization |
| * ourselves later after Calcite has created the connection with this empty schema. |
| * |
| * <p>This is a temporary indirection for initial connection initialization, allows us to later |
| * replace this empty schema in the connection with the actual correctly configured schema. |
| */ |
| public abstract static class InitialEmptySchema implements Schema { |
| |
| public abstract TableProvider getTableProvider(); |
| |
| @Override |
| public Table getTable(String name) { |
| return illegal(); |
| } |
| |
| @Override |
| public Set<String> getTableNames() { |
| return illegal(); |
| } |
| |
| @Override |
| public RelProtoDataType getType(String name) { |
| return illegal(); |
| } |
| |
| @Override |
| public Set<String> getTypeNames() { |
| return illegal(); |
| } |
| |
| @Override |
| public Collection<Function> getFunctions(String name) { |
| return illegal(); |
| } |
| |
| @Override |
| public Set<String> getFunctionNames() { |
| return illegal(); |
| } |
| |
| @Override |
| public Schema getSubSchema(String name) { |
| return illegal(); |
| } |
| |
| @Override |
| public Set<String> getSubSchemaNames() { |
| return Collections.emptySet(); |
| } |
| |
| @Override |
| public Expression getExpression(SchemaPlus parentSchema, String name) { |
| return illegal(); |
| } |
| |
| @Override |
| public boolean isMutable() { |
| return illegal(); |
| } |
| |
| @Override |
| public Schema snapshot(SchemaVersion version) { |
| return illegal(); |
| } |
| |
| @SuppressWarnings("TypeParameterUnusedInFormals") |
| private static <T> T illegal() { |
| throw new IllegalStateException("Beam JDBC connection has not been initialized"); |
| } |
| } |
| } |