blob: a7951de010649d7a500a9505ecf530392cf28505 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.snowflake.service;
import net.snowflake.client.core.SFSessionProperty;
import net.snowflake.client.jdbc.SnowflakeDriver;
import org.apache.nifi.annotation.behavior.DynamicProperties;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.dbcp.AbstractDBCPConnectionPool;
import org.apache.nifi.dbcp.utils.DBCPProperties;
import org.apache.nifi.dbcp.utils.DataSourceConfiguration;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.snowflake.SnowflakeConnectionProviderService;
import org.apache.nifi.processors.snowflake.SnowflakeConnectionWrapper;
import org.apache.nifi.processors.snowflake.util.SnowflakeProperties;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxyConfigurationService;
import org.apache.nifi.snowflake.service.util.ConnectionUrlFormat;
import org.apache.nifi.snowflake.service.util.ConnectionUrlFormatParameters;
import java.sql.Driver;
import java.sql.DriverManager;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_PASSWORD;
import static org.apache.nifi.dbcp.utils.DBCPProperties.DB_USER;
import static org.apache.nifi.dbcp.utils.DBCPProperties.EVICTION_RUN_PERIOD;
import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_CONN_LIFETIME;
import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_IDLE;
import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_TOTAL_CONNECTIONS;
import static org.apache.nifi.dbcp.utils.DBCPProperties.MAX_WAIT_TIME;
import static org.apache.nifi.dbcp.utils.DBCPProperties.MIN_EVICTABLE_IDLE_TIME;
import static org.apache.nifi.dbcp.utils.DBCPProperties.MIN_IDLE;
import static org.apache.nifi.dbcp.utils.DBCPProperties.SOFT_MIN_EVICTABLE_IDLE_TIME;
import static org.apache.nifi.dbcp.utils.DBCPProperties.VALIDATION_QUERY;
import static org.apache.nifi.dbcp.utils.DBCPProperties.extractMillisWithInfinite;
/**
* Implementation of Database Connection Pooling Service for Snowflake. Apache DBCP is used for connection pooling
* functionality.
*/
@Tags({"snowflake", "dbcp", "jdbc", "database", "connection", "pooling", "store"})
@CapabilityDescription("Provides Snowflake Connection Pooling Service. Connections can be asked from pool and returned after usage.")
@SupportsSensitiveDynamicProperties
@DynamicProperties({
@DynamicProperty(name = "JDBC property name",
value = "Snowflake JDBC property value",
expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT,
description = "Snowflake JDBC driver property name and value applied to JDBC connections.")
})
@RequiresInstanceClassLoading
public class SnowflakeComputingConnectionPool extends AbstractDBCPConnectionPool implements SnowflakeConnectionProviderService {
public static final PropertyDescriptor CONNECTION_URL_FORMAT = new PropertyDescriptor.Builder()
.name("connection-url-format")
.displayName("Connection URL Format")
.description("The format of the connection URL.")
.allowableValues(ConnectionUrlFormat.class)
.required(true)
.defaultValue(ConnectionUrlFormat.FULL_URL)
.build();
public static final PropertyDescriptor SNOWFLAKE_URL = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(DBCPProperties.DATABASE_URL)
.displayName("Snowflake URL")
.description("Example connection string: jdbc:snowflake://[account].[region]" + ConnectionUrlFormat.SNOWFLAKE_HOST_SUFFIX + "/?[connection_params]" +
" The connection parameters can include db=DATABASE_NAME to avoid using qualified table names such as DATABASE_NAME.PUBLIC.TABLE_NAME")
.required(true)
.dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.FULL_URL)
.build();
public static final PropertyDescriptor SNOWFLAKE_ACCOUNT_LOCATOR = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(SnowflakeProperties.ACCOUNT_LOCATOR)
.dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_LOCATOR)
.build();
public static final PropertyDescriptor SNOWFLAKE_CLOUD_REGION = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(SnowflakeProperties.CLOUD_REGION)
.dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_LOCATOR)
.build();
public static final PropertyDescriptor SNOWFLAKE_CLOUD_TYPE = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(SnowflakeProperties.CLOUD_TYPE)
.dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_LOCATOR)
.build();
public static final PropertyDescriptor SNOWFLAKE_ORGANIZATION_NAME = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(SnowflakeProperties.ORGANIZATION_NAME)
.dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_NAME)
.build();
public static final PropertyDescriptor SNOWFLAKE_ACCOUNT_NAME = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(SnowflakeProperties.ACCOUNT_NAME)
.dependsOn(CONNECTION_URL_FORMAT, ConnectionUrlFormat.ACCOUNT_NAME)
.build();
public static final PropertyDescriptor SNOWFLAKE_USER = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(DBCPProperties.DB_USER)
.displayName("Username")
.description("The Snowflake user name.")
.build();
public static final PropertyDescriptor SNOWFLAKE_PASSWORD = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(DBCPProperties.DB_PASSWORD)
.displayName("Password")
.description("The password for the Snowflake user.")
.build();
public static final PropertyDescriptor SNOWFLAKE_WAREHOUSE = new PropertyDescriptor.Builder()
.name("warehouse")
.displayName("Warehouse")
.description("The warehouse to use by default. The same as passing 'warehouse=WAREHOUSE' to the connection string.")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
private static final List<PropertyDescriptor> PROPERTIES = List.of(
CONNECTION_URL_FORMAT,
SNOWFLAKE_URL,
SNOWFLAKE_ACCOUNT_LOCATOR,
SNOWFLAKE_CLOUD_REGION,
SNOWFLAKE_CLOUD_TYPE,
SNOWFLAKE_ORGANIZATION_NAME,
SNOWFLAKE_ACCOUNT_NAME,
SNOWFLAKE_USER,
SNOWFLAKE_PASSWORD,
SnowflakeProperties.DATABASE,
SnowflakeProperties.SCHEMA,
SNOWFLAKE_WAREHOUSE,
ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE,
VALIDATION_QUERY,
MAX_WAIT_TIME,
MAX_TOTAL_CONNECTIONS,
MIN_IDLE,
MAX_IDLE,
MAX_CONN_LIFETIME,
EVICTION_RUN_PERIOD,
MIN_EVICTABLE_IDLE_TIME,
SOFT_MIN_EVICTABLE_IDLE_TIME
);
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTIES;
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
final PropertyDescriptor.Builder builder = new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.dynamic(true)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true))
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR);
return builder.build();
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
return Collections.emptyList();
}
@Override
protected DataSourceConfiguration getDataSourceConfiguration(final ConfigurationContext context) {
final String url = getUrl(context);
final String driverName = SnowflakeDriver.class.getName();
final String user = context.getProperty(DB_USER).evaluateAttributeExpressions().getValue();
final String password = context.getProperty(DB_PASSWORD).evaluateAttributeExpressions().getValue();
final Integer maxTotal = context.getProperty(MAX_TOTAL_CONNECTIONS).evaluateAttributeExpressions().asInteger();
final String validationQuery = context.getProperty(VALIDATION_QUERY).evaluateAttributeExpressions().getValue();
final Long maxWaitMillis = extractMillisWithInfinite(context.getProperty(MAX_WAIT_TIME).evaluateAttributeExpressions());
final Integer minIdle = context.getProperty(MIN_IDLE).evaluateAttributeExpressions().asInteger();
final Integer maxIdle = context.getProperty(MAX_IDLE).evaluateAttributeExpressions().asInteger();
final Long maxConnLifetimeMillis = extractMillisWithInfinite(context.getProperty(MAX_CONN_LIFETIME).evaluateAttributeExpressions());
final Long timeBetweenEvictionRunsMillis = extractMillisWithInfinite(context.getProperty(EVICTION_RUN_PERIOD).evaluateAttributeExpressions());
final Long minEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
final Long softMinEvictableIdleTimeMillis = extractMillisWithInfinite(context.getProperty(SOFT_MIN_EVICTABLE_IDLE_TIME).evaluateAttributeExpressions());
return new DataSourceConfiguration.Builder(url, driverName, user, password)
.maxTotal(maxTotal)
.validationQuery(validationQuery)
.maxWaitMillis(maxWaitMillis)
.minIdle(minIdle)
.maxIdle(maxIdle)
.maxConnLifetimeMillis(maxConnLifetimeMillis)
.timeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis)
.minEvictableIdleTimeMillis(minEvictableIdleTimeMillis)
.softMinEvictableIdleTimeMillis(softMinEvictableIdleTimeMillis)
.build();
}
protected String getUrl(final ConfigurationContext context) {
final ConnectionUrlFormat connectionUrlFormat = context.getProperty(CONNECTION_URL_FORMAT).asAllowableValue(ConnectionUrlFormat.class);
final ConnectionUrlFormatParameters parameters = getConnectionUrlFormatParameters(context);
return connectionUrlFormat.buildConnectionUrl(parameters);
}
@Override
protected Driver getDriver(final String driverName, final String url) {
try {
Class.forName(driverName);
return DriverManager.getDriver(url);
} catch (Exception e) {
throw new ProcessException("Snowflake driver unavailable or incompatible connection URL", e);
}
}
@Override
protected Map<String, String> getConnectionProperties(final ConfigurationContext context) {
final String database = context.getProperty(SnowflakeProperties.DATABASE).evaluateAttributeExpressions().getValue();
final String schema = context.getProperty(SnowflakeProperties.SCHEMA).evaluateAttributeExpressions().getValue();
final String warehouse = context.getProperty(SNOWFLAKE_WAREHOUSE).evaluateAttributeExpressions().getValue();
final Map<String, String> connectionProperties = super.getConnectionProperties(context);
if (database != null) {
connectionProperties.put("db", database);
}
if (schema != null) {
connectionProperties.put("schema", schema);
}
if (warehouse != null) {
connectionProperties.put("warehouse", warehouse);
}
final ProxyConfigurationService proxyConfigurationService = context
.getProperty(ProxyConfigurationService.PROXY_CONFIGURATION_SERVICE)
.asControllerService(ProxyConfigurationService.class);
if (proxyConfigurationService != null) {
final ProxyConfiguration proxyConfiguration = proxyConfigurationService.getConfiguration();
connectionProperties.put(SFSessionProperty.USE_PROXY.getPropertyKey(), "true");
if (proxyConfiguration.getProxyServerHost() != null) {
connectionProperties.put(SFSessionProperty.PROXY_HOST.getPropertyKey(), proxyConfiguration.getProxyServerHost());
}
if (proxyConfiguration.getProxyServerPort() != null) {
connectionProperties.put(SFSessionProperty.PROXY_PORT.getPropertyKey(), proxyConfiguration.getProxyServerPort().toString());
}
if (proxyConfiguration.getProxyUserName() != null) {
connectionProperties.put(SFSessionProperty.PROXY_USER.getPropertyKey(), proxyConfiguration.getProxyUserName());
}
if (proxyConfiguration.getProxyUserPassword() != null) {
connectionProperties.put(SFSessionProperty.PROXY_PASSWORD.getPropertyKey(), proxyConfiguration.getProxyUserPassword());
}
if (proxyConfiguration.getProxyType() != null) {
connectionProperties.put(SFSessionProperty.PROXY_PROTOCOL.getPropertyKey(), proxyConfiguration.getProxyType().name().toLowerCase());
}
}
return connectionProperties;
}
@Override
public SnowflakeConnectionWrapper getSnowflakeConnection() {
return new SnowflakeConnectionWrapper(getConnection());
}
private ConnectionUrlFormatParameters getConnectionUrlFormatParameters(ConfigurationContext context) {
return new ConnectionUrlFormatParameters(
context.getProperty(SNOWFLAKE_URL).evaluateAttributeExpressions().getValue(),
context.getProperty(SNOWFLAKE_ORGANIZATION_NAME).evaluateAttributeExpressions().getValue(),
context.getProperty(SNOWFLAKE_ACCOUNT_NAME).evaluateAttributeExpressions().getValue(),
context.getProperty(SNOWFLAKE_ACCOUNT_LOCATOR).evaluateAttributeExpressions().getValue(),
context.getProperty(SNOWFLAKE_CLOUD_REGION).evaluateAttributeExpressions().getValue(),
context.getProperty(SNOWFLAKE_CLOUD_TYPE).evaluateAttributeExpressions().getValue()
);
}
}