| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.zeppelin.jdbc; |
| |
| import static org.apache.commons.lang3.StringUtils.containsIgnoreCase; |
| import static org.apache.commons.lang3.StringUtils.isEmpty; |
| import static org.apache.commons.lang3.StringUtils.isNotEmpty; |
| import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS; |
| |
| import org.apache.commons.dbcp2.ConnectionFactory; |
| import org.apache.commons.dbcp2.DriverManagerConnectionFactory; |
| import org.apache.commons.dbcp2.PoolableConnectionFactory; |
| import org.apache.commons.dbcp2.PoolingDriver; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.commons.lang3.exception.ExceptionUtils; |
| import org.apache.commons.pool2.ObjectPool; |
| import org.apache.commons.pool2.impl.GenericObjectPool; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.alias.CredentialProvider; |
| import org.apache.hadoop.security.alias.CredentialProviderFactory; |
| import org.apache.zeppelin.interpreter.SingleRowInterpreterResult; |
| import org.apache.zeppelin.interpreter.ZeppelinContext; |
| import org.apache.zeppelin.interpreter.util.SqlSplitter; |
| import org.apache.zeppelin.jdbc.hive.HiveUtils; |
| import org.apache.zeppelin.tabledata.TableDataUtils; |
| import org.apache.zeppelin.util.PropertiesUtil; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.security.PrivilegedExceptionAction; |
| import java.sql.Connection; |
| import java.sql.DriverManager; |
| import java.sql.ResultSet; |
| import java.sql.ResultSetMetaData; |
| import java.sql.SQLException; |
| import java.sql.Statement; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import org.apache.zeppelin.interpreter.InterpreterContext; |
| import org.apache.zeppelin.interpreter.InterpreterException; |
| import org.apache.zeppelin.interpreter.InterpreterResult; |
| import org.apache.zeppelin.interpreter.InterpreterResult.Code; |
| import org.apache.zeppelin.interpreter.KerberosInterpreter; |
| import org.apache.zeppelin.interpreter.ResultMessages; |
| import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; |
| import org.apache.zeppelin.jdbc.security.JDBCSecurityImpl; |
| import org.apache.zeppelin.scheduler.Scheduler; |
| import org.apache.zeppelin.scheduler.SchedulerFactory; |
| import org.apache.zeppelin.user.UserCredentials; |
| import org.apache.zeppelin.user.UsernamePassword; |
| |
/**
 * JDBC interpreter for Zeppelin. This interpreter can also be used for accessing HAWQ,
 * GreenplumDB, MariaDB, MySQL, Postgres and Redshift.
 *
 * <ul>
 * <li>{@code default.url} - JDBC URL to connect to.</li>
 * <li>{@code default.user} - JDBC user name.</li>
 * <li>{@code default.password} - JDBC password.</li>
 * <li>{@code default.driver} - JDBC driver class name.</li>
 * <li>{@code common.max_count} - Max number of SQL result rows to display.</li>
 * </ul>
 *
 * <p>
 * How to use: <br/>
 * {@code %jdbc.sql} <br/>
 * {@code
 * SELECT store_id, count(*)
 * FROM retail_demo.order_lineitems_pxf
 * GROUP BY store_id;
 * }
 * </p>
 */
