| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to you under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.calcite.jdbc; |
| |
| import org.apache.calcite.DataContext; |
| import org.apache.calcite.adapter.java.JavaTypeFactory; |
| import org.apache.calcite.avatica.AvaticaConnection; |
| import org.apache.calcite.avatica.AvaticaFactory; |
| import org.apache.calcite.avatica.AvaticaSite; |
| import org.apache.calcite.avatica.AvaticaStatement; |
| import org.apache.calcite.avatica.Helper; |
| import org.apache.calcite.avatica.InternalProperty; |
| import org.apache.calcite.avatica.Meta; |
| import org.apache.calcite.avatica.MetaImpl; |
| import org.apache.calcite.avatica.NoSuchStatementException; |
| import org.apache.calcite.avatica.UnregisteredDriver; |
| import org.apache.calcite.avatica.remote.TypedValue; |
| import org.apache.calcite.config.CalciteConnectionConfig; |
| import org.apache.calcite.config.CalciteConnectionConfigImpl; |
| import org.apache.calcite.linq4j.BaseQueryable; |
| import org.apache.calcite.linq4j.Enumerable; |
| import org.apache.calcite.linq4j.Enumerator; |
| import org.apache.calcite.linq4j.Ord; |
| import org.apache.calcite.linq4j.QueryProvider; |
| import org.apache.calcite.linq4j.Queryable; |
| import org.apache.calcite.linq4j.function.Function0; |
| import org.apache.calcite.linq4j.tree.Expression; |
| import org.apache.calcite.linq4j.tree.Expressions; |
| import org.apache.calcite.materialize.Lattice; |
| import org.apache.calcite.materialize.MaterializationService; |
| import org.apache.calcite.prepare.CalciteCatalogReader; |
| import org.apache.calcite.rel.RelNode; |
| import org.apache.calcite.rel.type.RelDataTypeSystem; |
| import org.apache.calcite.runtime.Hook; |
| import org.apache.calcite.schema.SchemaPlus; |
| import org.apache.calcite.schema.Schemas; |
| import org.apache.calcite.schema.impl.AbstractSchema; |
| import org.apache.calcite.server.CalciteServer; |
| import org.apache.calcite.server.CalciteServerStatement; |
| import org.apache.calcite.sql.advise.SqlAdvisor; |
| import org.apache.calcite.sql.advise.SqlAdvisorValidator; |
| import org.apache.calcite.sql.fun.SqlStdOperatorTable; |
| import org.apache.calcite.sql.validate.SqlConformance; |
| import org.apache.calcite.sql.validate.SqlValidatorWithHints; |
| import org.apache.calcite.tools.RelRunner; |
| import org.apache.calcite.util.BuiltInMethod; |
| import org.apache.calcite.util.Holder; |
| |
| import com.google.common.base.Preconditions; |
| import com.google.common.base.Throwables; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Maps; |
| |
| import java.io.Serializable; |
| import java.lang.reflect.Type; |
| import java.sql.PreparedStatement; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.TimeZone; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| /** |
| * Implementation of JDBC connection |
| * in the Calcite engine. |
| * |
| * <p>Abstract to allow newer versions of JDBC to add methods.</p> |
| */ |
| abstract class CalciteConnectionImpl |
| extends AvaticaConnection |
| implements CalciteConnection, QueryProvider { |
| public final JavaTypeFactory typeFactory; |
| |
| final CalciteSchema rootSchema; |
| final Function0<CalcitePrepare> prepareFactory; |
| final CalciteServer server = new CalciteServerImpl(); |
| |
| // must be package-protected |
| static final Trojan TROJAN = createTrojan(); |
| |
| /** |
| * Creates a CalciteConnectionImpl. |
| * |
| * <p>Not public; method is called only from the driver.</p> |
| * |
| * @param driver Driver |
| * @param factory Factory for JDBC objects |
| * @param url Server URL |
| * @param info Other connection properties |
| * @param rootSchema Root schema, or null |
| * @param typeFactory Type factory, or null |
| */ |
| protected CalciteConnectionImpl(Driver driver, AvaticaFactory factory, |
| String url, Properties info, CalciteSchema rootSchema, |
| JavaTypeFactory typeFactory) { |
| super(driver, factory, url, info); |
| CalciteConnectionConfig cfg = new CalciteConnectionConfigImpl(info); |
| this.prepareFactory = driver.prepareFactory; |
| if (typeFactory != null) { |
| this.typeFactory = typeFactory; |
| } else { |
| final RelDataTypeSystem typeSystem = |
| cfg.typeSystem(RelDataTypeSystem.class, RelDataTypeSystem.DEFAULT); |
| this.typeFactory = new JavaTypeFactoryImpl(typeSystem); |
| } |
| this.rootSchema = |
| Preconditions.checkNotNull(rootSchema != null |
| ? rootSchema |
| : CalciteSchema.createRootSchema(true)); |
| Preconditions.checkArgument(this.rootSchema.isRoot(), "must be root schema"); |
| this.properties.put(InternalProperty.CASE_SENSITIVE, cfg.caseSensitive()); |
| this.properties.put(InternalProperty.UNQUOTED_CASING, cfg.unquotedCasing()); |
| this.properties.put(InternalProperty.QUOTED_CASING, cfg.quotedCasing()); |
| this.properties.put(InternalProperty.QUOTING, cfg.quoting()); |
| } |
| |
| CalciteMetaImpl meta() { |
| return (CalciteMetaImpl) meta; |
| } |
| |
| public CalciteConnectionConfig config() { |
| return new CalciteConnectionConfigImpl(info); |
| } |
| |
| /** Called after the constructor has completed and the model has been |
| * loaded. */ |
| void init() { |
| final MaterializationService service = MaterializationService.instance(); |
| for (CalciteSchema.LatticeEntry e : Schemas.getLatticeEntries(rootSchema)) { |
| final Lattice lattice = e.getLattice(); |
| for (Lattice.Tile tile : lattice.computeTiles()) { |
| service.defineTile(lattice, tile.bitSet(), tile.measures, e.schema, |
| true, true); |
| } |
| } |
| } |
| |
| @Override public <T> T unwrap(Class<T> iface) throws SQLException { |
| if (iface == RelRunner.class) { |
| return iface.cast( |
| new RelRunner() { |
| public PreparedStatement prepare(RelNode rel) { |
| try { |
| return prepareStatement_(CalcitePrepare.Query.of(rel), |
| ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, |
| getHoldability()); |
| } catch (SQLException e) { |
| throw Throwables.propagate(e); |
| } |
| } |
| }); |
| } |
| return super.unwrap(iface); |
| } |
| |
| @Override public CalciteStatement createStatement(int resultSetType, |
| int resultSetConcurrency, int resultSetHoldability) throws SQLException { |
| return (CalciteStatement) super.createStatement(resultSetType, |
| resultSetConcurrency, resultSetHoldability); |
| } |
| |
| @Override public CalcitePreparedStatement prepareStatement( |
| String sql, |
| int resultSetType, |
| int resultSetConcurrency, |
| int resultSetHoldability) throws SQLException { |
| final CalcitePrepare.Query<Object> query = CalcitePrepare.Query.of(sql); |
| return prepareStatement_(query, resultSetType, resultSetConcurrency, |
| resultSetHoldability); |
| } |
| |
| private CalcitePreparedStatement prepareStatement_( |
| CalcitePrepare.Query<?> query, |
| int resultSetType, |
| int resultSetConcurrency, |
| int resultSetHoldability) throws SQLException { |
| try { |
| final Meta.Signature signature = |
| parseQuery(query, new ContextImpl(this), -1); |
| final CalcitePreparedStatement calcitePreparedStatement = |
| (CalcitePreparedStatement) factory.newPreparedStatement(this, null, |
| signature, resultSetType, resultSetConcurrency, resultSetHoldability); |
| server.getStatement(calcitePreparedStatement.handle).setSignature(signature); |
| return calcitePreparedStatement; |
| } catch (Exception e) { |
| throw Helper.INSTANCE.createException( |
| "Error while preparing statement [" + query.sql + "]", e); |
| } |
| } |
| |
| <T> CalcitePrepare.CalciteSignature<T> |
| parseQuery(CalcitePrepare.Query<T> query, |
| CalcitePrepare.Context prepareContext, long maxRowCount) { |
| CalcitePrepare.Dummy.push(prepareContext); |
| try { |
| final CalcitePrepare prepare = prepareFactory.apply(); |
| return prepare.prepareSql(prepareContext, query, Object[].class, |
| maxRowCount); |
| } finally { |
| CalcitePrepare.Dummy.pop(prepareContext); |
| } |
| } |
| |
| // CalciteConnection methods |
| |
| public SchemaPlus getRootSchema() { |
| return rootSchema.plus(); |
| } |
| |
| public JavaTypeFactory getTypeFactory() { |
| return typeFactory; |
| } |
| |
| public Properties getProperties() { |
| return info; |
| } |
| |
| // QueryProvider methods |
| |
| public <T> Queryable<T> createQuery( |
| Expression expression, Class<T> rowType) { |
| return new CalciteQueryable<>(this, rowType, expression); |
| } |
| |
| public <T> Queryable<T> createQuery(Expression expression, Type rowType) { |
| return new CalciteQueryable<>(this, rowType, expression); |
| } |
| |
| public <T> T execute(Expression expression, Type type) { |
| return null; // TODO: |
| } |
| |
| public <T> T execute(Expression expression, Class<T> type) { |
| return null; // TODO: |
| } |
| |
| public <T> Enumerator<T> executeQuery(Queryable<T> queryable) { |
| try { |
| CalciteStatement statement = (CalciteStatement) createStatement(); |
| CalcitePrepare.CalciteSignature<T> signature = |
| statement.prepare(queryable); |
| return enumerable(statement.handle, signature).enumerator(); |
| } catch (SQLException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| public <T> Enumerable<T> enumerable(Meta.StatementHandle handle, |
| CalcitePrepare.CalciteSignature<T> signature) throws SQLException { |
| Map<String, Object> map = Maps.newLinkedHashMap(); |
| AvaticaStatement statement = lookupStatement(handle); |
| final List<TypedValue> parameterValues = |
| TROJAN.getParameterValues(statement); |
| |
| if (MetaImpl.checkParameterValueHasNull(parameterValues)) { |
| throw new SQLException("exception while executing query: unbound parameter"); |
| } |
| |
| for (Ord<TypedValue> o : Ord.zip(parameterValues)) { |
| map.put("?" + o.i, o.e.toLocal()); |
| } |
| map.putAll(signature.internalParameters); |
| final AtomicBoolean cancelFlag; |
| try { |
| cancelFlag = getCancelFlag(handle); |
| } catch (NoSuchStatementException e) { |
| throw Throwables.propagate(e); |
| } |
| map.put(DataContext.Variable.CANCEL_FLAG.camelName, cancelFlag); |
| final DataContext dataContext = createDataContext(map); |
| return signature.enumerable(dataContext); |
| } |
| |
| /** Returns the flag that is used to request or check cancel for a particular |
| * statement. */ |
| AtomicBoolean getCancelFlag(Meta.StatementHandle handle) |
| throws NoSuchStatementException { |
| final CalciteServerStatement serverStatement = server.getStatement(handle); |
| return ((CalciteServerStatementImpl) serverStatement).cancelFlag; |
| } |
| |
| public DataContext createDataContext(Map<String, Object> parameterValues) { |
| if (config().spark()) { |
| return new SlimDataContext(); |
| } |
| return new DataContextImpl(this, parameterValues); |
| } |
| |
| // do not make public |
| UnregisteredDriver getDriver() { |
| return driver; |
| } |
| |
| // do not make public |
| AvaticaFactory getFactory() { |
| return factory; |
| } |
| |
| /** Implementation of Queryable. */ |
| static class CalciteQueryable<T> extends BaseQueryable<T> { |
| CalciteQueryable(CalciteConnection connection, Type elementType, |
| Expression expression) { |
| super(connection, elementType, expression); |
| } |
| |
| public CalciteConnection getConnection() { |
| return (CalciteConnection) provider; |
| } |
| } |
| |
| /** Implementation of Server. */ |
| private static class CalciteServerImpl implements CalciteServer { |
| final Map<Integer, CalciteServerStatement> statementMap = Maps.newHashMap(); |
| |
| public void removeStatement(Meta.StatementHandle h) { |
| statementMap.remove(h.id); |
| } |
| |
| public void addStatement(CalciteConnection connection, |
| Meta.StatementHandle h) { |
| final CalciteConnectionImpl c = (CalciteConnectionImpl) connection; |
| final CalciteServerStatement previous = |
| statementMap.put(h.id, new CalciteServerStatementImpl(c)); |
| if (previous != null) { |
| throw new AssertionError(); |
| } |
| } |
| |
| public CalciteServerStatement getStatement(Meta.StatementHandle h) |
| throws NoSuchStatementException { |
| CalciteServerStatement statement = statementMap.get(h.id); |
| if (statement == null) { |
| throw new NoSuchStatementException(h); |
| } |
| return statement; |
| } |
| } |
| |
| /** Schema that has no parents. */ |
| static class RootSchema extends AbstractSchema { |
| RootSchema() { |
| super(); |
| } |
| |
| @Override public Expression getExpression(SchemaPlus parentSchema, |
| String name) { |
| return Expressions.call( |
| DataContext.ROOT, |
| BuiltInMethod.DATA_CONTEXT_GET_ROOT_SCHEMA.method); |
| } |
| } |
| |
| /** Implementation of DataContext. */ |
| static class DataContextImpl implements DataContext { |
| private final ImmutableMap<Object, Object> map; |
| private final CalciteSchema rootSchema; |
| private final QueryProvider queryProvider; |
| private final JavaTypeFactory typeFactory; |
| |
| DataContextImpl(CalciteConnectionImpl connection, |
| Map<String, Object> parameters) { |
| this.queryProvider = connection; |
| this.typeFactory = connection.getTypeFactory(); |
| this.rootSchema = connection.rootSchema; |
| |
| // Store the time at which the query started executing. The SQL |
| // standard says that functions such as CURRENT_TIMESTAMP return the |
| // same value throughout the query. |
| final Holder<Long> timeHolder = Holder.of(System.currentTimeMillis()); |
| |
| // Give a hook chance to alter the clock. |
| Hook.CURRENT_TIME.run(timeHolder); |
| final long time = timeHolder.get(); |
| final TimeZone timeZone = connection.getTimeZone(); |
| final long localOffset = timeZone.getOffset(time); |
| final long currentOffset = localOffset; |
| |
| ImmutableMap.Builder<Object, Object> builder = ImmutableMap.builder(); |
| builder.put(Variable.UTC_TIMESTAMP.camelName, time) |
| .put(Variable.CURRENT_TIMESTAMP.camelName, time + currentOffset) |
| .put(Variable.LOCAL_TIMESTAMP.camelName, time + localOffset) |
| .put(Variable.TIME_ZONE.camelName, timeZone); |
| for (Map.Entry<String, Object> entry : parameters.entrySet()) { |
| Object e = entry.getValue(); |
| if (e == null) { |
| e = AvaticaSite.DUMMY_VALUE; |
| } |
| builder.put(entry.getKey(), e); |
| } |
| map = builder.build(); |
| } |
| |
| public synchronized Object get(String name) { |
| Object o = map.get(name); |
| if (o == AvaticaSite.DUMMY_VALUE) { |
| return null; |
| } |
| if (o == null && Variable.SQL_ADVISOR.camelName.equals(name)) { |
| return getSqlAdvisor(); |
| } |
| return o; |
| } |
| |
| private SqlAdvisor getSqlAdvisor() { |
| final CalciteConnectionImpl con = (CalciteConnectionImpl) queryProvider; |
| final String schemaName = con.getSchema(); |
| final List<String> schemaPath = |
| schemaName == null |
| ? ImmutableList.<String>of() |
| : ImmutableList.of(schemaName); |
| final SqlValidatorWithHints validator = |
| new SqlAdvisorValidator(SqlStdOperatorTable.instance(), |
| new CalciteCatalogReader(rootSchema, con.config().caseSensitive(), |
| schemaPath, typeFactory), |
| typeFactory, SqlConformance.DEFAULT); |
| return new SqlAdvisor(validator); |
| } |
| |
| public SchemaPlus getRootSchema() { |
| return rootSchema.plus(); |
| } |
| |
| public JavaTypeFactory getTypeFactory() { |
| return typeFactory; |
| } |
| |
| public QueryProvider getQueryProvider() { |
| return queryProvider; |
| } |
| } |
| |
| /** Implementation of Context. */ |
| static class ContextImpl implements CalcitePrepare.Context { |
| private final CalciteConnectionImpl connection; |
| |
| ContextImpl(CalciteConnectionImpl connection) { |
| this.connection = Preconditions.checkNotNull(connection); |
| } |
| |
| public JavaTypeFactory getTypeFactory() { |
| return connection.typeFactory; |
| } |
| |
| public CalciteSchema getRootSchema() { |
| return connection.rootSchema; |
| } |
| |
| public List<String> getDefaultSchemaPath() { |
| final String schemaName = connection.getSchema(); |
| return schemaName == null |
| ? ImmutableList.<String>of() |
| : ImmutableList.of(schemaName); |
| } |
| |
| public CalciteConnectionConfig config() { |
| return connection.config(); |
| } |
| |
| public DataContext getDataContext() { |
| return connection.createDataContext(ImmutableMap.<String, Object>of()); |
| } |
| |
| public CalcitePrepare.SparkHandler spark() { |
| final boolean enable = config().spark(); |
| return CalcitePrepare.Dummy.getSparkHandler(enable); |
| } |
| } |
| |
| /** Implementation of {@link DataContext} that has few variables and is |
| * {@link Serializable}. For Spark. */ |
| private static class SlimDataContext implements DataContext, Serializable { |
| public SchemaPlus getRootSchema() { |
| return null; |
| } |
| |
| public JavaTypeFactory getTypeFactory() { |
| return null; |
| } |
| |
| public QueryProvider getQueryProvider() { |
| return null; |
| } |
| |
| public Object get(String name) { |
| return null; |
| } |
| } |
| |
| /** Implementation of {@link CalciteServerStatement}. */ |
| static class CalciteServerStatementImpl |
| implements CalciteServerStatement { |
| private final CalciteConnectionImpl connection; |
| private Iterator<Object> iterator; |
| private Meta.Signature signature; |
| private final AtomicBoolean cancelFlag = new AtomicBoolean(); |
| |
| CalciteServerStatementImpl(CalciteConnectionImpl connection) { |
| this.connection = Preconditions.checkNotNull(connection); |
| } |
| |
| public ContextImpl createPrepareContext() { |
| return new ContextImpl(connection); |
| } |
| |
| public CalciteConnection getConnection() { |
| return connection; |
| } |
| |
| public void setSignature(Meta.Signature signature) { |
| this.signature = signature; |
| } |
| |
| public Meta.Signature getSignature() { |
| return signature; |
| } |
| |
| public Iterator<Object> getResultSet() { |
| return iterator; |
| } |
| |
| public void setResultSet(Iterator<Object> iterator) { |
| this.iterator = iterator; |
| } |
| } |
| |
| } |
| |
| // End CalciteConnectionImpl.java |