blob: 99d69f5e14a352632e6e5fc9e5caea04d849e7f9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.metadata;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.dbcp2.BasicDataSourceFactory;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.skife.jdbi.v2.Batch;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.TransactionCallback;
import org.skife.jdbi.v2.TransactionIsolationLevel;
import org.skife.jdbi.v2.TransactionStatus;
import org.skife.jdbi.v2.exceptions.DBIException;
import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException;
import org.skife.jdbi.v2.exceptions.UnableToObtainConnectionException;
import org.skife.jdbi.v2.tweak.HandleCallback;
import org.skife.jdbi.v2.util.ByteArrayMapper;
import org.skife.jdbi.v2.util.IntegerMapper;
import javax.annotation.Nullable;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.sql.SQLTransientException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
public abstract class SQLMetadataConnector implements MetadataStorageConnector
{
// Logger shared by every operation in this class.
private static final Logger log = new Logger(SQLMetadataConnector.class);
// Default SQL type returned by getPayloadType().
private static final String PAYLOAD_TYPE = "BLOB";
// Default (empty) collation returned by getCollation().
private static final String COLLATION = "";
// Maximum number of tries passed to RetryUtils.retry by retryWithHandle().
static final int DEFAULT_MAX_TRIES = 10;
// Supplies the connector configuration (create-tables flag, credentials, connect URI, DBCP properties).
private final Supplier<MetadataStorageConnectorConfig> config;
// Supplies the names of the metadata tables.
private final Supplier<MetadataStorageTablesConfig> tablesConfigSupplier;
// Retry predicate used by the retry helpers; bound to isTransientException in the constructor.
private final Predicate<Throwable> shouldRetry;
/**
* Stores the configuration suppliers and binds the default retry predicate
* to {@link #isTransientException(Throwable)}.
*
* @param config supplier of the connector configuration
* @param tablesConfigSupplier supplier of the metadata table names
*/
public SQLMetadataConnector(
Supplier<MetadataStorageConnectorConfig> config,
Supplier<MetadataStorageTablesConfig> tablesConfigSupplier
)
{
this.config = config;
this.tablesConfigSupplier = tablesConfigSupplier;
// Method reference only; isTransientException is not invoked during construction.
this.shouldRetry = this::isTransientException;
}
/**
* SQL type to use for payload data (e.g. JSON blobs).
* Must be a binary type whose values can be accessed using ResultSet.getBytes().
* <p>
* The resulting string will be interpolated into the table creation statement, e.g.
* <code>CREATE TABLE druid_table ( payload &lt;type&gt; NOT NULL, ... )</code>
*
* @return String representing the SQL type; this implementation returns "BLOB"
*/
public String getPayloadType()
{
return PAYLOAD_TYPE;
}
/**
* The character set and collation for case-sensitive nonbinary string comparison.
* The returned string is interpolated after the column type of the dataSource columns
* in the table creation statements of this class.
*
* @return the collation for the character set; this implementation returns an empty string
*/
public String getCollation()
{
return COLLATION;
}
/**
* Auto-incrementing integer SQL type to use for IDs.
* The returned string is interpolated into the table creation statement as follows:
* <pre>
* CREATE TABLE druid_table (
* id &lt;serial-type&gt; NOT NULL,
* col_2 VARCHAR(255) NOT NULL,
* col_3 VARCHAR(255) NOT NULL
* ...
* )
* </pre>
* In this class it is used for the id column of the upgrade-segments, log, lock,
* supervisors and audit tables.
*
* @return String representing auto-incrementing SQL integer type to use for IDs.
*/
public abstract String getSerialType();
/**
* Returns the value that should be passed to statement.setFetchSize to ensure results
* are streamed back from the database instead of fetching the entire result set in memory.
* The value is database specific, hence abstract.
*
* @return optimal fetch size to stream results back
*/
public abstract int getStreamingFetchSize();
/**
* Returns the quote string that the table creation statements in this class wrap around
* the {@code end} column name.
*
* @return the string that should be used to quote string fields
*/
public abstract String getQuoteString();
/**
* Query handed to the connection pool as its validation query (see {@link #getDatasource()}).
*
* @return the validation SQL; this implementation returns {@code SELECT 1}
*/
public String getValidationQuery()
{
return "SELECT 1";
}
/**
* Checks whether the given table exists, using the supplied handle.
*
* @param handle open handle to run the check on
* @param tableName name of the table to look for
* @return true if the table exists
*/
public abstract boolean tableExists(Handle handle, String tableName);
/**
* Returns a database-specific SQL clause for the given row limit.
* NOTE(review): presumably appended to SELECT statements by callers outside this class; confirm usage.
*
* @param limit maximum number of rows
* @return the SQL limit clause
*/
public abstract String limitClause(int limit);
/**
* Runs the callback with a handle obtained from {@link #getDBI()}, delegating retry handling to
* {@code RetryUtils.retry} with the given predicate and {@link #DEFAULT_MAX_TRIES}.
*
* @param callback work to perform with the handle
* @param myShouldRetry predicate passed to RetryUtils.retry to decide whether a failure is retried
* @return the value returned by the callback
* @throws RuntimeException wrapping any checked exception; unchecked exceptions and errors are rethrown as-is
*/
public <T> T retryWithHandle(
final HandleCallback<T> callback,
final Predicate<Throwable> myShouldRetry
)
{
try {
return RetryUtils.retry(() -> getDBI().withHandle(callback), myShouldRetry, DEFAULT_MAX_TRIES);
}
catch (Exception e) {
// Rethrow unchecked exceptions unchanged; only checked ones get wrapped below.
Throwables.propagateIfPossible(e);
throw new RuntimeException(e);
}
}
/**
* Same as {@link #retryWithHandle(HandleCallback, Predicate)}, using the default retry predicate
* ({@link #isTransientException(Throwable)}).
*
* @param callback work to perform with the handle
* @return the value returned by the callback
*/
public <T> T retryWithHandle(final HandleCallback<T> callback)
{
return retryWithHandle(callback, shouldRetry);
}
/**
* Runs the callback inside a READ_COMMITTED transaction, delegating retry handling to
* {@code RetryUtils.retry} with the default retry predicate.
*
* @param callback work to perform inside the transaction
* @param quietTries passed through to RetryUtils.retry
* @param maxTries passed through to RetryUtils.retry
* @return the value returned by the callback
* @throws RuntimeException wrapping any checked exception; unchecked exceptions and errors are rethrown as-is
*/
public <T> T retryTransaction(final TransactionCallback<T> callback, final int quietTries, final int maxTries)
{
try {
return RetryUtils.retry(() -> getDBI().inTransaction(TransactionIsolationLevel.READ_COMMITTED, callback), shouldRetry, quietTries, maxTries);
}
catch (Exception e) {
// Rethrow unchecked exceptions unchanged; only checked ones get wrapped below.
Throwables.propagateIfPossible(e);
throw new RuntimeException(e);
}
}
/**
 * Decides whether a failure is worth retrying.
 *
 * @param e the failure to classify, may be null
 * @return true if {@code e} is a known transient type, is classified as transient by
 *         {@link #connectorIsTransientException(Throwable)}, or is a SQL/DBI wrapper
 *         whose cause is itself transient; false for null
 */
public final boolean isTransientException(Throwable e)
{
  if (e == null) {
    return false;
  }
  // Types that are transient by themselves.
  if (e instanceof RetryTransactionException
      || e instanceof SQLTransientException
      || e instanceof SQLRecoverableException
      || e instanceof UnableToObtainConnectionException) {
    return true;
  }
  if (e instanceof UnableToExecuteStatementException && isTransientException(e.getCause())) {
    return true;
  }
  // Vendor-specific classification is consulted before unwrapping generic wrappers.
  if (connectorIsTransientException(e)) {
    return true;
  }
  final boolean isWrapper = e instanceof SQLException || e instanceof DBIException;
  return isWrapper && isTransientException(e.getCause());
}
/**
* Vendor specific errors that are not covered by {@link #isTransientException(Throwable)}.
* Consulted by isTransientException for every non-null exception it does not already recognize.
*
* @return false by default; subclasses may override to flag vendor-specific transient errors
*/
protected boolean connectorIsTransientException(Throwable e)
{
return false;
}
/**
* Checks if the root cause of the given exception is a PacketTooBigException.
* Not called from within this class.
*
* @param t the exception to inspect
* @return false by default. Specific implementations should override this method
* to correctly classify their packet exceptions.
*/
protected boolean isRootCausePacketTooBigException(Throwable t)
{
return false;
}
/**
 * Creates the given table and indexes if the table doesn't already exist.
 * Failures are logged (with the table name) and not rethrown, so callers
 * proceed on a best-effort basis.
 *
 * @param tableName name of the table to create
 * @param sql       statements to run in one batch (CREATE TABLE followed by any CREATE INDEX)
 */
public void createTable(final String tableName, final Iterable<String> sql)
{
  try {
    retryWithHandle(handle -> {
      if (tableExists(handle, tableName)) {
        log.info("Table[%s] already exists", tableName);
      } else {
        log.info("Creating table[%s]", tableName);
        final Batch batch = handle.createBatch();
        for (String s : sql) {
          batch.add(s);
        }
        batch.execute();
      }
      return null;
    });
  }
  catch (Exception e) {
    // Include the table name so the failure can be attributed, consistent with alterTable.
    log.warn(e, "Exception creating table[%s]", tableName);
  }
}
/**
 * Runs the given ALTER statements against a table, if that table exists.
 * Failures are logged and not rethrown.
 *
 * @param tableName The name of the table being altered
 * @param sql       ALTER statements to be executed in one batch
 */
private void alterTable(final String tableName, final Iterable<String> sql)
{
  try {
    retryWithHandle(handle -> {
      if (!tableExists(handle, tableName)) {
        log.info("Table[%s] doesn't exist.", tableName);
        return null;
      }
      final Batch alterBatch = handle.createBatch();
      for (String statement : sql) {
        log.info("Altering table[%s], with command: %s", tableName, statement);
        alterBatch.add(statement);
      }
      alterBatch.execute();
      return null;
    });
  }
  catch (Exception e) {
    log.warn(e, "Exception Altering table[%s]", tableName);
  }
}
/**
 * Creates the pending segments table with its two indexes, then adds the
 * upgraded_from_segment_id and task_allocator_id columns if they are missing.
 *
 * @param tableName name of the pending segments table
 */
public void createPendingSegmentsTable(final String tableName)
{
  final String tableDdl = StringUtils.format(
      "CREATE TABLE %1$s (\n"
      + " id VARCHAR(255) NOT NULL,\n"
      + " dataSource VARCHAR(255) %4$s NOT NULL,\n"
      + " created_date VARCHAR(255) NOT NULL,\n"
      + " start VARCHAR(255) NOT NULL,\n"
      + " %3$send%3$s VARCHAR(255) NOT NULL,\n"
      + " sequence_name VARCHAR(255) NOT NULL,\n"
      + " sequence_prev_id VARCHAR(255) NOT NULL,\n"
      + " sequence_name_prev_id_sha1 VARCHAR(255) NOT NULL,\n"
      + " payload %2$s NOT NULL,\n"
      + " PRIMARY KEY (id),\n"
      + " UNIQUE (sequence_name_prev_id_sha1)\n"
      + ")",
      tableName, getPayloadType(), getQuoteString(), getCollation()
  );
  final String endIndexDdl = StringUtils.format(
      "CREATE INDEX idx_%1$s_datasource_end ON %1$s(dataSource, %2$send%2$s)",
      tableName,
      getQuoteString()
  );
  final String sequenceIndexDdl = StringUtils.format(
      "CREATE INDEX idx_%1$s_datasource_sequence ON %1$s(dataSource, sequence_name)",
      tableName
  );

  createTable(tableName, ImmutableList.of(tableDdl, endIndexDdl, sequenceIndexDdl));
  alterPendingSegmentsTableAddParentIdAndTaskGroup(tableName);
}
/**
 * Creates the dataSource metadata table, keyed by dataSource.
 *
 * @param tableName name of the table to create
 */
public void createDataSourceTable(final String tableName)
{
  final String tableDdl = StringUtils.format(
      "CREATE TABLE %1$s (\n"
      + " dataSource VARCHAR(255) %3$s NOT NULL,\n"
      + " created_date VARCHAR(255) NOT NULL,\n"
      + " commit_metadata_payload %2$s NOT NULL,\n"
      + " commit_metadata_sha1 VARCHAR(255) NOT NULL,\n"
      + " PRIMARY KEY (dataSource)\n"
      + ")",
      tableName, getPayloadType(), getCollation()
  );
  createTable(tableName, ImmutableList.of(tableDdl));
}
/**
 * Creates the segments table together with its "used" and
 * "dataSource, used, end, start" indexes.
 *
 * @param tableName name of the segments table
 */
public void createSegmentTable(final String tableName)
{
  final String tableDdl = StringUtils.format(
      "CREATE TABLE %1$s (\n"
      + " id VARCHAR(255) NOT NULL,\n"
      + " dataSource VARCHAR(255) %4$s NOT NULL,\n"
      + " created_date VARCHAR(255) NOT NULL,\n"
      + " start VARCHAR(255) NOT NULL,\n"
      + " %3$send%3$s VARCHAR(255) NOT NULL,\n"
      + " partitioned BOOLEAN NOT NULL,\n"
      + " version VARCHAR(255) NOT NULL,\n"
      + " used BOOLEAN NOT NULL,\n"
      + " payload %2$s NOT NULL,\n"
      + " used_status_last_updated VARCHAR(255) NOT NULL,\n"
      + " PRIMARY KEY (id)\n"
      + ")",
      tableName, getPayloadType(), getQuoteString(), getCollation()
  );
  final String usedIndexDdl = StringUtils.format("CREATE INDEX idx_%1$s_used ON %1$s(used)", tableName);
  final String intervalIndexDdl = StringUtils.format(
      "CREATE INDEX idx_%1$s_datasource_used_end_start ON %1$s(dataSource, used, %2$send%2$s, start)",
      tableName,
      getQuoteString()
  );

  createTable(tableName, ImmutableList.of(tableDdl, usedIndexDdl, intervalIndexDdl));
}
/**
 * Creates the upgrade segments table and an index on task_id.
 *
 * @param tableName name of the table to create
 */
private void createUpgradeSegmentsTable(final String tableName)
{
  final String tableDdl = StringUtils.format(
      "CREATE TABLE %1$s (\n"
      + " id %2$s NOT NULL,\n"
      + " task_id VARCHAR(255) NOT NULL,\n"
      + " segment_id VARCHAR(255) NOT NULL,\n"
      + " lock_version VARCHAR(255) NOT NULL,\n"
      + " PRIMARY KEY (id)\n"
      + ")",
      tableName, getSerialType()
  );
  final String taskIndexDdl = StringUtils.format(
      "CREATE INDEX idx_%1$s_task ON %1$s(task_id)",
      tableName
  );

  createTable(tableName, ImmutableList.of(tableDdl, taskIndexDdl));
}
/**
 * Creates the rules table and an index on dataSource.
 *
 * @param tableName name of the table to create
 */
public void createRulesTable(final String tableName)
{
  final String tableDdl = StringUtils.format(
      "CREATE TABLE %1$s (\n"
      + " id VARCHAR(255) NOT NULL,\n"
      + " dataSource VARCHAR(255) %3$s NOT NULL,\n"
      + " version VARCHAR(255) NOT NULL,\n"
      + " payload %2$s NOT NULL,\n"
      + " PRIMARY KEY (id)\n"
      + ")",
      tableName, getPayloadType(), getCollation()
  );
  final String dataSourceIndexDdl =
      StringUtils.format("CREATE INDEX idx_%1$s_datasource ON %1$s(dataSource)", tableName);

  createTable(tableName, ImmutableList.of(tableDdl, dataSourceIndexDdl));
}
/**
 * Creates the config table: a name column as primary key plus a payload column.
 *
 * @param tableName name of the table to create
 */
public void createConfigTable(final String tableName)
{
  final String tableDdl = StringUtils.format(
      "CREATE TABLE %1$s (\n"
      + " name VARCHAR(255) NOT NULL,\n"
      + " payload %2$s NOT NULL,\n"
      + " PRIMARY KEY(name)\n"
      + ")",
      tableName, getPayloadType()
  );
  createTable(tableName, ImmutableList.of(tableDdl));
}
/**
* Creates the task entry table if needed, then adds the type and group_id columns
* when they are missing.
*
* @param tableName name of the task entry table
*/
public void prepareTaskEntryTable(final String tableName)
{
createEntryTable(tableName);
alterEntryTableAddTypeAndGroupId(tableName);
}
/**
 * Creates the entry table, then creates the "active, created_date" and
 * "datasource, active" indexes unless the table already reports them.
 *
 * @param tableName name of the entry table
 */
public void createEntryTable(final String tableName)
{
  final String tableDdl = StringUtils.format(
      "CREATE TABLE %1$s (\n"
      + " id VARCHAR(255) NOT NULL,\n"
      + " created_date VARCHAR(255) NOT NULL,\n"
      + " datasource VARCHAR(255) %3$s NOT NULL,\n"
      + " payload %2$s NOT NULL,\n"
      + " status_payload %2$s NOT NULL,\n"
      + " active BOOLEAN NOT NULL DEFAULT FALSE,\n"
      + " PRIMARY KEY (id)\n"
      + ")",
      tableName, getPayloadType(), getCollation()
  );
  createTable(tableName, ImmutableList.of(tableDdl));

  // Indexes are created separately so that each is added only when missing.
  final Set<String> existingIndexes = getIndexOnTable(tableName);

  final String activeCreatedIndex = StringUtils.format("idx_%1$s_active_created_date", tableName);
  createIndex(tableName, activeCreatedIndex, ImmutableList.of("active", "created_date"), existingIndexes);

  final String datasourceActiveIndex = StringUtils.format("idx_%1$s_datasource_active", tableName);
  createIndex(tableName, datasourceActiveIndex, ImmutableList.of("datasource", "active"), existingIndexes);
}
/**
 * Adds the type and group_id columns to the entry table when they are not present.
 * Nothing is executed if both columns already exist.
 *
 * @param tableName name of the entry table
 */
private void alterEntryTableAddTypeAndGroupId(final String tableName)
{
  final List<String> pendingAlters = new ArrayList<>();

  if (!tableHasColumn(tableName, "type")) {
    log.info("Adding column[type] to table[%s].", tableName);
    pendingAlters.add(StringUtils.format("ALTER TABLE %1$s ADD COLUMN type VARCHAR(255)", tableName));
  } else {
    log.info("Table[%s] already has column[type].", tableName);
  }

  if (!tableHasColumn(tableName, "group_id")) {
    log.info("Adding column[group_id] to table[%s].", tableName);
    pendingAlters.add(StringUtils.format("ALTER TABLE %1$s ADD COLUMN group_id VARCHAR(255)", tableName));
  } else {
    log.info("Table[%s] already has column[group_id].", tableName);
  }

  if (pendingAlters.isEmpty()) {
    return;
  }
  alterTable(tableName, pendingAlters);
}
/**
 * Adds the upgraded_from_segment_id and task_allocator_id columns to the pending
 * segments table when they are not present. Nothing is executed if both exist.
 *
 * @param tableName name of the pending segments table
 */
private void alterPendingSegmentsTableAddParentIdAndTaskGroup(final String tableName)
{
  final List<String> pendingAlters = new ArrayList<>();

  if (!tableHasColumn(tableName, "upgraded_from_segment_id")) {
    log.info("Adding column[upgraded_from_segment_id] to table[%s].", tableName);
    pendingAlters.add(
        StringUtils.format("ALTER TABLE %1$s ADD COLUMN upgraded_from_segment_id VARCHAR(255)", tableName)
    );
  } else {
    log.info("Table[%s] already has column[upgraded_from_segment_id].", tableName);
  }

  if (!tableHasColumn(tableName, "task_allocator_id")) {
    log.info("Adding column[task_allocator_id] to table[%s].", tableName);
    pendingAlters.add(
        StringUtils.format("ALTER TABLE %1$s ADD COLUMN task_allocator_id VARCHAR(255)", tableName)
    );
  } else {
    log.info("Table[%s] already has column[task_allocator_id].", tableName);
  }

  if (pendingAlters.isEmpty()) {
    return;
  }
  alterTable(tableName, pendingAlters);
}
/**
 * Creates a log table for the given entry type, with an index on the
 * {@code <entryTypeName>_id} column.
 *
 * @param tableName     name of the log table
 * @param entryTypeName entry type; used as the prefix of the "_id" column name
 */
public void createLogTable(final String tableName, final String entryTypeName)
{
  final String tableDdl = StringUtils.format(
      "CREATE TABLE %1$s (\n"
      + " id %2$s NOT NULL,\n"
      + " %4$s_id VARCHAR(255) DEFAULT NULL,\n"
      + " log_payload %3$s,\n"
      + " PRIMARY KEY (id)\n"
      + ")",
      tableName, getSerialType(), getPayloadType(), entryTypeName
  );
  final String entryIdIndexDdl =
      StringUtils.format("CREATE INDEX idx_%1$s_%2$s_id ON %1$s(%2$s_id)", tableName, entryTypeName);

  createTable(tableName, ImmutableList.of(tableDdl, entryIdIndexDdl));
}
/**
 * Creates a lock table for the given entry type, with an index on the
 * {@code <entryTypeName>_id} column.
 *
 * @param tableName     name of the lock table
 * @param entryTypeName entry type; used as the prefix of the "_id" column name
 */
public void createLockTable(final String tableName, final String entryTypeName)
{
  final String tableDdl = StringUtils.format(
      "CREATE TABLE %1$s (\n"
      + " id %2$s NOT NULL,\n"
      + " %4$s_id VARCHAR(255) DEFAULT NULL,\n"
      + " lock_payload %3$s,\n"
      + " PRIMARY KEY (id)\n"
      + ")",
      tableName, getSerialType(), getPayloadType(), entryTypeName
  );
  final String entryIdIndexDdl =
      StringUtils.format("CREATE INDEX idx_%1$s_%2$s_id ON %1$s(%2$s_id)", tableName, entryTypeName);

  createTable(tableName, ImmutableList.of(tableDdl, entryIdIndexDdl));
}
/**
 * Creates the supervisors table and an index on spec_id.
 *
 * @param tableName name of the table to create
 */
public void createSupervisorsTable(final String tableName)
{
  final String tableDdl = StringUtils.format(
      "CREATE TABLE %1$s (\n"
      + " id %2$s NOT NULL,\n"
      + " spec_id VARCHAR(255) NOT NULL,\n"
      + " created_date VARCHAR(255) NOT NULL,\n"
      + " payload %3$s NOT NULL,\n"
      + " PRIMARY KEY (id)\n"
      + ")",
      tableName, getSerialType(), getPayloadType()
  );
  final String specIdIndexDdl =
      StringUtils.format("CREATE INDEX idx_%1$s_spec_id ON %1$s(spec_id)", tableName);

  createTable(tableName, ImmutableList.of(tableDdl, specIdIndexDdl));
}
/**
 * Adds the used_status_last_updated column to the "segments" table
 * unless the column is already present.
 */
protected void alterSegmentTableAddUsedFlagLastUpdated()
{
  final String segmentsTable = tablesConfigSupplier.get().getSegmentsTable();
  if (tableHasColumn(segmentsTable, "used_status_last_updated")) {
    log.info("Table[%s] already has column[used_status_last_updated].", segmentsTable);
    return;
  }

  log.info("Adding column[used_status_last_updated] to table[%s].", segmentsTable);
  final String alterDdl = StringUtils.format(
      "ALTER TABLE %1$s ADD used_status_last_updated varchar(255)",
      segmentsTable
  );
  alterTable(segmentsTable, ImmutableList.of(alterDdl));
}
/**
 * Inserts the key/value pair, or updates the value if a row with the key already
 * exists. The existence check and the write run in a single transaction.
 *
 * @param tableName   table to write to
 * @param keyColumn   name of the key column
 * @param valueColumn name of the value column
 * @param key         key to insert or update
 * @param value       new value bytes
 * @return always null
 */
@Override
public Void insertOrUpdate(
    final String tableName,
    final String keyColumn,
    final String valueColumn,
    final String key,
    final byte[] value
)
{
  return getDBI().inTransaction(
      (handle, transactionStatus) -> {
        final String countSql =
            StringUtils.format("SELECT COUNT(*) FROM %1$s WHERE %2$s = :key", tableName, keyColumn);
        final int existingRows = handle
            .createQuery(countSql)
            .bind("key", key)
            .map(IntegerMapper.FIRST)
            .first();

        // Both statements take the same bindings; only the SQL text differs.
        final String writeSql;
        if (existingRows == 0) {
          writeSql = StringUtils.format(
              "INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value)",
              tableName, keyColumn, valueColumn
          );
        } else {
          writeSql = StringUtils.format(
              "UPDATE %1$s SET %3$s=:value WHERE %2$s=:key",
              tableName, keyColumn, valueColumn
          );
        }
        handle.createStatement(writeSql)
              .bind("key", key)
              .bind("value", value)
              .execute();
        return null;
      }
  );
}
/**
 * Applies all updates in one REPEATABLE_READ transaction, but only if every
 * current value equals the expected old value of its update.
 *
 * @param updates the compare-and-swap operations to apply together
 * @return true if all comparisons matched and all writes were issued;
 *         false as soon as one comparison fails (nothing is written in that case)
 */
@Override
public boolean compareAndSwap(
    List<MetadataCASUpdate> updates
)
{
  return getDBI().inTransaction(
      TransactionIsolationLevel.REPEATABLE_READ,
      (handle, transactionStatus) -> {
        final List<byte[]> observedValues = new ArrayList<>();

        // Compare phase: read every current value with FOR UPDATE and check it.
        for (MetadataCASUpdate update : updates) {
          final String selectSql = StringUtils.format(
              "SELECT %1$s FROM %2$s WHERE %3$s = :key FOR UPDATE",
              update.getValueColumn(),
              update.getTableName(),
              update.getKeyColumn()
          );
          final byte[] observed = handle
              .createQuery(selectSql)
              .bind("key", update.getKey())
              .map(ByteArrayMapper.FIRST)
              .first();
          if (!Arrays.equals(observed, update.getOldValue())) {
            return false;
          }
          observedValues.add(observed);
        }

        // Swap phase: insert where no value was observed, otherwise update.
        for (int i = 0; i < updates.size(); i++) {
          final MetadataCASUpdate update = updates.get(i);
          final String writeSql;
          if (observedValues.get(i) == null) {
            writeSql = StringUtils.format(
                "INSERT INTO %1$s (%2$s, %3$s) VALUES (:key, :value)",
                update.getTableName(),
                update.getKeyColumn(),
                update.getValueColumn()
            );
          } else {
            writeSql = StringUtils.format(
                "UPDATE %1$s SET %3$s=:value WHERE %2$s=:key",
                update.getTableName(),
                update.getKeyColumn(),
                update.getValueColumn()
            );
          }
          handle.createStatement(writeSql)
                .bind("key", update.getKey())
                .bind("value", update.getNewValue())
                .execute();
        }
        return true;
      }
  );
}
/**
* Returns the DBI instance through which this class obtains all handles and transactions.
*
* @return the DBI to use for metadata store access
*/
public abstract DBI getDBI();
/**
 * Creates the configured dataSource table, but only when table creation is enabled.
 */
@Override
public void createDataSourceTable()
{
  if (!config.get().isCreateTables()) {
    return;
  }
  createDataSourceTable(tablesConfigSupplier.get().getDataSourceTable());
}
/**
 * Creates the configured pending segments table, but only when table creation is enabled.
 */
@Override
public void createPendingSegmentsTable()
{
  if (!config.get().isCreateTables()) {
    return;
  }
  createPendingSegmentsTable(tablesConfigSupplier.get().getPendingSegmentsTable());
}
/**
 * Creates and upgrades the configured segments table when table creation is enabled,
 * then validates its schema in every case.
 */
@Override
public void createSegmentTable()
{
  final boolean createTables = config.get().isCreateTables();
  if (createTables) {
    createSegmentTable(tablesConfigSupplier.get().getSegmentsTable());
    alterSegmentTableAddUsedFlagLastUpdated();
  }
  // Validation is unconditional: the schema must be checked even on clusters
  // that are configured not to create tables themselves.
  validateSegmentsTable();
}
/**
 * Creates the configured upgrade segments table, but only when table creation is enabled.
 */
@Override
public void createUpgradeSegmentsTable()
{
  if (!config.get().isCreateTables()) {
    return;
  }
  createUpgradeSegmentsTable(tablesConfigSupplier.get().getUpgradeSegmentsTable());
}
/**
 * Creates the configured rules table, but only when table creation is enabled.
 */
@Override
public void createRulesTable()
{
  if (!config.get().isCreateTables()) {
    return;
  }
  createRulesTable(tablesConfigSupplier.get().getRulesTable());
}
/**
 * Creates the configured config table, but only when table creation is enabled.
 */
@Override
public void createConfigTable()
{
  if (!config.get().isCreateTables()) {
    return;
  }
  createConfigTable(tablesConfigSupplier.get().getConfigTable());
}
/**
 * Creates the task entry, log and lock tables, but only when table creation is enabled.
 */
@Override
public void createTaskTables()
{
  if (!config.get().isCreateTables()) {
    return;
  }
  final MetadataStorageTablesConfig tables = tablesConfigSupplier.get();
  final String taskEntryType = tables.getTaskEntryType();

  prepareTaskEntryTable(tables.getEntryTable(taskEntryType));
  createLogTable(tables.getLogTable(taskEntryType), taskEntryType);
  createLockTable(tables.getLockTable(taskEntryType), taskEntryType);
}
/**
 * Creates the configured supervisors table, but only when table creation is enabled.
 */
@Override
public void createSupervisorsTable()
{
  if (!config.get().isCreateTables()) {
    return;
  }
  createSupervisorsTable(tablesConfigSupplier.get().getSupervisorTable());
}
/**
* Looks up the value stored for a key, using a fresh handle from {@link #getDBI()}.
* This call is not wrapped in any of the retry helpers.
*
* @param tableName table to read from
* @param keyColumn name of the key column
* @param valueColumn name of the value column
* @param key key to look up
* @return the stored bytes, or null if no row matches
*/
@Override
public @Nullable byte[] lookup(
final String tableName,
final String keyColumn,
final String valueColumn,
final String key
)
{
return getDBI().withHandle(
handle -> lookupWithHandle(handle, tableName, keyColumn, valueColumn, key)
);
}
/**
 * Looks up the value stored for a key, using the supplied handle.
 *
 * @param handle      handle to run the query on
 * @param tableName   table to read from
 * @param keyColumn   name of the key column
 * @param valueColumn name of the value column
 * @param key         key to look up
 * @return the stored bytes, or null if no row matches
 * @throws ISE if more than one row matches the key
 */
public @Nullable byte[] lookupWithHandle(
    final Handle handle,
    final String tableName,
    final String keyColumn,
    final String valueColumn,
    final String key
)
{
  final String query = StringUtils.format(
      "SELECT %s FROM %s WHERE %s = :key", valueColumn,
      tableName, keyColumn
  );
  final List<byte[]> rows = handle.createQuery(query)
                                  .bind("key", key)
                                  .map(ByteArrayMapper.FIRST)
                                  .list();
  switch (rows.size()) {
    case 0:
      return null;
    case 1:
      return rows.get(0);
    default:
      throw new ISE("Error! More than one matching entry[%d] found for [%s]?!", rows.size(), key);
  }
}
/**
* @return the connector configuration obtained from the config supplier on each call
*/
public MetadataStorageConnectorConfig getConfig()
{
return config.get();
}
/**
 * Builds a connection pool for the given configuration.
 * When DBCP properties are configured the pool is created from them; otherwise a
 * BasicDataSourceExt is used. User, password, URL, validation query and
 * test-on-borrow are then set on the pool in either case.
 *
 * @param connectorConfig connector configuration to read credentials, URI and DBCP properties from
 * @param validationQuery query the pool uses to validate connections
 * @return the configured data source
 * @throws RuntimeException wrapping any exception raised while creating the pool
 */
protected static BasicDataSource makeDatasource(MetadataStorageConnectorConfig connectorConfig, String validationQuery)
{
  final BasicDataSource pool;
  try {
    final Properties dbcpProperties = connectorConfig.getDbcpProperties();
    pool = dbcpProperties == null
           ? new BasicDataSourceExt(connectorConfig)
           : BasicDataSourceFactory.createDataSource(dbcpProperties);
  }
  catch (Exception e) {
    throw new RuntimeException(e);
  }

  // Explicit connector settings are applied after construction.
  pool.setUsername(connectorConfig.getUser());
  pool.setPassword(connectorConfig.getPassword());
  pool.setUrl(connectorConfig.getConnectURI());

  pool.setValidationQuery(validationQuery);
  pool.setTestOnBorrow(true);
  return pool;
}
/**
* Builds a new data source from this connector's configuration and validation query.
* A new pool is constructed on every call.
*
* @return the configured data source
*/
protected BasicDataSource getDatasource()
{
return makeDatasource(getConfig(), getValidationQuery());
}
/**
 * Runs the callback in a transaction on a connection that is switched to read-only
 * for the duration of the call. The connection's previous read-only flag is restored
 * afterwards; a failure to restore it is logged and not rethrown.
 *
 * @param callback work to perform inside the transaction
 * @return the value returned by the callback
 */
protected final <T> T inReadOnlyTransaction(
    final TransactionCallback<T> callback
)
{
  return getDBI().withHandle(
      handle -> {
        final Connection conn = handle.getConnection();
        final boolean wasReadOnly = conn.isReadOnly();
        conn.setReadOnly(true);
        try {
          return handle.inTransaction(callback);
        }
        finally {
          try {
            conn.setReadOnly(wasReadOnly);
          }
          catch (SQLException e) {
            // at least try to log it so we don't swallow exceptions
            log.error(e, "Unable to reset connection read-only state");
          }
        }
      }
  );
}
/**
 * Creates the audit table with indexes on (audit_key, created_date),
 * (type, created_date) and created_date.
 *
 * @param tableName name of the audit table
 */
private void createAuditTable(final String tableName)
{
  final String tableDdl = StringUtils.format(
      "CREATE TABLE %1$s (\n"
      + " id %2$s NOT NULL,\n"
      + " audit_key VARCHAR(255) NOT NULL,\n"
      + " type VARCHAR(255) NOT NULL,\n"
      + " author VARCHAR(255) NOT NULL,\n"
      + " comment VARCHAR(2048) NOT NULL,\n"
      + " created_date VARCHAR(255) NOT NULL,\n"
      + " payload %3$s NOT NULL,\n"
      + " PRIMARY KEY(id)\n"
      + ")",
      tableName, getSerialType(), getPayloadType()
  );
  final String keyTimeIndexDdl =
      StringUtils.format("CREATE INDEX idx_%1$s_key_time ON %1$s(audit_key, created_date)", tableName);
  final String typeTimeIndexDdl =
      StringUtils.format("CREATE INDEX idx_%1$s_type_time ON %1$s(type, created_date)", tableName);
  final String auditTimeIndexDdl =
      StringUtils.format("CREATE INDEX idx_%1$s_audit_time ON %1$s(created_date)", tableName);

  createTable(
      tableName,
      ImmutableList.of(tableDdl, keyTimeIndexDdl, typeTimeIndexDdl, auditTimeIndexDdl)
  );
}
/**
 * Creates the configured audit table, but only when table creation is enabled.
 */
@Override
public void createAuditTable()
{
  if (!config.get().isCreateTables()) {
    return;
  }
  createAuditTable(tablesConfigSupplier.get().getAuditTable());
}
/**
 * Deletes every row from the given table, if the table exists.
 * Failures are logged (with the table name) and not rethrown.
 *
 * @param tableName name of the table to empty
 */
@Override
public void deleteAllRecords(final String tableName)
{
  try {
    retryWithHandle(
        handle -> {
          if (tableExists(handle, tableName)) {
            log.info("Deleting all records from table[%s]", tableName);
            final Batch batch = handle.createBatch();
            batch.add("DELETE FROM " + tableName);
            batch.execute();
          } else {
            log.info("Table[%s] does not exist.", tableName);
          }
          return null;
        }
    );
  }
  catch (Exception e) {
    log.warn(e, "Exception while deleting records from table[%s]", tableName);
  }
}
/**
 * Get the set of indexes on the given table.
 * Failures are logged and not rethrown; whatever names were collected up to that
 * point are returned.
 *
 * @param tableName name of the table to fetch the indexes of
 * @return immutable set of the uppercase index names; empty if no index information
 *         could be read (for example because the table does not exist)
 */
public Set<String> getIndexOnTable(String tableName)
{
  final Set<String> res = new HashSet<>();
  try {
    retryWithHandle(handle -> {
      final DatabaseMetaData databaseMetaData = handle.getConnection().getMetaData();
      // Close the metadata ResultSet deterministically instead of leaking it
      // until the connection is returned to the pool.
      try (ResultSet resultSet = getIndexInfo(databaseMetaData, tableName)) {
        while (resultSet.next()) {
          final String indexName = resultSet.getString("INDEX_NAME");
          if (org.apache.commons.lang.StringUtils.isNotBlank(indexName)) {
            res.add(StringUtils.toUpperCase(indexName));
          }
        }
      }
      return null;
    });
  }
  catch (Exception e) {
    log.error(e, "Exception while listing the index on table %s ", tableName);
  }
  return ImmutableSet.copyOf(res);
}
/**
* Get the ResultSet for indexInfo for given table.
* The lookup passes null for catalog and schema, and false for both the "unique"
* and "approximate" arguments of DatabaseMetaData.getIndexInfo.
* The caller owns the returned ResultSet and is responsible for closing it.
*
* @param databaseMetaData DatabaseMetaData
* @param tableName Name of table
* @return ResultSet with index info
* @throws SQLException if the metadata lookup fails
*/
public ResultSet getIndexInfo(DatabaseMetaData databaseMetaData, String tableName) throws SQLException
{
return databaseMetaData.getIndexInfo(
null,
null,
tableName, // tableName is case-sensitive in mysql default setting
false,
false
);
}
/**
 * Creates an index on the table, with retry, unless it already exists.
 * To be called after createTable. Failures are logged and not rethrown.
 *
 * @param tableName       Name of the table to create index on
 * @param indexName       case-insensitive string index name, it helps to check the existing index on table
 * @param indexCols       List of columns to be indexed on
 * @param createdIndexSet uppercase names of the indexes already present on the table,
 *                        as returned by {@link #getIndexOnTable(String)}
 */
public void createIndex(
    final String tableName,
    final String indexName,
    final List<String> indexCols,
    final Set<String> createdIndexSet
)
{
  try {
    retryWithHandle(
        handle -> {
          if (createdIndexSet.contains(StringUtils.toUpperCase(indexName))) {
            log.info("Index [%s] on Table [%s] already exists", indexName, tableName);
            return null;
          }
          final String indexSQL = StringUtils.format(
              "CREATE INDEX %1$s ON %2$s(%3$s)",
              indexName,
              tableName,
              Joiner.on(",").join(indexCols)
          );
          log.info("Creating Index on Table [%s], sql: [%s] ", tableName, indexSQL);
          handle.execute(indexSQL);
          return null;
        }
    );
  }
  catch (Exception e) {
    // Pass the table name as a format argument: pre-formatting the message and then
    // handing it to the logger as a format string would break on names containing '%'.
    log.error(e, "Exception while creating index on table [%s]", tableName);
  }
}
/**
 * Checks table metadata to determine if the given column exists in the table.
 * A SQLException during the check is logged and treated as "column not present".
 *
 * @param tableName  name of the table to inspect
 * @param columnName name of the column to look for
 * @return true if the column exists in the table, false if the table or column is
 *         missing or the metadata could not be read
 */
protected boolean tableHasColumn(String tableName, String columnName)
{
  return getDBI().withHandle(handle -> {
    try {
      if (!tableExists(handle, tableName)) {
        return false;
      }
      final DatabaseMetaData dbMetaData = handle.getConnection().getMetaData();
      // Close the metadata ResultSet deterministically instead of leaking it.
      try (ResultSet columns = dbMetaData.getColumns(null, null, tableName, columnName)) {
        return columns.next();
      }
    }
    catch (SQLException e) {
      // Keep the best-effort contract, but leave a trace: callers such as schema
      // validation would otherwise report a missing column with no hint of the real cause.
      log.warn(e, "Could not check whether table[%s] has column[%s]; assuming it does not.", tableName, columnName);
      return false;
    }
  });
}
/**
 * Ensures that the "segments" table has a schema compatible with the current version of Druid,
 * i.e. that it has the used_status_last_updated column.
 *
 * @throws RuntimeException if the "segments" table has an incompatible schema.
 *                          There is no recovering from an invalid schema, the program should crash.
 * @see <a href="https://druid.apache.org/docs/latest/operations/metadata-migration/">Metadata migration</a> for info
 * on manually preparing the "segments" table.
 */
private void validateSegmentsTable()
{
  final boolean schemaIsCompatible =
      tableHasColumn(tablesConfigSupplier.get().getSegmentsTable(), "used_status_last_updated");
  if (!schemaIsCompatible) {
    throw new ISE(
        "Cannot start Druid as table[%s] has an incompatible schema."
        + " Reason: Column [used_status_last_updated] does not exist in table."
        + " See https://druid.apache.org/docs/latest/operations/upgrade-prep.html for more info on remediation.",
        tablesConfigSupplier.get().getSegmentsTable()
    );
  }
}
}