| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.sql.avatica; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Joiner; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.ImmutableSet; |
| import com.google.common.collect.Iterables; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| import com.google.inject.Inject; |
| import com.google.inject.Injector; |
| import org.apache.calcite.avatica.MetaImpl; |
| import org.apache.calcite.avatica.MissingResultsException; |
| import org.apache.calcite.avatica.NoSuchConnectionException; |
| import org.apache.calcite.avatica.NoSuchStatementException; |
| import org.apache.calcite.avatica.QueryState; |
| import org.apache.calcite.avatica.remote.TypedValue; |
| import org.apache.druid.java.util.common.DateTimes; |
| import org.apache.druid.java.util.common.ISE; |
| import org.apache.druid.java.util.common.StringUtils; |
| import org.apache.druid.java.util.common.logger.Logger; |
| import org.apache.druid.server.security.AuthenticationResult; |
| import org.apache.druid.server.security.Authenticator; |
| import org.apache.druid.server.security.AuthenticatorMapper; |
| import org.apache.druid.server.security.ForbiddenException; |
| import org.apache.druid.sql.SqlLifecycleFactory; |
| import org.apache.druid.sql.calcite.planner.Calcites; |
| import org.apache.druid.sql.calcite.planner.PlannerContext; |
| import org.joda.time.Interval; |
| |
| import javax.annotation.Nonnull; |
| import javax.annotation.Nullable; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| public class DruidMeta extends MetaImpl |
| { |
| public static <T extends Throwable> T logFailure(T error, String message, Object... format) |
| { |
| LOG.error(message, format); |
| return error; |
| } |
| |
| public static <T extends Throwable> T logFailure(T error) |
| { |
| LOG.error(error, error.getMessage()); |
| return error; |
| } |
| |
| private static final Logger LOG = new Logger(DruidMeta.class); |
| |
| private final SqlLifecycleFactory sqlLifecycleFactory; |
| private final ScheduledExecutorService exec; |
| private final AvaticaServerConfig config; |
| private final List<Authenticator> authenticators; |
| |
| /** Used to track logical connections. */ |
| private final ConcurrentMap<String, DruidConnection> connections = new ConcurrentHashMap<>(); |
| |
| /** |
| * Number of connections reserved in "connections". May be higher than the actual number of connections at times, |
| * such as when we're reserving space to open a new one. |
| */ |
| private final AtomicInteger connectionCount = new AtomicInteger(); |
| |
| @Inject |
| public DruidMeta( |
| final SqlLifecycleFactory sqlLifecycleFactory, |
| final AvaticaServerConfig config, |
| final Injector injector |
| ) |
| { |
| super(null); |
| this.sqlLifecycleFactory = Preconditions.checkNotNull(sqlLifecycleFactory, "sqlLifecycleFactory"); |
| this.config = config; |
| this.exec = Executors.newSingleThreadScheduledExecutor( |
| new ThreadFactoryBuilder() |
| .setNameFormat(StringUtils.format("DruidMeta@%s-ScheduledExecutor", Integer.toHexString(hashCode()))) |
| .setDaemon(true) |
| .build() |
| ); |
| |
| final AuthenticatorMapper authenticatorMapper = injector.getInstance(AuthenticatorMapper.class); |
| this.authenticators = authenticatorMapper.getAuthenticatorChain(); |
| } |
| |
| @Override |
| public void openConnection(final ConnectionHandle ch, final Map<String, String> info) |
| { |
| // Build connection context. |
| final ImmutableMap.Builder<String, Object> context = ImmutableMap.builder(); |
| if (info != null) { |
| for (Map.Entry<String, String> entry : info.entrySet()) { |
| context.put(entry); |
| } |
| } |
| // we don't want to stringify arrays for JDBC ever because avatica needs to handle this |
| context.put(PlannerContext.CTX_SQL_STRINGIFY_ARRAYS, false); |
| openDruidConnection(ch.id, context.build()); |
| } |
| |
| @Override |
| public void closeConnection(final ConnectionHandle ch) |
| { |
| final DruidConnection druidConnection = connections.remove(ch.id); |
| if (druidConnection != null) { |
| connectionCount.decrementAndGet(); |
| druidConnection.close(); |
| } |
| } |
| |
| @Override |
| public ConnectionProperties connectionSync(final ConnectionHandle ch, final ConnectionProperties connProps) |
| { |
| // getDruidConnection re-syncs it. |
| getDruidConnection(ch.id); |
| return connProps; |
| } |
| |
| @Override |
| public StatementHandle createStatement(final ConnectionHandle ch) |
| { |
| final DruidStatement druidStatement = getDruidConnection(ch.id).createStatement(sqlLifecycleFactory); |
| return new StatementHandle(ch.id, druidStatement.getStatementId(), null); |
| } |
| |
| @Override |
| public StatementHandle prepare( |
| final ConnectionHandle ch, |
| final String sql, |
| final long maxRowCount |
| ) |
| { |
| final StatementHandle statement = createStatement(ch); |
| final DruidStatement druidStatement; |
| try { |
| druidStatement = getDruidStatement(statement); |
| } |
| catch (NoSuchStatementException e) { |
| throw logFailure(new IllegalStateException(e)); |
| } |
| final DruidConnection druidConnection = getDruidConnection(statement.connectionId); |
| AuthenticationResult authenticationResult = authenticateConnection(druidConnection); |
| if (authenticationResult == null) { |
| throw logFailure( |
| new ForbiddenException("Authentication failed."), |
| "Authentication failed for statement[%s]", |
| druidStatement.getStatementId() |
| ); |
| } |
| statement.signature = druidStatement.prepare(sql, maxRowCount, authenticationResult).getSignature(); |
| LOG.debug("Successfully prepared statement[%s] for execution", druidStatement.getStatementId()); |
| return statement; |
| } |
| |
| @Deprecated |
| @Override |
| public ExecuteResult prepareAndExecute( |
| final StatementHandle h, |
| final String sql, |
| final long maxRowCount, |
| final PrepareCallback callback |
| ) |
| { |
| // Avatica doesn't call this. |
| throw new UnsupportedOperationException("Deprecated"); |
| } |
| |
| @Override |
| public ExecuteResult prepareAndExecute( |
| final StatementHandle statement, |
| final String sql, |
| final long maxRowCount, |
| final int maxRowsInFirstFrame, |
| final PrepareCallback callback |
| ) throws NoSuchStatementException |
| { |
| // Ignore "callback", this class is designed for use with LocalService which doesn't use it. |
| final DruidStatement druidStatement = getDruidStatement(statement); |
| final DruidConnection druidConnection = getDruidConnection(statement.connectionId); |
| AuthenticationResult authenticationResult = authenticateConnection(druidConnection); |
| if (authenticationResult == null) { |
| throw logFailure( |
| new ForbiddenException("Authentication failed."), |
| "Authentication failed for statement[%s]", |
| druidStatement.getStatementId() |
| ); |
| } |
| druidStatement.prepare(sql, maxRowCount, authenticationResult); |
| final Frame firstFrame = druidStatement.execute(Collections.emptyList()) |
| .nextFrame( |
| DruidStatement.START_OFFSET, |
| getEffectiveMaxRowsPerFrame(maxRowsInFirstFrame) |
| ); |
| final Signature signature = druidStatement.getSignature(); |
| LOG.debug("Successfully prepared statement[%s] and started execution", druidStatement.getStatementId()); |
| return new ExecuteResult( |
| ImmutableList.of( |
| MetaResultSet.create( |
| statement.connectionId, |
| statement.id, |
| false, |
| signature, |
| firstFrame |
| ) |
| ) |
| ); |
| } |
| |
| @Override |
| public ExecuteBatchResult prepareAndExecuteBatch( |
| final StatementHandle statement, |
| final List<String> sqlCommands |
| ) |
| { |
| // Batch statements are used for bulk updates, but we don't support updates. |
| throw new UnsupportedOperationException("Batch statements not supported"); |
| } |
| |
| @Override |
| public ExecuteBatchResult executeBatch( |
| final StatementHandle statement, |
| final List<List<TypedValue>> parameterValues |
| ) |
| { |
| // Batch statements are used for bulk updates, but we don't support updates. |
| throw new UnsupportedOperationException("Batch statements not supported"); |
| } |
| |
| @Override |
| public Frame fetch( |
| final StatementHandle statement, |
| final long offset, |
| final int fetchMaxRowCount |
| ) throws NoSuchStatementException, MissingResultsException |
| { |
| final int maxRows = getEffectiveMaxRowsPerFrame(fetchMaxRowCount); |
| LOG.debug("Fetching next frame from offset[%s] with [%s] rows for statement[%s]", offset, maxRows, statement.id); |
| return getDruidStatement(statement).nextFrame(offset, maxRows); |
| } |
| |
| @Deprecated |
| @Override |
| public ExecuteResult execute( |
| final StatementHandle statement, |
| final List<TypedValue> parameterValues, |
| final long maxRowCount |
| ) |
| { |
| // Avatica doesn't call this. |
| throw new UnsupportedOperationException("Deprecated"); |
| } |
| |
| @Override |
| public ExecuteResult execute( |
| final StatementHandle statement, |
| final List<TypedValue> parameterValues, |
| final int maxRowsInFirstFrame |
| ) throws NoSuchStatementException |
| { |
| final DruidStatement druidStatement = getDruidStatement(statement); |
| final Frame firstFrame = druidStatement.execute(parameterValues) |
| .nextFrame( |
| DruidStatement.START_OFFSET, |
| getEffectiveMaxRowsPerFrame(maxRowsInFirstFrame) |
| ); |
| |
| final Signature signature = druidStatement.getSignature(); |
| LOG.debug("Successfully started execution of statement[%s]", druidStatement.getStatementId()); |
| return new ExecuteResult( |
| ImmutableList.of( |
| MetaResultSet.create( |
| statement.connectionId, |
| statement.id, |
| false, |
| signature, |
| firstFrame |
| ) |
| ) |
| ); |
| } |
| |
| @Override |
| public Iterable<Object> createIterable( |
| final StatementHandle statement, |
| final QueryState state, |
| final Signature signature, |
| final List<TypedValue> parameterValues, |
| final Frame firstFrame |
| ) |
| { |
| // Avatica calls this but ignores the return value. |
| return null; |
| } |
| |
| @Override |
| public void closeStatement(final StatementHandle h) |
| { |
| // connections.get, not getDruidConnection, since we want to silently ignore nonexistent statements |
| final DruidConnection druidConnection = connections.get(h.connectionId); |
| if (druidConnection != null) { |
| final DruidStatement druidStatement = druidConnection.getStatement(h.id); |
| if (druidStatement != null) { |
| druidStatement.close(); |
| } |
| } |
| } |
| |
| @Override |
| public boolean syncResults( |
| final StatementHandle sh, |
| final QueryState state, |
| final long offset |
| ) throws NoSuchStatementException |
| { |
| final DruidStatement druidStatement = getDruidStatement(sh); |
| final boolean isDone = druidStatement.isDone(); |
| final long currentOffset = druidStatement.getCurrentOffset(); |
| if (currentOffset != offset) { |
| throw logFailure(new ISE("Requested offset[%,d] does not match currentOffset[%,d]", offset, currentOffset)); |
| } |
| return !isDone; |
| } |
| |
| @Override |
| public void commit(final ConnectionHandle ch) |
| { |
| // We don't support writes, just ignore commits. |
| } |
| |
| @Override |
| public void rollback(final ConnectionHandle ch) |
| { |
| // We don't support writes, just ignore rollbacks. |
| } |
| |
| @Override |
| public Map<DatabaseProperty, Object> getDatabaseProperties(final ConnectionHandle ch) |
| { |
| return ImmutableMap.of(); |
| } |
| |
| @Override |
| public MetaResultSet getCatalogs(final ConnectionHandle ch) |
| { |
| final String sql = "SELECT\n" |
| + " DISTINCT CATALOG_NAME AS TABLE_CAT\n" |
| + "FROM\n" |
| + " INFORMATION_SCHEMA.SCHEMATA\n" |
| + "ORDER BY\n" |
| + " TABLE_CAT\n"; |
| |
| return sqlResultSet(ch, sql); |
| } |
| |
| @Override |
| public MetaResultSet getSchemas( |
| final ConnectionHandle ch, |
| final String catalog, |
| final Pat schemaPattern |
| ) |
| { |
| final List<String> whereBuilder = new ArrayList<>(); |
| if (catalog != null) { |
| whereBuilder.add("SCHEMATA.CATALOG_NAME = " + Calcites.escapeStringLiteral(catalog)); |
| } |
| |
| if (schemaPattern.s != null) { |
| whereBuilder.add("SCHEMATA.SCHEMA_NAME LIKE " + withEscapeClause(schemaPattern.s)); |
| } |
| |
| final String where = whereBuilder.isEmpty() ? "" : "WHERE " + Joiner.on(" AND ").join(whereBuilder); |
| final String sql = "SELECT\n" |
| + " SCHEMA_NAME AS TABLE_SCHEM,\n" |
| + " CATALOG_NAME AS TABLE_CATALOG\n" |
| + "FROM\n" |
| + " INFORMATION_SCHEMA.SCHEMATA\n" |
| + where + "\n" |
| + "ORDER BY\n" |
| + " TABLE_CATALOG, TABLE_SCHEM\n"; |
| |
| return sqlResultSet(ch, sql); |
| } |
| |
| @Override |
| public MetaResultSet getTables( |
| final ConnectionHandle ch, |
| final String catalog, |
| final Pat schemaPattern, |
| final Pat tableNamePattern, |
| final List<String> typeList |
| ) |
| { |
| final List<String> whereBuilder = new ArrayList<>(); |
| if (catalog != null) { |
| whereBuilder.add("TABLES.TABLE_CATALOG = " + Calcites.escapeStringLiteral(catalog)); |
| } |
| |
| if (schemaPattern.s != null) { |
| whereBuilder.add("TABLES.TABLE_SCHEMA LIKE " + withEscapeClause(schemaPattern.s)); |
| } |
| |
| if (tableNamePattern.s != null) { |
| whereBuilder.add("TABLES.TABLE_NAME LIKE " + withEscapeClause(tableNamePattern.s)); |
| } |
| |
| if (typeList != null) { |
| final List<String> escapedTypes = new ArrayList<>(); |
| for (String type : typeList) { |
| escapedTypes.add(Calcites.escapeStringLiteral(type)); |
| } |
| whereBuilder.add("TABLES.TABLE_TYPE IN (" + Joiner.on(", ").join(escapedTypes) + ")"); |
| } |
| |
| final String where = whereBuilder.isEmpty() ? "" : "WHERE " + Joiner.on(" AND ").join(whereBuilder); |
| final String sql = "SELECT\n" |
| + " TABLE_CATALOG AS TABLE_CAT,\n" |
| + " TABLE_SCHEMA AS TABLE_SCHEM,\n" |
| + " TABLE_NAME AS TABLE_NAME,\n" |
| + " TABLE_TYPE AS TABLE_TYPE,\n" |
| + " CAST(NULL AS VARCHAR) AS REMARKS,\n" |
| + " CAST(NULL AS VARCHAR) AS TYPE_CAT,\n" |
| + " CAST(NULL AS VARCHAR) AS TYPE_SCHEM,\n" |
| + " CAST(NULL AS VARCHAR) AS TYPE_NAME,\n" |
| + " CAST(NULL AS VARCHAR) AS SELF_REFERENCING_COL_NAME,\n" |
| + " CAST(NULL AS VARCHAR) AS REF_GENERATION\n" |
| + "FROM\n" |
| + " INFORMATION_SCHEMA.TABLES\n" |
| + where + "\n" |
| + "ORDER BY\n" |
| + " TABLE_TYPE, TABLE_CAT, TABLE_SCHEM, TABLE_NAME\n"; |
| |
| return sqlResultSet(ch, sql); |
| } |
| |
| @Override |
| public MetaResultSet getColumns( |
| final ConnectionHandle ch, |
| final String catalog, |
| final Pat schemaPattern, |
| final Pat tableNamePattern, |
| final Pat columnNamePattern |
| ) |
| { |
| final List<String> whereBuilder = new ArrayList<>(); |
| if (catalog != null) { |
| whereBuilder.add("COLUMNS.TABLE_CATALOG = " + Calcites.escapeStringLiteral(catalog)); |
| } |
| |
| if (schemaPattern.s != null) { |
| whereBuilder.add("COLUMNS.TABLE_SCHEMA LIKE " + withEscapeClause(schemaPattern.s)); |
| } |
| |
| if (tableNamePattern.s != null) { |
| whereBuilder.add("COLUMNS.TABLE_NAME LIKE " + withEscapeClause(tableNamePattern.s)); |
| } |
| |
| if (columnNamePattern.s != null) { |
| whereBuilder.add("COLUMNS.COLUMN_NAME LIKE " |
| + withEscapeClause(columnNamePattern.s)); |
| } |
| |
| final String where = whereBuilder.isEmpty() ? "" : "WHERE " + Joiner.on(" AND ").join(whereBuilder); |
| final String sql = "SELECT\n" |
| + " TABLE_CATALOG AS TABLE_CAT,\n" |
| + " TABLE_SCHEMA AS TABLE_SCHEM,\n" |
| + " TABLE_NAME AS TABLE_NAME,\n" |
| + " COLUMN_NAME AS COLUMN_NAME,\n" |
| + " CAST(JDBC_TYPE AS INTEGER) AS DATA_TYPE,\n" |
| + " DATA_TYPE AS TYPE_NAME,\n" |
| + " -1 AS COLUMN_SIZE,\n" |
| + " -1 AS BUFFER_LENGTH,\n" |
| + " -1 AS DECIMAL_DIGITS,\n" |
| + " -1 AS NUM_PREC_RADIX,\n" |
| + " CASE IS_NULLABLE WHEN 'YES' THEN 1 ELSE 0 END AS NULLABLE,\n" |
| + " CAST(NULL AS VARCHAR) AS REMARKS,\n" |
| + " COLUMN_DEFAULT AS COLUMN_DEF,\n" |
| + " -1 AS SQL_DATA_TYPE,\n" |
| + " -1 AS SQL_DATETIME_SUB,\n" |
| + " -1 AS CHAR_OCTET_LENGTH,\n" |
| + " CAST(ORDINAL_POSITION AS INTEGER) AS ORDINAL_POSITION,\n" |
| + " IS_NULLABLE AS IS_NULLABLE,\n" |
| + " CAST(NULL AS VARCHAR) AS SCOPE_CATALOG,\n" |
| + " CAST(NULL AS VARCHAR) AS SCOPE_SCHEMA,\n" |
| + " CAST(NULL AS VARCHAR) AS SCOPE_TABLE,\n" |
| + " -1 AS SOURCE_DATA_TYPE,\n" |
| + " 'NO' AS IS_AUTOINCREMENT,\n" |
| + " 'NO' AS IS_GENERATEDCOLUMN\n" |
| + "FROM\n" |
| + " INFORMATION_SCHEMA.COLUMNS\n" |
| + where + "\n" |
| + "ORDER BY\n" |
| + " TABLE_CAT, TABLE_SCHEM, TABLE_NAME, ORDINAL_POSITION\n"; |
| |
| return sqlResultSet(ch, sql); |
| } |
| |
| @Override |
| public MetaResultSet getTableTypes(final ConnectionHandle ch) |
| { |
| final String sql = "SELECT\n" |
| + " DISTINCT TABLE_TYPE AS TABLE_TYPE\n" |
| + "FROM\n" |
| + " INFORMATION_SCHEMA.TABLES\n" |
| + "ORDER BY\n" |
| + " TABLE_TYPE\n"; |
| |
| return sqlResultSet(ch, sql); |
| } |
| |
| @VisibleForTesting |
| void closeAllConnections() |
| { |
| for (String connectionId : ImmutableSet.copyOf(connections.keySet())) { |
| closeConnection(new ConnectionHandle(connectionId)); |
| } |
| } |
| |
| @Nullable |
| private AuthenticationResult authenticateConnection(final DruidConnection connection) |
| { |
| Map<String, Object> context = connection.context(); |
| for (Authenticator authenticator : authenticators) { |
| LOG.debug("Attempting authentication with authenticator[%s]", authenticator.getClass()); |
| AuthenticationResult authenticationResult = authenticator.authenticateJDBCContext(context); |
| if (authenticationResult != null) { |
| LOG.debug( |
| "Authenticated identity[%s] for connection[%s]", |
| authenticationResult.getIdentity(), |
| connection.getConnectionId() |
| ); |
| return authenticationResult; |
| } |
| } |
| LOG.debug("No successful authentication"); |
| return null; |
| } |
| |
| private DruidConnection openDruidConnection(final String connectionId, final Map<String, Object> context) |
| { |
| if (connectionCount.incrementAndGet() > config.getMaxConnections()) { |
| // O(connections) but we don't expect this to happen often (it's a last-ditch effort to clear out |
| // abandoned connections) or to have too many connections. |
| final Iterator<Map.Entry<String, DruidConnection>> entryIterator = connections.entrySet().iterator(); |
| while (entryIterator.hasNext()) { |
| final Map.Entry<String, DruidConnection> entry = entryIterator.next(); |
| if (entry.getValue().closeIfEmpty()) { |
| entryIterator.remove(); |
| |
| // Removed a connection, decrement the counter. |
| connectionCount.decrementAndGet(); |
| break; |
| } |
| } |
| |
| if (connectionCount.get() > config.getMaxConnections()) { |
| // We aren't going to make a connection after all. |
| connectionCount.decrementAndGet(); |
| throw logFailure(new ISE("Too many connections, limit is[%,d]", config.getMaxConnections())); |
| } |
| } |
| |
| final DruidConnection putResult = connections.putIfAbsent( |
| connectionId, |
| new DruidConnection(connectionId, config.getMaxStatementsPerConnection(), context) |
| ); |
| |
| if (putResult != null) { |
| // Didn't actually insert the connection. |
| connectionCount.decrementAndGet(); |
| throw logFailure(new ISE("Connection[%s] already open.", connectionId)); |
| } |
| |
| LOG.debug("Connection[%s] opened.", connectionId); |
| |
| // Call getDruidConnection to start the timeout timer. |
| return getDruidConnection(connectionId); |
| } |
| |
| /** |
| * Get a connection, or throw an exception if it doesn't exist. Also refreshes the timeout timer. |
| * |
| * @param connectionId connection id |
| * |
| * @return the connection |
| * |
| * @throws NoSuchConnectionException if the connection id doesn't exist |
| */ |
| @Nonnull |
| private DruidConnection getDruidConnection(final String connectionId) |
| { |
| final DruidConnection connection = connections.get(connectionId); |
| |
| if (connection == null) { |
| throw logFailure(new NoSuchConnectionException(connectionId)); |
| } |
| |
| return connection.sync( |
| exec.schedule( |
| () -> { |
| LOG.debug("Connection[%s] timed out.", connectionId); |
| closeConnection(new ConnectionHandle(connectionId)); |
| }, |
| new Interval(DateTimes.nowUtc(), config.getConnectionIdleTimeout()).toDurationMillis(), |
| TimeUnit.MILLISECONDS |
| ) |
| ); |
| } |
| |
| @Nonnull |
| private DruidStatement getDruidStatement(final StatementHandle statement) throws NoSuchStatementException |
| { |
| final DruidConnection connection = getDruidConnection(statement.connectionId); |
| final DruidStatement druidStatement = connection.getStatement(statement.id); |
| if (druidStatement == null) { |
| throw logFailure(new NoSuchStatementException(statement)); |
| } |
| return druidStatement; |
| } |
| |
| private MetaResultSet sqlResultSet(final ConnectionHandle ch, final String sql) |
| { |
| final StatementHandle statement = createStatement(ch); |
| try { |
| final ExecuteResult result = prepareAndExecute(statement, sql, -1, -1, null); |
| final MetaResultSet metaResultSet = Iterables.getOnlyElement(result.resultSets); |
| if (!metaResultSet.firstFrame.done) { |
| throw logFailure(new ISE("Expected all results to be in a single frame!")); |
| } |
| return metaResultSet; |
| } |
| catch (Exception e) { |
| throw logFailure(new RuntimeException(e)); |
| } |
| finally { |
| closeStatement(statement); |
| } |
| } |
| |
| /** |
| * Determine JDBC 'frame' size, that is the number of results which will be returned to a single |
| * {@link java.sql.ResultSet}. This value corresponds to {@link java.sql.Statement#setFetchSize(int)} (which is a user |
| * hint, we don't have to honor it), and this method modifies it, ensuring the actual chosen value falls within |
| * {@link AvaticaServerConfig#minRowsPerFrame} and {@link AvaticaServerConfig#maxRowsPerFrame}. |
| * |
| * A value of -1 supplied as input indicates that the client has no preference for fetch size, and can handle |
| * unlimited results (at our discretion). Similarly, a value of -1 for {@link AvaticaServerConfig#maxRowsPerFrame} |
| * also indicates that there is no upper limit on fetch size on the server side. |
| * |
| * {@link AvaticaServerConfig#minRowsPerFrame} must be configured to a value greater than 0, because it will be |
| * checked against if any additional frames are required (which means one of the input or maximum was set to a value |
| * other than -1). |
| */ |
| private int getEffectiveMaxRowsPerFrame(int clientMaxRowsPerFrame) |
| { |
| // no configured row limit, use the client provided limit |
| if (config.getMaxRowsPerFrame() < 0) { |
| return adjustForMinumumRowsPerFrame(clientMaxRowsPerFrame); |
| } |
| // client provided no row limit, use the configured row limit |
| if (clientMaxRowsPerFrame < 0) { |
| return adjustForMinumumRowsPerFrame(config.getMaxRowsPerFrame()); |
| } |
| return adjustForMinumumRowsPerFrame(Math.min(clientMaxRowsPerFrame, config.getMaxRowsPerFrame())); |
| } |
| |
| /** |
| * coerce fetch size to be, at minimum, {@link AvaticaServerConfig#minRowsPerFrame} |
| */ |
| private int adjustForMinumumRowsPerFrame(int rowsPerFrame) |
| { |
| final int adjustedRowsPerFrame = Math.max(config.getMinRowsPerFrame(), rowsPerFrame); |
| return adjustedRowsPerFrame; |
| } |
| |
| private static String withEscapeClause(String toEscape) |
| { |
| return Calcites.escapeStringLiteral(toEscape) + " ESCAPE '\\'"; |
| } |
| } |