blob: df91208a6c25306cc9a56b0d6ff9aafc09068522 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.metadata.storage.mysql;
import com.google.common.base.Joiner;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import com.mysql.jdbc.exceptions.MySQLTransientException;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SQLMetadataConnector;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.util.StringMapper;
import java.io.File;
import java.sql.SQLException;
public class MySQLConnector extends SQLMetadataConnector
{
private static final Logger log = new Logger(MySQLConnector.class);
private static final String PAYLOAD_TYPE = "LONGBLOB";
private static final String SERIAL_TYPE = "BIGINT(20) AUTO_INCREMENT";
private static final String QUOTE_STRING = "`";
private static final String COLLATION = "CHARACTER SET utf8mb4 COLLATE utf8mb4_bin";
private final DBI dbi;
@Inject
public MySQLConnector(
Supplier<MetadataStorageConnectorConfig> config,
Supplier<MetadataStorageTablesConfig> dbTables,
MySQLConnectorSslConfig connectorSslConfig,
MySQLConnectorDriverConfig driverConfig
)
{
super(config, dbTables);
try {
Class.forName(driverConfig.getDriverClassName(), false, getClass().getClassLoader());
}
catch (ClassNotFoundException e) {
throw new ISE(e, "Could not find %s on the classpath. The MySQL Connector library is not included in the Druid "
+ "distribution but is required to use MySQL. Please download a compatible library (for example "
+ "'mysql-connector-java-5.1.48.jar') and place it under 'extensions/mysql-metadata-storage/'. See "
+ "https://druid.apache.org/downloads for more details.",
driverConfig.getDriverClassName()
);
}
final BasicDataSource datasource = getDatasource();
// MySQL driver is classloader isolated as part of the extension
// so we need to help JDBC find the driver
datasource.setDriverClassLoader(getClass().getClassLoader());
datasource.setDriverClassName(driverConfig.getDriverClassName());
datasource.addConnectionProperty("useSSL", String.valueOf(connectorSslConfig.isUseSSL()));
if (connectorSslConfig.isUseSSL()) {
log.info("SSL is enabled on this MySQL connection. ");
datasource.addConnectionProperty(
"verifyServerCertificate",
String.valueOf(connectorSslConfig.isVerifyServerCertificate())
);
if (connectorSslConfig.isVerifyServerCertificate()) {
log.info("Server certificate verification is enabled. ");
if (connectorSslConfig.getTrustCertificateKeyStoreUrl() != null) {
datasource.addConnectionProperty(
"trustCertificateKeyStoreUrl",
new File(connectorSslConfig.getTrustCertificateKeyStoreUrl()).toURI().toString()
);
}
if (connectorSslConfig.getTrustCertificateKeyStoreType() != null) {
datasource.addConnectionProperty(
"trustCertificateKeyStoreType",
connectorSslConfig.getTrustCertificateKeyStoreType()
);
}
if (connectorSslConfig.getTrustCertificateKeyStorePassword() == null) {
log.warn(
"Trust store password is empty. Ensure that the trust store has been configured with an empty password.");
} else {
datasource.addConnectionProperty(
"trustCertificateKeyStorePassword",
connectorSslConfig.getTrustCertificateKeyStorePassword()
);
}
}
if (connectorSslConfig.getClientCertificateKeyStoreUrl() != null) {
datasource.addConnectionProperty(
"clientCertificateKeyStoreUrl",
new File(connectorSslConfig.getClientCertificateKeyStoreUrl()).toURI().toString()
);
}
if (connectorSslConfig.getClientCertificateKeyStoreType() != null) {
datasource.addConnectionProperty(
"clientCertificateKeyStoreType",
connectorSslConfig.getClientCertificateKeyStoreType()
);
}
if (connectorSslConfig.getClientCertificateKeyStorePassword() != null) {
datasource.addConnectionProperty(
"clientCertificateKeyStorePassword",
connectorSslConfig.getClientCertificateKeyStorePassword()
);
}
Joiner joiner = Joiner.on(",").skipNulls();
if (connectorSslConfig.getEnabledSSLCipherSuites() != null) {
datasource.addConnectionProperty(
"enabledSSLCipherSuites",
joiner.join(connectorSslConfig.getEnabledSSLCipherSuites())
);
}
if (connectorSslConfig.getEnabledTLSProtocols() != null) {
datasource.addConnectionProperty("enabledTLSProtocols", joiner.join(connectorSslConfig.getEnabledTLSProtocols()));
}
}
// use double-quotes for quoting columns, so we can write SQL that works with most databases
datasource.setConnectionInitSqls(ImmutableList.of("SET sql_mode='ANSI_QUOTES'"));
this.dbi = new DBI(datasource);
log.info("Configured MySQL as metadata storage");
}
@Override
public String getPayloadType()
{
return PAYLOAD_TYPE;
}
@Override
public String getSerialType()
{
return SERIAL_TYPE;
}
@Override
public String getCollation()
{
return COLLATION;
}
@Override
public String getQuoteString()
{
return QUOTE_STRING;
}
@Override
public int getStreamingFetchSize()
{
// this is MySQL's way of indicating you want results streamed back
// see http://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html
return Integer.MIN_VALUE;
}
@Override
public boolean tableExists(Handle handle, String tableName)
{
String databaseCharset = handle
.createQuery("SELECT @@character_set_database")
.map(StringMapper.FIRST)
.first();
if (!databaseCharset.startsWith("utf8")) {
throw new ISE(
"Druid requires its MySQL database to be created with an UTF8 charset, found `%1$s`. "
+ "The recommended charset is `utf8mb4`.",
databaseCharset
);
} else if (!"utf8mb4".equals(databaseCharset)) {
log.warn("The current database charset `%1$s` does not match the recommended charset `utf8mb4`", databaseCharset);
}
return !handle.createQuery("SHOW tables LIKE :tableName")
.bind("tableName", tableName)
.list()
.isEmpty();
}
@Override
protected boolean connectorIsTransientException(Throwable e)
{
return e instanceof MySQLTransientException
|| (e instanceof SQLException && ((SQLException) e).getErrorCode() == 1317 /* ER_QUERY_INTERRUPTED */);
}
@Override
public Void insertOrUpdate(
final String tableName,
final String keyColumn,
final String valueColumn,
final String key,
final byte[] value
)
{
return getDBI().withHandle(
handle -> {
handle.createStatement(
StringUtils.format(
"INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value) ON DUPLICATE KEY UPDATE %3$s = :value",
tableName,
keyColumn,
valueColumn
)
)
.bind("key", key)
.bind("value", value)
.execute();
return null;
}
);
}
@Override
public DBI getDBI()
{
return dbi;
}
}