| public class JDBCInterpreter extends KerberosInterpreter { |
private static final Logger LOGGER = LoggerFactory.getLogger(JDBCInterpreter.class);

// Settings are addressed as "<prefix>.<name>"; "common" is the prefix that open() never
// discards for lacking a driver/url.
static final String COMMON_KEY = "common";
static final String MAX_LINE_KEY = "max_count";
// Initial value of maxLineResults (assigned in the constructor).
static final int MAX_LINE_DEFAULT = 1000;

// Prefix and property names of the default connection.
static final String DEFAULT_KEY = "default";
static final String DRIVER_KEY = "driver";
static final String URL_KEY = "url";
static final String USER_KEY = "user";
static final String PASSWORD_KEY = "password";
static final String PRECODE_KEY = "precode";
static final String STATEMENT_PRECODE_KEY = "statementPrecode";
// Per-prefix completer settings, read in createOrUpdateSqlCompleter().
static final String COMPLETER_SCHEMA_FILTERS_KEY = "completer.schemaFilters";
static final String COMPLETER_TTL_KEY = "completer.ttlInSeconds";
static final String DEFAULT_COMPLETER_TTL = "120";
// JCEKS credential-store settings, read in getPassword() when no plain password is set.
static final String JDBC_JCEKS_FILE = "jceks.file";
static final String JDBC_JCEKS_CREDENTIAL_KEY = "jceks.credentialKey";
// String.format templates taking the prefix as their single argument.
static final String PRECODE_KEY_TEMPLATE = "%s.precode";
static final String STATEMENT_PRECODE_KEY_TEMPLATE = "%s.statementPrecode";
static final String DOT = ".";

// Separators and tags used when rendering results (see getResults()).
private static final char WHITESPACE = ' ';
private static final char NEWLINE = '\n';
private static final char TAB = '\t';
private static final String TABLE_MAGIC_TAG = "%table ";
private static final String EXPLAIN_PREDICATE = "EXPLAIN ";
private static final String CANCEL_REASON = "cancel_reason";

// Fully qualified names composed from the pieces above, e.g. "common.max_count".
static final String COMMON_MAX_LINE = COMMON_KEY + DOT + MAX_LINE_KEY;

static final String DEFAULT_DRIVER = DEFAULT_KEY + DOT + DRIVER_KEY;
static final String DEFAULT_URL = DEFAULT_KEY + DOT + URL_KEY;
static final String DEFAULT_USER = DEFAULT_KEY + DOT + USER_KEY;
static final String DEFAULT_PASSWORD = DEFAULT_KEY + DOT + PASSWORD_KEY;
static final String DEFAULT_PRECODE = DEFAULT_KEY + DOT + PRECODE_KEY;
static final String DEFAULT_STATEMENT_PRECODE = DEFAULT_KEY + DOT + STATEMENT_PRECODE_KEY;

static final String EMPTY_COLUMN_VALUE = "";

// NOTE(review): the two concurrency keys are not referenced in the part of the file reviewed
// here; presumably they are consumed further down - confirm.
private static final String CONCURRENT_EXECUTION_KEY = "zeppelin.jdbc.concurrent.use";
private static final String CONCURRENT_EXECUTION_COUNT =
    "zeppelin.jdbc.concurrent.max_connection";
// URL prefix of the pooled driver; getJDBCDriverName() appends DEFAULT_KEY and the user.
private static final String DBCP_STRING = "jdbc:apache:commons:dbcp:";
// Read by setMaxRows(); falls back to "1000" there.
private static final String MAX_ROWS_KEY = "zeppelin.jdbc.maxRows";

// Whitelist applied in createConnectionPool(): for the Presto/Trino driver classes every
// connection property whose name is not listed here is removed.
private static final Set<String> PRESTO_PROPERTIES = new HashSet<>(Arrays.asList(
    "user", "password",
    "socksProxy", "httpProxy", "clientTags", "applicationNamePrefix", "accessToken",
    "SSL", "SSLKeyStorePath", "SSLKeyStorePassword", "SSLTrustStorePath",
    "SSLTrustStorePassword", "KerberosRemoteServiceName", "KerberosPrincipal",
    "KerberosUseCanonicalHostname", "KerberosServicePrincipalPattern",
    "KerberosConfigPath", "KerberosKeytabPath", "KerberosCredentialCachePath",
    "extraCredentials", "roles", "sessionProperties"));

// The four names below are matched case-insensitively against the connection URL by
// validateConnectionUrl(); a URL containing any of them is rejected.
private static final String ALLOW_LOAD_LOCAL_IN_FILE_NAME = "allowLoadLocalInfile";

private static final String AUTO_DESERIALIZE = "autoDeserialize";

private static final String ALLOW_LOCAL_IN_FILE_NAME = "allowLocalInfile";

private static final String ALLOW_URL_IN_LOCAL_IN_FILE_NAME = "allowUrlInLocalInfile";
| |
// database --> Properties
// Filled in open(): property prefix (text before the first dot) -> properties of that prefix.
private final HashMap<String, Properties> basePropertiesMap;
// username --> User Configuration
// Entries are created on demand by getJDBCConfiguration().
private final HashMap<String, JDBCUserConfigurations> jdbcUserConfigurationsMap;
private final HashMap<String, SqlCompleter> sqlCompletersMap;

// Starts at MAX_LINE_DEFAULT; setMaxLineResults() overrides it from "common.max_count".
private int maxLineResults;
// Set by setMaxRows() from "zeppelin.jdbc.maxRows".
private int maxRows;
// Created in open().
private SqlSplitter sqlSplitter;

// NOTE(review): refreshExecutorServices and paragraphCancelMap are not used in the part of the
// file reviewed here; isFirstRefreshMap is looked up by paragraph id - confirm the key of the
// other two.
private Map<String, ScheduledExecutorService> refreshExecutorServices = new HashMap<>();
private Map<String, Boolean> isFirstRefreshMap = new HashMap<>();
private Map<String, Boolean> paragraphCancelMap = new HashMap<>();
| |
/**
 * Creates the interpreter with empty per-prefix, per-user and completer maps and the default
 * display limit.
 *
 * @param property interpreter settings, handed to the parent constructor
 */
public JDBCInterpreter(Properties property) {
  super(property);
  // open() may later replace this with the configured "common.max_count".
  this.maxLineResults = MAX_LINE_DEFAULT;
  this.basePropertiesMap = new HashMap<>();
  this.jdbcUserConfigurationsMap = new HashMap<>();
  this.sqlCompletersMap = new HashMap<>();
}
| |
/**
 * This interpreter does not provide a {@link ZeppelinContext}.
 *
 * @return always {@code null}
 */
@Override
public ZeppelinContext getZeppelinContext() {
  return null;
}
| |
| @Override |
| protected boolean runKerberosLogin() { |
| // Initialize UGI before using |
| Configuration conf = new org.apache.hadoop.conf.Configuration(); |
| conf.set("hadoop.security.authentication", KERBEROS.toString()); |
| UserGroupInformation.setConfiguration(conf); |
| try { |
| if (UserGroupInformation.isLoginKeytabBased()) { |
| LOGGER.debug("Trying relogin from keytab"); |
| UserGroupInformation.getLoginUser().reloginFromKeytab(); |
| return true; |
| } else if (UserGroupInformation.isLoginTicketBased()) { |
| LOGGER.debug("Trying relogin from ticket cache"); |
| UserGroupInformation.getLoginUser().reloginFromTicketCache(); |
| return true; |
| } |
| } catch (Exception e) { |
| LOGGER.error("Unable to run kinit for zeppelin", e); |
| } |
| LOGGER.debug("Neither Keytab nor ticket based login. " + |
| "runKerberosLogin() returning false"); |
| return false; |
| } |
| |
| @Override |
| public void open() { |
| super.open(); |
| for (String propertyKey : properties.stringPropertyNames()) { |
| LOGGER.debug("propertyKey: {}", propertyKey); |
| String[] keyValue = propertyKey.split("\\.", 2); |
| if (2 == keyValue.length) { |
| LOGGER.debug("key: {}, value: {}", keyValue[0], keyValue[1]); |
| |
| Properties prefixProperties; |
| if (basePropertiesMap.containsKey(keyValue[0])) { |
| prefixProperties = basePropertiesMap.get(keyValue[0]); |
| } else { |
| prefixProperties = new Properties(); |
| basePropertiesMap.put(keyValue[0].trim(), prefixProperties); |
| } |
| prefixProperties.put(keyValue[1].trim(), getProperty(propertyKey)); |
| } |
| } |
| |
| Set<String> removeKeySet = new HashSet<>(); |
| for (String key : basePropertiesMap.keySet()) { |
| if (!COMMON_KEY.equals(key)) { |
| Properties properties = basePropertiesMap.get(key); |
| if (!properties.containsKey(DRIVER_KEY) || !properties.containsKey(URL_KEY)) { |
| LOGGER.error("{} will be ignored. {}.{} and {}.{} is mandatory.", |
| key, DRIVER_KEY, key, key, URL_KEY); |
| removeKeySet.add(key); |
| } |
| } |
| } |
| |
| for (String key : removeKeySet) { |
| basePropertiesMap.remove(key); |
| } |
| LOGGER.debug("JDBC PropertiesMap: {}", basePropertiesMap); |
| |
| setMaxLineResults(); |
| setMaxRows(); |
| |
| //TODO(zjffdu) Set different sql splitter for different sql dialects. |
| this.sqlSplitter = new SqlSplitter(); |
| } |
| |
| protected boolean isKerboseEnabled() { |
| if (!isEmpty(getProperty("zeppelin.jdbc.auth.type"))) { |
| UserGroupInformation.AuthenticationMethod authType = JDBCSecurityImpl.getAuthType(properties); |
| if (authType.equals(KERBEROS)) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| private void setMaxLineResults() { |
| if (basePropertiesMap.containsKey(COMMON_KEY) && |
| basePropertiesMap.get(COMMON_KEY).containsKey(MAX_LINE_KEY)) { |
| maxLineResults = Integer.valueOf(basePropertiesMap.get(COMMON_KEY).getProperty(MAX_LINE_KEY)); |
| } |
| } |
| |
| /** |
| * Fetch MAX_ROWS_KEYS value from property file and set it to |
| * "maxRows" value. |
| */ |
| private void setMaxRows() { |
| maxRows = Integer.valueOf(getProperty(MAX_ROWS_KEY, "1000")); |
| } |
| |
| private SqlCompleter createOrUpdateSqlCompleter(SqlCompleter sqlCompleter, |
| final Connection connection, String propertyKey, final String buf, final int cursor) { |
| String schemaFiltersKey = String.format("%s.%s", propertyKey, COMPLETER_SCHEMA_FILTERS_KEY); |
| String sqlCompleterTtlKey = String.format("%s.%s", propertyKey, COMPLETER_TTL_KEY); |
| final String schemaFiltersString = getProperty(schemaFiltersKey); |
| int ttlInSeconds = Integer.valueOf( |
| StringUtils.defaultIfEmpty(getProperty(sqlCompleterTtlKey), DEFAULT_COMPLETER_TTL) |
| ); |
| final SqlCompleter completer; |
| if (sqlCompleter == null) { |
| completer = new SqlCompleter(ttlInSeconds); |
| } else { |
| completer = sqlCompleter; |
| } |
| ExecutorService executorService = Executors.newFixedThreadPool(1); |
| executorService.execute(new Runnable() { |
| @Override |
| public void run() { |
| completer.createOrUpdateFromConnection(connection, schemaFiltersString, buf, cursor); |
| } |
| }); |
| |
| executorService.shutdown(); |
| |
| try { |
| // protection to release connection |
| executorService.awaitTermination(3, TimeUnit.SECONDS); |
| } catch (InterruptedException e) { |
| LOGGER.warn("Completion timeout", e); |
| if (connection != null) { |
| try { |
| connection.close(); |
| } catch (SQLException e1) { |
| LOGGER.warn("Error close connection", e1); |
| } |
| } |
| } |
| return completer; |
| } |
| |
| private void initStatementMap() { |
| for (JDBCUserConfigurations configurations : jdbcUserConfigurationsMap.values()) { |
| try { |
| configurations.initStatementMap(); |
| } catch (Exception e) { |
| LOGGER.error("Error while closing paragraphIdStatementMap statement...", e); |
| } |
| } |
| } |
| |
| private void initConnectionPoolMap() { |
| for (String key : jdbcUserConfigurationsMap.keySet()) { |
| try { |
| closeDBPool(key); |
| } catch (SQLException e) { |
| LOGGER.error("Error while closing database pool.", e); |
| } |
| try { |
| JDBCUserConfigurations configurations = jdbcUserConfigurationsMap.get(key); |
| configurations.initConnectionPoolMap(); |
| } catch (SQLException e) { |
| LOGGER.error("Error while closing initConnectionPoolMap.", e); |
| } |
| } |
| } |
| |
/**
 * Closes the interpreter: after the parent's close, resets the per-user statement maps and
 * connection pools. Any exception from that cleanup is logged and not propagated.
 */
@Override
public void close() {
  super.close();
  try {
    initStatementMap();
    initConnectionPoolMap();
  } catch (Exception e) {
    LOGGER.error("Error while closing...", e);
  }
}
| |
| /* Get user of this sql. |
| * 1. If shiro is enabled, use the login user |
| * 2. Otherwise try to get it from interpreter setting, e.g. default.user |
| */ |
| private String getUser(InterpreterContext context) { |
| String user = context.getAuthenticationInfo().getUser(); |
| |
| if ("anonymous".equalsIgnoreCase(user) && basePropertiesMap.containsKey(DEFAULT_KEY)) { |
| String userInProperty = basePropertiesMap.get(DEFAULT_KEY).getProperty(USER_KEY); |
| if (StringUtils.isNotBlank(userInProperty)) { |
| user = userInProperty; |
| } |
| } |
| return user; |
| } |
| |
| private String getEntityName(String replName, String propertyKey) { |
| if ("jdbc".equals(replName)) { |
| return propertyKey; |
| } else { |
| return replName; |
| } |
| } |
| |
| private String getJDBCDriverName(String user) { |
| StringBuffer driverName = new StringBuffer(); |
| driverName.append(DBCP_STRING); |
| driverName.append(DEFAULT_KEY); |
| driverName.append(user); |
| return driverName.toString(); |
| } |
| |
| private boolean existAccountInBaseProperty(String propertyKey) { |
| return basePropertiesMap.get(propertyKey).containsKey(USER_KEY) && |
| !isEmpty((String) basePropertiesMap.get(propertyKey).get(USER_KEY)) && |
| basePropertiesMap.get(propertyKey).containsKey(PASSWORD_KEY); |
| } |
| |
| private UsernamePassword getUsernamePassword(InterpreterContext interpreterContext, |
| String entity) { |
| UserCredentials uc = interpreterContext.getAuthenticationInfo().getUserCredentials(); |
| if (uc != null) { |
| return uc.getUsernamePassword(entity); |
| } |
| return null; |
| } |
| |
| public JDBCUserConfigurations getJDBCConfiguration(String user) { |
| JDBCUserConfigurations jdbcUserConfigurations = jdbcUserConfigurationsMap.get(user); |
| |
| if (jdbcUserConfigurations == null) { |
| jdbcUserConfigurations = new JDBCUserConfigurations(); |
| jdbcUserConfigurationsMap.put(user, jdbcUserConfigurations); |
| } |
| |
| return jdbcUserConfigurations; |
| } |
| |
| private void closeDBPool(String user) throws SQLException { |
| PoolingDriver poolingDriver = getJDBCConfiguration(user).removeDBDriverPool(); |
| if (poolingDriver != null) { |
| poolingDriver.closePool(DEFAULT_KEY + user); |
| } |
| } |
| |
| private void setUserProperty(InterpreterContext context) |
| throws SQLException, IOException, InterpreterException { |
| |
| String user = getUser(context); |
| JDBCUserConfigurations jdbcUserConfigurations = getJDBCConfiguration(user); |
| if (basePropertiesMap.get(DEFAULT_KEY).containsKey(USER_KEY) && |
| !basePropertiesMap.get(DEFAULT_KEY).getProperty(USER_KEY).isEmpty()) { |
| String password = getPassword(basePropertiesMap.get(DEFAULT_KEY)); |
| if (!isEmpty(password)) { |
| basePropertiesMap.get(DEFAULT_KEY).setProperty(PASSWORD_KEY, password); |
| } |
| } |
| jdbcUserConfigurations.setProperty(basePropertiesMap.get(DEFAULT_KEY)); |
| if (existAccountInBaseProperty(DEFAULT_KEY)) { |
| return; |
| } |
| |
| UsernamePassword usernamePassword = getUsernamePassword(context, |
| getEntityName(context.getReplName(), DEFAULT_KEY)); |
| if (usernamePassword != null) { |
| jdbcUserConfigurations.cleanUserProperty(); |
| jdbcUserConfigurations.setUserProperty(usernamePassword); |
| } else { |
| closeDBPool(user); |
| } |
| } |
| |
| private void configConnectionPool(GenericObjectPool connectionPool, Properties properties) { |
| boolean testOnBorrow = "true".equalsIgnoreCase(properties.getProperty("testOnBorrow")); |
| boolean testOnCreate = "true".equalsIgnoreCase(properties.getProperty("testOnCreate")); |
| boolean testOnReturn = "true".equalsIgnoreCase(properties.getProperty("testOnReturn")); |
| boolean testWhileIdle = "true".equalsIgnoreCase(properties.getProperty("testWhileIdle")); |
| long timeBetweenEvictionRunsMillis = PropertiesUtil.getLong( |
| properties, "timeBetweenEvictionRunsMillis", -1L); |
| |
| long maxWaitMillis = PropertiesUtil.getLong(properties, "maxWaitMillis", -1L); |
| int maxIdle = PropertiesUtil.getInt(properties, "maxIdle", 8); |
| int minIdle = PropertiesUtil.getInt(properties, "minIdle", 0); |
| int maxTotal = PropertiesUtil.getInt(properties, "maxTotal", -1); |
| |
| connectionPool.setTestOnBorrow(testOnBorrow); |
| connectionPool.setTestOnCreate(testOnCreate); |
| connectionPool.setTestOnReturn(testOnReturn); |
| connectionPool.setTestWhileIdle(testWhileIdle); |
| connectionPool.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis); |
| connectionPool.setMaxIdle(maxIdle); |
| connectionPool.setMinIdle(minIdle); |
| connectionPool.setMaxTotal(maxTotal); |
| connectionPool.setMaxWaitMillis(maxWaitMillis); |
| } |
| |
| private void createConnectionPool(String url, String user, |
| Properties properties) throws SQLException, ClassNotFoundException { |
| |
| LOGGER.info("Creating connection pool for url: {}, user: {}", url, user); |
| |
| /* Remove properties that is not valid properties for presto/trino by checking driver key. |
| * - Presto: com.facebook.presto.jdbc.PrestoDriver |
| * - Trino(ex. PrestoSQL): io.trino.jdbc.TrinoDriver / io.prestosql.jdbc.PrestoDriver |
| */ |
| String driverClass = properties.getProperty(DRIVER_KEY); |
| if (driverClass != null && (driverClass.equals("com.facebook.presto.jdbc.PrestoDriver") |
| || driverClass.equals("io.prestosql.jdbc.PrestoDriver") |
| || driverClass.equals("io.trino.jdbc.TrinoDriver"))) { |
| for (String key : properties.stringPropertyNames()) { |
| if (!PRESTO_PROPERTIES.contains(key)) { |
| properties.remove(key); |
| } |
| } |
| } |
| |
| ConnectionFactory connectionFactory = |
| new DriverManagerConnectionFactory(url, properties); |
| |
| PoolableConnectionFactory poolableConnectionFactory = new PoolableConnectionFactory( |
| connectionFactory, null); |
| final String maxConnectionLifetime = |
| StringUtils.defaultIfEmpty(getProperty("zeppelin.jdbc.maxConnLifetime"), "-1"); |
| poolableConnectionFactory.setMaxConnLifetimeMillis(Long.parseLong(maxConnectionLifetime)); |
| poolableConnectionFactory.setValidationQuery( |
| PropertiesUtil.getString(properties, "validationQuery", "show databases")); |
| ObjectPool connectionPool = new GenericObjectPool(poolableConnectionFactory); |
| this.configConnectionPool((GenericObjectPool) connectionPool, properties); |
| |
| poolableConnectionFactory.setPool(connectionPool); |
| Class.forName(driverClass); |
| PoolingDriver driver = new PoolingDriver(); |
| driver.registerPool(DEFAULT_KEY + user, connectionPool); |
| getJDBCConfiguration(user).saveDBDriverPool(driver); |
| } |
| |
| private Connection getConnectionFromPool(String url, String user, |
| Properties properties) throws SQLException, ClassNotFoundException { |
| String jdbcDriver = getJDBCDriverName(user); |
| |
| if (!getJDBCConfiguration(user).isConnectionInDBDriverPool()) { |
| createConnectionPool(url, user, properties); |
| } |
| return DriverManager.getConnection(jdbcDriver); |
| } |
| |
| public Connection getConnection(InterpreterContext context) |
| throws ClassNotFoundException, SQLException, InterpreterException, IOException { |
| |
| if (basePropertiesMap.get(DEFAULT_KEY) == null) { |
| LOGGER.warn("No default config"); |
| return null; |
| } |
| |
| Connection connection = null; |
| String user = getUser(context); |
| JDBCUserConfigurations jdbcUserConfigurations = getJDBCConfiguration(user); |
| setUserProperty(context); |
| |
| final Properties properties = jdbcUserConfigurations.getProperty(); |
| String url = properties.getProperty(URL_KEY); |
| url = appendProxyUserToURL(url, user); |
| String connectionUrl = appendTagsToURL(url, context); |
| validateConnectionUrl(connectionUrl); |
| |
| String authType = getProperty("zeppelin.jdbc.auth.type", "SIMPLE") |
| .trim().toUpperCase(); |
| switch (authType) { |
| case "SIMPLE": |
| connection = getConnectionFromPool(connectionUrl, user, properties); |
| break; |
| case "KERBEROS": |
| LOGGER.debug("Calling createSecureConfiguration(); this will do " + |
| "loginUserFromKeytab() if required"); |
| JDBCSecurityImpl.createSecureConfiguration(getProperties(), |
| UserGroupInformation.AuthenticationMethod.KERBEROS); |
| LOGGER.debug("createSecureConfiguration() returned"); |
| boolean isProxyEnabled = Boolean.parseBoolean( |
| getProperty("zeppelin.jdbc.auth.kerberos.proxy.enable", "true")); |
| if (basePropertiesMap.get(DEFAULT_KEY).containsKey("proxy.user.property") |
| || !isProxyEnabled) { |
| connection = getConnectionFromPool(connectionUrl, user, properties); |
| } else { |
| UserGroupInformation ugi = null; |
| try { |
| ugi = UserGroupInformation.createProxyUser( |
| user, UserGroupInformation.getCurrentUser()); |
| } catch (Exception e) { |
| LOGGER.error("Error in getCurrentUser", e); |
| throw new InterpreterException("Error in getCurrentUser", e); |
| } |
| |
| final String finalUser = user; |
| try { |
| connection = ugi.doAs((PrivilegedExceptionAction<Connection>) () -> |
| getConnectionFromPool(connectionUrl, finalUser, properties)); |
| } catch (Exception e) { |
| LOGGER.error("Error in doAs", e); |
| throw new InterpreterException("Error in doAs", e); |
| } |
| } |
| break; |
| } |
| |
| return connection; |
| } |
| |
| private void validateConnectionUrl(String url) { |
| if (containsIgnoreCase(url, ALLOW_LOAD_LOCAL_IN_FILE_NAME) || |
| containsIgnoreCase(url, AUTO_DESERIALIZE) || |
| containsIgnoreCase(url, ALLOW_LOCAL_IN_FILE_NAME) || |
| containsIgnoreCase(url, ALLOW_URL_IN_LOCAL_IN_FILE_NAME)) { |
| throw new IllegalArgumentException("Connection URL contains sensitive configuration"); |
| } |
| } |
| |
| private String appendProxyUserToURL(String url, String user) { |
| StringBuilder connectionUrl = new StringBuilder(url); |
| |
| if (user != null && !user.equals("anonymous") && |
| basePropertiesMap.get(DEFAULT_KEY).containsKey("proxy.user.property")) { |
| |
| Integer lastIndexOfUrl = connectionUrl.indexOf("?"); |
| if (lastIndexOfUrl == -1) { |
| lastIndexOfUrl = connectionUrl.length(); |
| } |
| LOGGER.info("Using proxy user as: {}", user); |
| LOGGER.info("Using proxy property for user as: {}", |
| basePropertiesMap.get(DEFAULT_KEY).getProperty("proxy.user.property")); |
| connectionUrl.insert(lastIndexOfUrl, ";" + |
| basePropertiesMap.get(DEFAULT_KEY).getProperty("proxy.user.property") + "=" + user + ";"); |
| } else if (user != null && !user.equals("anonymous") && url.contains("hive")) { |
| LOGGER.warn("User impersonation for hive has changed please refer: http://zeppelin.apache" + |
| ".org/docs/latest/interpreter/jdbc.html#apache-hive"); |
| } |
| |
| return connectionUrl.toString(); |
| } |
| |
| // only add tags for hive jdbc |
| private String appendTagsToURL(String url, InterpreterContext context) { |
| if (!Boolean.parseBoolean(getProperty("zeppelin.jdbc.hive.engines.tag.enable", "true"))) { |
| return url; |
| } |
| |
| StringBuilder builder = new StringBuilder(url); |
| if (url.startsWith("jdbc:hive2:")) { |
| Integer lastIndexOfQMark = builder.indexOf("?"); |
| if (lastIndexOfQMark == -1) { |
| builder.append("?"); |
| lastIndexOfQMark = builder.length(); |
| } else { |
| lastIndexOfQMark++; |
| } |
| builder.insert(lastIndexOfQMark, "mapreduce.job.tags=" + context.getParagraphId() + ";"); |
| builder.insert(lastIndexOfQMark, "tez.application.tags=" + context.getParagraphId() + ";"); |
| } |
| return builder.toString(); |
| } |
| |
| |
| private String getPassword(Properties properties) throws IOException, InterpreterException { |
| if (isNotEmpty(properties.getProperty(PASSWORD_KEY))) { |
| return properties.getProperty(PASSWORD_KEY); |
| } else if (isNotEmpty(properties.getProperty(JDBC_JCEKS_FILE)) |
| && isNotEmpty(properties.getProperty(JDBC_JCEKS_CREDENTIAL_KEY))) { |
| try { |
| Configuration configuration = new Configuration(); |
| configuration.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, |
| properties.getProperty(JDBC_JCEKS_FILE)); |
| CredentialProvider provider = CredentialProviderFactory.getProviders(configuration).get(0); |
| CredentialProvider.CredentialEntry credEntry = |
| provider.getCredentialEntry(properties.getProperty(JDBC_JCEKS_CREDENTIAL_KEY)); |
| if (credEntry != null) { |
| return new String(credEntry.getCredential()); |
| } else { |
| throw new InterpreterException("Failed to retrieve password from JCEKS from key: " |
| + properties.getProperty(JDBC_JCEKS_CREDENTIAL_KEY)); |
| } |
| } catch (Exception e) { |
| LOGGER.error("Failed to retrieve password from JCEKS \n" + |
| "For file: {} \nFor key: {}", properties.getProperty(JDBC_JCEKS_FILE), |
| properties.getProperty(JDBC_JCEKS_CREDENTIAL_KEY), e); |
| throw e; |
| } |
| } |
| return null; |
| } |
| |
| private String getResults(ResultSet resultSet, boolean isTableType) |
| throws SQLException { |
| |
| ResultSetMetaData md = resultSet.getMetaData(); |
| StringBuilder msg; |
| if (isTableType) { |
| msg = new StringBuilder(TABLE_MAGIC_TAG); |
| } else { |
| msg = new StringBuilder(); |
| } |
| |
| for (int i = 1; i < md.getColumnCount() + 1; i++) { |
| if (i > 1) { |
| msg.append(TAB); |
| } |
| if (StringUtils.isNotEmpty(md.getColumnLabel(i))) { |
| msg.append(removeTablePrefix(replaceReservedChars( |
| TableDataUtils.normalizeColumn(md.getColumnLabel(i))))); |
| } else { |
| msg.append(removeTablePrefix(replaceReservedChars( |
| TableDataUtils.normalizeColumn(md.getColumnName(i))))); |
| } |
| } |
| msg.append(NEWLINE); |
| |
| int displayRowCount = 0; |
| boolean truncate = false; |
| while (resultSet.next()) { |
| if (displayRowCount >= getMaxResult()) { |
| truncate = true; |
| break; |
| } |
| for (int i = 1; i < md.getColumnCount() + 1; i++) { |
| Object resultObject; |
| String resultValue; |
| resultObject = resultSet.getObject(i); |
| if (resultObject == null) { |
| resultValue = "null"; |
| } else { |
| resultValue = resultSet.getString(i); |
| } |
| msg.append(replaceReservedChars(TableDataUtils.normalizeColumn(resultValue))); |
| if (i != md.getColumnCount()) { |
| msg.append(TAB); |
| } |
| } |
| msg.append(NEWLINE); |
| displayRowCount++; |
| } |
| |
| if (truncate) { |
| msg.append("\n" + ResultMessages.getExceedsLimitRowsMessage(getMaxResult(), |
| String.format("%s.%s", COMMON_KEY, MAX_LINE_KEY)).toString()); |
| } |
| return msg.toString(); |
| } |
| |
| private boolean isDDLCommand(int updatedCount, int columnCount) throws SQLException { |
| return updatedCount < 0 && columnCount <= 0 ? true : false; |
| } |
| |
| public InterpreterResult executePrecode(InterpreterContext interpreterContext) |
| throws InterpreterException { |
| InterpreterResult interpreterResult = null; |
| for (String propertyKey : basePropertiesMap.keySet()) { |
| String precode = getProperty(String.format("%s.precode", propertyKey)); |
| if (StringUtils.isNotBlank(precode)) { |
| interpreterResult = executeSql(precode, interpreterContext); |
| if (interpreterResult.code() != Code.SUCCESS) { |
| break; |
| } |
| } |
| } |
| |
| return interpreterResult; |
| } |
| |
//Just keep it for testing
// Delegates to the SqlSplitter created in open(); calling this before open() hits a null
// splitter.
protected List<String> splitSqlQueries(String text) {
  return sqlSplitter.splitSql(text);
}
| |
| /** |
| * Execute the sql statement. |
| * |
| * @param sql |
| * @param context |
| * @return |
| * @throws InterpreterException |
| */ |
| private InterpreterResult executeSql(String sql, |
| InterpreterContext context) throws InterpreterException { |
| Connection connection = null; |
| Statement statement; |
| ResultSet resultSet = null; |
| String paragraphId = context.getParagraphId(); |
| String noteId = context.getNoteId(); |
| String user = getUser(context); |
| |
| try { |
| connection = getConnection(context); |
| } catch (IllegalArgumentException e) { |
| LOGGER.error("Cannot run " + sql, e); |
| return new InterpreterResult(Code.ERROR, "Connection URL contains improper configuration"); |
| } catch (Exception e) { |
| LOGGER.error("Fail to getConnection", e); |
| try { |
| closeDBPool(user); |
| } catch (SQLException e1) { |
| LOGGER.error("Cannot close DBPool for user: " + user , e1); |
| } |
| if (e instanceof SQLException) { |
| return new InterpreterResult(Code.ERROR, e.getMessage()); |
| } else { |
| return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e)); |
| } |
| } |
| if (connection == null) { |
| return new InterpreterResult(Code.ERROR, "User's connection not found."); |
| } |
| |
| try { |
| List<String> sqlArray = sqlSplitter.splitSql(sql); |
| for (String sqlToExecute : sqlArray) { |
| String sqlTrimmedLowerCase = sqlToExecute.trim().toLowerCase(); |
| if (sqlTrimmedLowerCase.startsWith("set ") || |
| sqlTrimmedLowerCase.startsWith("list ") || |
| sqlTrimmedLowerCase.startsWith("add ") || |
| sqlTrimmedLowerCase.startsWith("delete ")) { |
| // some version of hive doesn't work with set statement with empty line ahead. |
| // so we need to trim it first in this case. |
| sqlToExecute = sqlToExecute.trim(); |
| } |
| LOGGER.info("[{}|{}|{}] Execute sql: {}", user, noteId, paragraphId, sqlToExecute); |
| statement = connection.createStatement(); |
| |
| // fetch n+1 rows in order to indicate there's more rows available (for large selects) |
| statement.setFetchSize(context.getIntLocalProperty("limit", getMaxResult())); |
| statement.setMaxRows(context.getIntLocalProperty("limit", maxRows)); |
| |
| if (statement == null) { |
| return new InterpreterResult(Code.ERROR, "Prefix not found."); |
| } |
| |
| try { |
| getJDBCConfiguration(user).saveStatement(paragraphId, statement); |
| |
| String statementPrecode = |
| getProperty(String.format(STATEMENT_PRECODE_KEY_TEMPLATE, DEFAULT_KEY)); |
| |
| if (StringUtils.isNotBlank(statementPrecode)) { |
| statement.execute(statementPrecode); |
| } |
| |
| // start hive monitor thread if it is hive jdbc |
| String jdbcURL = getJDBCConfiguration(user).getProperty().getProperty(URL_KEY); |
| String driver = |
| getJDBCConfiguration(user).getProperty().getProperty(DRIVER_KEY); |
| if (jdbcURL != null && jdbcURL.startsWith("jdbc:hive2://") |
| && driver != null && driver.equals("org.apache.hive.jdbc.HiveDriver")) { |
| HiveUtils.startHiveMonitorThread(statement, context, |
| Boolean.parseBoolean(getProperty("hive.log.display", "true")), this); |
| } |
| boolean isResultSetAvailable = statement.execute(sqlToExecute); |
| getJDBCConfiguration(user).setConnectionInDBDriverPoolSuccessful(); |
| if (isResultSetAvailable) { |
| resultSet = statement.getResultSet(); |
| |
| // Regards that the command is DDL. |
| if (isDDLCommand(statement.getUpdateCount(), |
| resultSet.getMetaData().getColumnCount())) { |
| context.out.write("%text Query executed successfully.\n"); |
| } else { |
| String template = context.getLocalProperties().get("template"); |
| if (!StringUtils.isBlank(template)) { |
| resultSet.next(); |
| SingleRowInterpreterResult singleRowResult = |
| new SingleRowInterpreterResult(getFirstRow(resultSet), template, context); |
| |
| if (isFirstRefreshMap.get(context.getParagraphId())) { |
| context.out.write(singleRowResult.toAngular()); |
| context.out.write("\n%text "); |
| context.out.flush(); |
| isFirstRefreshMap.put(context.getParagraphId(), false); |
| } |
| singleRowResult.pushAngularObjects(); |
| |
| } else { |
| String results = getResults(resultSet, |
| !containsIgnoreCase(sqlToExecute, EXPLAIN_PREDICATE)); |
| context.out.write(results); |
| context.out.write("\n%text "); |
| context.out.flush(); |
| } |
| } |
| } else { |
| // Response contains either an update count or there are no results. |
| int updateCount = statement.getUpdateCount(); |
| context.out.write("\n%text " + |
| "Query executed successfully. Affected rows : " + |
| updateCount + "\n"); |
| } |
| } finally { |
| if (resultSet != null) { |
| try { |
| resultSet.close(); |
| } catch (SQLException e) { /*ignored*/ } |
| } |
| if (statement != null) { |
| try { |
| statement.close(); |
| } catch (SQLException e) { /*ignored*/ } |
| } |
| } |
| } |
| } catch (Throwable e) { |
| LOGGER.error("Cannot run " + sql, e); |
| if (e instanceof SQLException) { |
| return new InterpreterResult(Code.ERROR, e.getMessage()); |
| } else { |
| return new InterpreterResult(Code.ERROR, ExceptionUtils.getStackTrace(e)); |
| } |
| } finally { |
| //In case user ran an insert/update/upsert statement |
| if (connection != null) { |
| try { |
| if (!connection.getAutoCommit()) { |
| connection.commit(); |
| } |
| connection.close(); |
| } catch (SQLException e) { /*ignored*/ } |
| } |
| getJDBCConfiguration(user).removeStatement(paragraphId); |
| } |
| |
| return new InterpreterResult(Code.SUCCESS); |
| } |
| |
| private List getFirstRow(ResultSet rs) throws SQLException { |
| List list = new ArrayList(); |
| ResultSetMetaData md = rs.getMetaData(); |
| for (int i = 1; i <= md.getColumnCount(); ++i) { |
| Object columnObject = rs.getObject(i); |
| String columnValue = null; |
| if (columnObject == null) { |
| columnValue = "null"; |
| } else { |
| columnValue = rs.getString(i); |
| } |
| list.add(columnValue); |
| } |
| return list; |
| } |
| |
| /** |
| * For %table response replace Tab and Newline characters from the content. |
| */ |
| private String replaceReservedChars(String str) { |
| if (str == null) { |
| return EMPTY_COLUMN_VALUE; |
| } |
| return str.replace(TAB, WHITESPACE).replace(NEWLINE, WHITESPACE); |
| } |
| |
| /** |
| * Hive will prefix table name before the column |
| * @param columnName |
| * @return |
| */ |
| private String removeTablePrefix(String columnName) { |
| int index = columnName.indexOf("."); |
| if (index > 0) { |
| return columnName.substring(index + 1); |
| } else { |
| return columnName; |
| } |
| } |
| |
| @Override |
| protected boolean isInterpolate() { |
| return Boolean.parseBoolean(getProperty("zeppelin.jdbc.interpolation", "false")); |
| } |
| |
| private boolean isRefreshMode(InterpreterContext context) { |
| return context.getLocalProperties().get("refreshInterval") != null; |
| } |
| |
| @Override |
| public InterpreterResult internalInterpret(String cmd, InterpreterContext context) |
| throws InterpreterException { |
| String dbprefix = getDBPrefix(context); |
| if (!StringUtils.equals(dbprefix, DEFAULT_KEY)) { |
| LOGGER.warn("DBprefix like %jdbc(db=mysql) or %jdbc(mysql) is not supported anymore!"); |
| LOGGER.warn("JDBC Interpreter would try to use default config."); |
| } |
| LOGGER.debug("Run SQL command '{}'", cmd); |
| if (!isRefreshMode(context)) { |
| return executeSql(cmd, context); |
| } else { |
| int refreshInterval = Integer.parseInt(context.getLocalProperties().get("refreshInterval")); |
| paragraphCancelMap.put(context.getParagraphId(), false); |
| ScheduledExecutorService refreshExecutor = Executors.newSingleThreadScheduledExecutor(); |
| refreshExecutorServices.put(context.getParagraphId(), refreshExecutor); |
| isFirstRefreshMap.put(context.getParagraphId(), true); |
| final AtomicReference<InterpreterResult> interpreterResultRef = new AtomicReference(); |
| refreshExecutor.scheduleAtFixedRate(() -> { |
| context.out.clear(false); |
| try { |
| InterpreterResult result = executeSql(cmd, context); |
| context.out.flush(); |
| interpreterResultRef.set(result); |
| if (result.code() != Code.SUCCESS) { |
| refreshExecutor.shutdownNow(); |
| } |
| } catch (Exception e) { |
| LOGGER.warn("Fail to run sql", e); |
| } |
| }, 0, refreshInterval, TimeUnit.MILLISECONDS); |
| |
| while (!refreshExecutor.isTerminated()) { |
| try { |
| Thread.sleep(1000); |
| } catch (InterruptedException e) { |
| LOGGER.error(""); |
| } |
| } |
| refreshExecutorServices.remove(context.getParagraphId()); |
| if (paragraphCancelMap.getOrDefault(context.getParagraphId(), false)) { |
| return new InterpreterResult(Code.ERROR); |
| } else if (interpreterResultRef.get().code() == Code.ERROR) { |
| return interpreterResultRef.get(); |
| } else { |
| return new InterpreterResult(Code.SUCCESS); |
| } |
| } |
| } |
| |
| @Override |
| public void cancel(InterpreterContext context) { |
| |
| if (isRefreshMode(context)) { |
| LOGGER.info("Shutdown refreshExecutorService for paragraph: {}", context.getParagraphId()); |
| ScheduledExecutorService executorService = |
| refreshExecutorServices.get(context.getParagraphId()); |
| if (executorService != null) { |
| executorService.shutdownNow(); |
| } |
| paragraphCancelMap.put(context.getParagraphId(), true); |
| return; |
| } |
| |
| LOGGER.info("Cancel current query statement."); |
| String paragraphId = context.getParagraphId(); |
| JDBCUserConfigurations jdbcUserConfigurations = getJDBCConfiguration(getUser(context)); |
| try { |
| jdbcUserConfigurations.cancelStatement(paragraphId); |
| } catch (SQLException e) { |
| LOGGER.error("Error while cancelling...", e); |
| } |
| |
| String cancelReason = context.getLocalProperties().get(CANCEL_REASON); |
| if (StringUtils.isNotBlank(cancelReason)) { |
| try { |
| context.out.write(cancelReason); |
| } catch (IOException e) { |
| LOGGER.error("Fail to write cancel reason"); |
| } |
| } |
| } |
| |
| public void cancel(InterpreterContext context, String errorMessage) { |
| context.getLocalProperties().put(CANCEL_REASON, errorMessage); |
| cancel(context); |
| } |
| /** |
| * |
| * |
| * @param context |
| * @return |
| */ |
| public String getDBPrefix(InterpreterContext context) { |
| Map<String, String> localProperties = context.getLocalProperties(); |
| // It is recommended to use this kind of format: %jdbc(db=mysql) |
| if (localProperties.containsKey("db")) { |
| return localProperties.get("db"); |
| } |
| // %jdbc(mysql) is only for backward compatibility |
| for (Map.Entry<String, String> entry : localProperties.entrySet()) { |
| if (entry.getKey().equals(entry.getValue())) { |
| return entry.getKey(); |
| } |
| } |
| return DEFAULT_KEY; |
| } |
| |
| @Override |
| public FormType getFormType() { |
| return FormType.SIMPLE; |
| } |
| |
| @Override |
| public int getProgress(InterpreterContext context) { |
| return 0; |
| } |
| |
| @Override |
| public Scheduler getScheduler() { |
| String schedulerName = JDBCInterpreter.class.getName() + this.hashCode(); |
| return isConcurrentExecution() ? |
| SchedulerFactory.singleton().createOrGetParallelScheduler(schedulerName, |
| getMaxConcurrentConnection()) |
| : SchedulerFactory.singleton().createOrGetFIFOScheduler(schedulerName); |
| } |
| |
| @Override |
| public List<InterpreterCompletion> completion(String buf, int cursor, |
| InterpreterContext context) throws InterpreterException { |
| List<InterpreterCompletion> candidates = new ArrayList<>(); |
| String sqlCompleterKey = |
| String.format("%s.%s", getUser(context), DEFAULT_KEY); |
| SqlCompleter sqlCompleter = sqlCompletersMap.get(sqlCompleterKey); |
| |
| Connection connection = null; |
| try { |
| if (context != null) { |
| connection = getConnection(context); |
| } |
| } catch (ClassNotFoundException | SQLException | IOException e) { |
| LOGGER.warn("SQLCompleter will created without use connection"); |
| } |
| |
| sqlCompleter = createOrUpdateSqlCompleter(sqlCompleter, connection, DEFAULT_KEY, buf, cursor); |
| sqlCompletersMap.put(sqlCompleterKey, sqlCompleter); |
| sqlCompleter.complete(buf, cursor, candidates); |
| |
| return candidates; |
| } |
| |
| public int getMaxResult() { |
| return maxLineResults; |
| } |
| |
| boolean isConcurrentExecution() { |
| return Boolean.valueOf(getProperty(CONCURRENT_EXECUTION_KEY)); |
| } |
| |
| int getMaxConcurrentConnection() { |
| try { |
| return Integer.valueOf(getProperty(CONCURRENT_EXECUTION_COUNT)); |
| } catch (Exception e) { |
| LOGGER.error("Fail to parse {} with value: {}", CONCURRENT_EXECUTION_COUNT, |
| getProperty(CONCURRENT_EXECUTION_COUNT)); |
| return 10; |
| } |
| } |
| } |