/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.jdbc;
import java.util.Properties;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlDialectFactoryImpl;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.logical.StoragePluginConfig.AuthMode;
import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.planner.PlannerPhase;
import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.store.SchemaConfig;
import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
import org.apache.drill.exec.util.ImpersonationUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sql.DataSource;
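/**
 * Storage plugin that exposes external RDBMS tables to Drill over JDBC.
 * Schemas from the external database are surfaced through Calcite's
 * {@link JdbcSchema}, and per-user {@link HikariDataSource} connection pools
 * are created lazily from the credentials resolved for the query user.
 *
 * <p>A storage configuration might look like the following sketch; the field
 * names shown are illustrative and depend on {@link JdbcStorageConfig}:
 * <pre>{@code
 * {
 *   "type": "jdbc",
 *   "driver": "org.postgresql.Driver",
 *   "url": "jdbc:postgresql://localhost:5432/mydb",
 *   "username": "drill",
 *   "password": "secret",
 *   "sourceParameters": { "minimumIdle": 1 }
 * }
 * }</pre>
 */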
public class JdbcStoragePlugin extends AbstractStoragePlugin {
static final Logger logger = LoggerFactory.getLogger(JdbcStoragePlugin.class);
private final JdbcStorageConfig jdbcStorageConfig;
private final JdbcDialectFactory dialectFactory;
private final JdbcConventionFactory conventionFactory;
private volatile SqlDialect sqlDialect;
// DataSources for this storage config keyed on JDBC username
private final Map<String, HikariDataSource> dataSources = new ConcurrentHashMap<>();
public JdbcStoragePlugin(JdbcStorageConfig jdbcStorageConfig, DrillbitContext context, String name) {
super(context, name);
this.jdbcStorageConfig = jdbcStorageConfig;
this.dialectFactory = new JdbcDialectFactory();
this.conventionFactory = new JdbcConventionFactory();
}
@Override
public void registerSchemas(SchemaConfig config, SchemaPlus parent) {
UserCredentials userCreds = config.getQueryUserCredentials();
Optional<DataSource> dataSource = getDataSource(userCreds);
if (!dataSource.isPresent()) {
logger.debug(
"No schemas will be registered in {} for query user {}.",
getName(),
config.getUserName()
);
return;
}
SqlDialect dialect = getDialect(userCreds);
getJdbcDialect(dialect).registerSchemas(config, parent);
}
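/**
 * Returns the {@link DataSource} (HikariCP pool) to use for the given query user,
 * creating and caching it on first use. Pools are keyed on the JDBC username
 * resolved from the storage config; when no credentials are present under the
 * SHARED_USER auth mode, the Drillbit process user name is used as the key.
 *
 * @param userCredentials credentials of the query user
 * @return a data source, or {@link Optional#empty()} if user translation is
 *         enabled and no credentials are provisioned for this user
 */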
public Optional<DataSource> getDataSource(UserCredentials userCredentials) {
Optional<UsernamePasswordCredentials> jdbcCreds = jdbcStorageConfig.getUsernamePasswordCredentials(userCredentials);
if (!jdbcCreds.isPresent() && jdbcStorageConfig.getAuthMode() == AuthMode.USER_TRANSLATION) {
logger.info(
"There are no {} mode credentials in {} for query user {}, will not attempt to connect.",
AuthMode.USER_TRANSLATION,
getName(),
userCredentials.getUserName()
);
return Optional.empty();
}
// Missing creds are valid under SHARED_USER (e.g. unsecured DBs, BigQuery's OAuth),
// in which case we fall back to keying on the Drillbit process user name.
String dsKey = jdbcCreds.isPresent()
? jdbcCreds.get().getUsername()
: ImpersonationUtil.getProcessUserName();
return Optional.of(dataSources.computeIfAbsent(
dsKey,
ds -> initDataSource(this.jdbcStorageConfig, jdbcCreds.orElse(null))
));
}
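/**
 * Returns the Calcite {@link SqlDialect} for the external database, creating it
 * lazily on first call from the data source's metadata and caching it for the
 * lifetime of this plugin instance.
 *
 * @param userCredentials credentials of the query user, used to obtain a data
 *        source if the dialect has not been created yet
 * @return the SQL dialect of the external database
 */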
public SqlDialect getDialect(UserCredentials userCredentials) {
SqlDialect sd = sqlDialect;
if (sd == null) {
// Double-checked locking using a volatile member and a local variable
// to reduce the number of volatile reads.
synchronized (this) {
sd = sqlDialect;
if (sd == null) {
sd = JdbcSchema.createDialect(
SqlDialectFactoryImpl.INSTANCE,
getDataSource(userCredentials).get()
);
sqlDialect = sd;
}
}
}
return sd;
}
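/**
 * Returns the Drill-side {@link JdbcDialect} wrapper corresponding to the given
 * Calcite dialect, as produced by the {@link JdbcDialectFactory}.
 */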
public JdbcDialect getJdbcDialect(SqlDialect dialect) {
return dialectFactory.getJdbcDialect(this, dialect);
}
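/**
 * Returns the {@link DrillJdbcConvention} for the given dialect and query user,
 * from which the planning rules for this plugin are obtained.
 */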
public DrillJdbcConvention getConvention(SqlDialect dialect, UserCredentials userCredentials) {
return conventionFactory.getJdbcConvention(this, dialect, userCredentials);
}
@Override
public JdbcStorageConfig getConfig() {
return jdbcStorageConfig;
}
@Override
public boolean supportsRead() {
return true;
}
@Override
public boolean supportsWrite() {
return jdbcStorageConfig.isWritable();
}
@Override
public boolean supportsInsert() {
return jdbcStorageConfig.isWritable();
}
@Override
public Set<? extends RelOptRule> getOptimizerRules(
OptimizerRulesContext optimizerContext, PlannerPhase phase) {
switch (phase) {
case LOGICAL:
case PHYSICAL: {
UserCredentials userCreds = optimizerContext.getContextInformation().getQueryUserCredentials();
return getDataSource(userCreds)
.map(dataSource -> getConvention(getDialect(userCreds), userCreds).getRules())
.orElse(ImmutableSet.of());
}
case LOGICAL_PRUNE_AND_JOIN:
case LOGICAL_PRUNE:
case PARTITION_PRUNING:
case JOIN_PLANNING:
default:
return ImmutableSet.of();
}
}
@Override
public void close() throws Exception {
AutoCloseables.close(dataSources.values());
}
/**
 * Initializes a {@link HikariDataSource} instance and configures it based on the given
 * storage plugin configuration.
 * Basic parameters such as driver, URL, username and password are set using setters.
 * Other source parameters are set dynamically through the properties. See the list of
 * available Hikari properties at
 * <a href="https://github.com/brettwooldridge/HikariCP">HikariCP</a>.
 *
 * @param config storage plugin config
 * @param jdbcCredentials credentials for connecting to the external database, or null if none
 * @return Hikari data source instance
 * @throws UserException if unable to configure Hikari data source
 */
@VisibleForTesting
static HikariDataSource initDataSource(
JdbcStorageConfig config,
UsernamePasswordCredentials jdbcCredentials
) {
try {
Properties properties = new Properties();
/*
Set default HikariCP values which prefer to connect lazily, to avoid overwhelming source
systems with connections that mostly remain idle. A data source that is present in N
storage configs replicated over P drillbits with a HikariCP minimumIdle value of Q will
have N×P×Q connections made to it eagerly.
The trade-off of lazier connections is increased latency after periods of inactivity in
which the pool has emptied. When comparing the defaults that follow with e.g. the
HikariCP defaults, bear in mind that the context here is OLAP, not OLTP. It is normal
for queries to run for a long time and to be separated by long intermissions. Users who
prefer eager to lazy connections remain free to override the following defaults in their
storage config.
*/
// maximum amount of time that a connection is allowed to sit idle in the pool, 0 ⇒ forever
properties.setProperty("idleTimeout", String.valueOf(TimeUnit.HOURS.toMillis(2)));
// how frequently HikariCP will attempt to keep a connection alive, 0 ⇒ disabled
properties.setProperty("keepaliveTime", String.valueOf(TimeUnit.MINUTES.toMillis(5)));
// maximum lifetime of a connection in the pool, 0 ⇒ forever
properties.setProperty("maxLifetime", String.valueOf(TimeUnit.HOURS.toMillis(12)));
// minimum number of idle connections that HikariCP tries to maintain in the pool, 0 ⇒ none
properties.setProperty("minimumIdle", "0");
// maximum size that the pool is allowed to reach, including both idle and in-use connections
properties.setProperty("maximumPoolSize", "10");
// apply any HikariCP parameters the user may have set, overwriting defaults
properties.putAll(config.getSourceParameters());
HikariConfig hikariConfig = new HikariConfig(properties);
hikariConfig.setDriverClassName(config.getDriver());
hikariConfig.setJdbcUrl(config.getUrl());
if (jdbcCredentials != null) {
hikariConfig.setUsername(jdbcCredentials.getUsername());
hikariConfig.setPassword(jdbcCredentials.getPassword());
}
/*
The following serves as a hint to the driver, which *might* enable database
optimizations. Unfortunately some JDBC drivers without read-only support,
notably Snowflake's, fail to connect outright when this option is set, even
though it is only a hint, so enabling it unconditionally is problematic.
The workaround is to set it only when the writable option has been configured
explicitly, and to leave it untouched when that option is null.
*/
if (config.isWritable() != null) {
hikariConfig.setReadOnly(!config.isWritable());
}
return new HikariDataSource(hikariConfig);
} catch (RuntimeException e) {
throw UserException.connectionError(e)
.message("Unable to configure data source: %s", e.getMessage())
.build(logger);
}
}
}