blob: 24409bc3e5ef8f5a4a962b25da64d0859c5f738d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.jdbc;
import java.io.Closeable;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.SQLNonTransientConnectionException;
import java.sql.SQLTimeoutException;
import java.sql.SQLTransientConnectionException;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseMetastoreCatalog;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.hadoop.Configurable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.LocationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JdbcCatalog extends BaseMetastoreCatalog
implements Configurable<Configuration>, SupportsNamespaces, Closeable {
public static final String PROPERTY_PREFIX = "jdbc.";
private static final String NAMESPACE_EXISTS_PROPERTY = "exists";
private static final Logger LOG = LoggerFactory.getLogger(JdbcCatalog.class);
private static final Joiner SLASH = Joiner.on("/");
private FileIO io;
private String catalogName = "jdbc";
private String warehouseLocation;
private Object conf;
private JdbcClientPool connections;
public JdbcCatalog() {}
@Override
public void initialize(String name, Map<String, String> properties) {
String uri = properties.get(CatalogProperties.URI);
Preconditions.checkNotNull(uri, "JDBC connection URI is required");
String inputWarehouseLocation = properties.get(CatalogProperties.WAREHOUSE_LOCATION);
Preconditions.checkArgument(
inputWarehouseLocation != null && inputWarehouseLocation.length() > 0,
"Cannot initialize JDBCCatalog because warehousePath must not be null or empty");
this.warehouseLocation = LocationUtil.stripTrailingSlash(inputWarehouseLocation);
if (name != null) {
this.catalogName = name;
}
String fileIOImpl =
properties.getOrDefault(
CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.hadoop.HadoopFileIO");
this.io = CatalogUtil.loadFileIO(fileIOImpl, properties, conf);
try {
LOG.debug("Connecting to JDBC database {}", properties.get(CatalogProperties.URI));
connections = new JdbcClientPool(uri, properties);
initializeCatalogTables();
} catch (SQLTimeoutException e) {
throw new UncheckedSQLException(e, "Cannot initialize JDBC catalog: Query timed out");
} catch (SQLTransientConnectionException | SQLNonTransientConnectionException e) {
throw new UncheckedSQLException(e, "Cannot initialize JDBC catalog: Connection failed");
} catch (SQLException e) {
throw new UncheckedSQLException(e, "Cannot initialize JDBC catalog");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new UncheckedInterruptedException(e, "Interrupted in call to initialize");
}
}
private void initializeCatalogTables() throws InterruptedException, SQLException {
LOG.trace("Creating database tables (if missing) to store iceberg catalog");
connections.run(
conn -> {
DatabaseMetaData dbMeta = conn.getMetaData();
ResultSet tableExists =
dbMeta.getTables(
null /* catalog name */,
null /* schemaPattern */,
JdbcUtil.CATALOG_TABLE_NAME /* tableNamePattern */,
null /* types */);
if (tableExists.next()) {
return true;
}
LOG.debug("Creating table {} to store iceberg catalog", JdbcUtil.CATALOG_TABLE_NAME);
return conn.prepareStatement(JdbcUtil.CREATE_CATALOG_TABLE).execute();
});
connections.run(
conn -> {
DatabaseMetaData dbMeta = conn.getMetaData();
ResultSet tableExists =
dbMeta.getTables(
null /* catalog name */,
null /* schemaPattern */,
JdbcUtil.NAMESPACE_PROPERTIES_TABLE_NAME /* tableNamePattern */,
null /* types */);
if (tableExists.next()) {
return true;
}
LOG.debug(
"Creating table {} to store iceberg catalog namespace properties",
JdbcUtil.NAMESPACE_PROPERTIES_TABLE_NAME);
return conn.prepareStatement(JdbcUtil.CREATE_NAMESPACE_PROPERTIES_TABLE).execute();
});
}
@Override
protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
return new JdbcTableOperations(connections, io, catalogName, tableIdentifier);
}
@Override
protected String defaultWarehouseLocation(TableIdentifier table) {
return SLASH.join(defaultNamespaceLocation(table.namespace()), table.name());
}
@Override
public boolean dropTable(TableIdentifier identifier, boolean purge) {
TableOperations ops = newTableOps(identifier);
TableMetadata lastMetadata = null;
if (purge) {
try {
lastMetadata = ops.current();
} catch (NotFoundException e) {
LOG.warn(
"Failed to load table metadata for table: {}, continuing drop without purge",
identifier,
e);
}
}
int deletedRecords =
execute(
JdbcUtil.DROP_TABLE_SQL,
catalogName,
JdbcUtil.namespaceToString(identifier.namespace()),
identifier.name());
if (deletedRecords == 0) {
LOG.info("Skipping drop, table does not exist: {}", identifier);
return false;
}
if (purge && lastMetadata != null) {
CatalogUtil.dropTableData(ops.io(), lastMetadata);
}
LOG.info("Dropped table: {}", identifier);
return true;
}
@Override
public List<TableIdentifier> listTables(Namespace namespace) {
if (!namespaceExists(namespace)) {
throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
}
return fetch(
row ->
JdbcUtil.stringToTableIdentifier(
row.getString(JdbcUtil.TABLE_NAMESPACE), row.getString(JdbcUtil.TABLE_NAME)),
JdbcUtil.LIST_TABLES_SQL,
catalogName,
JdbcUtil.namespaceToString(namespace));
}
@Override
public void renameTable(TableIdentifier from, TableIdentifier to) {
int updatedRecords =
execute(
err -> {
// SQLite doesn't set SQLState or throw SQLIntegrityConstraintViolationException
if (err instanceof SQLIntegrityConstraintViolationException
|| (err.getMessage() != null && err.getMessage().contains("constraint failed"))) {
throw new AlreadyExistsException("Table already exists: %s", to);
}
},
JdbcUtil.RENAME_TABLE_SQL,
JdbcUtil.namespaceToString(to.namespace()),
to.name(),
catalogName,
JdbcUtil.namespaceToString(from.namespace()),
from.name());
if (updatedRecords == 1) {
LOG.info("Renamed table from {}, to {}", from, to);
} else if (updatedRecords == 0) {
throw new NoSuchTableException("Table does not exist: %s", from);
} else {
LOG.warn(
"Rename operation affected {} rows: the catalog table's primary key assumption has been violated",
updatedRecords);
}
}
@Override
public String name() {
return catalogName;
}
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
@Override
public void createNamespace(Namespace namespace, Map<String, String> metadata) {
if (namespaceExists(namespace)) {
throw new AlreadyExistsException("Namespace already exists: %s", namespace);
}
Map<String, String> createMetadata;
if (metadata == null || metadata.isEmpty()) {
createMetadata = ImmutableMap.of(NAMESPACE_EXISTS_PROPERTY, "true");
} else {
createMetadata =
ImmutableMap.<String, String>builder()
.putAll(metadata)
.put(NAMESPACE_EXISTS_PROPERTY, "true")
.build();
}
insertProperties(namespace, createMetadata);
}
@Override
public List<Namespace> listNamespaces() {
List<Namespace> namespaces = Lists.newArrayList();
namespaces.addAll(
fetch(
row -> JdbcUtil.stringToNamespace(row.getString(JdbcUtil.TABLE_NAMESPACE)),
JdbcUtil.LIST_ALL_TABLE_NAMESPACES_SQL,
catalogName));
namespaces.addAll(
fetch(
row -> JdbcUtil.stringToNamespace(row.getString(JdbcUtil.NAMESPACE_NAME)),
JdbcUtil.LIST_ALL_PROPERTY_NAMESPACES_SQL,
catalogName));
namespaces =
namespaces.stream()
// only get sub namespaces/children
.filter(n -> n.levels().length >= 1)
// only get sub namespaces/children
.map(n -> Namespace.of(Arrays.stream(n.levels()).limit(1).toArray(String[]::new)))
// remove duplicates
.distinct()
.collect(Collectors.toList());
return namespaces;
}
@Override
public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
if (namespace.isEmpty()) {
return listNamespaces();
}
if (!namespaceExists(namespace)) {
throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
}
List<Namespace> namespaces = Lists.newArrayList();
namespaces.addAll(
fetch(
row -> JdbcUtil.stringToNamespace(row.getString(JdbcUtil.TABLE_NAMESPACE)),
JdbcUtil.LIST_NAMESPACES_SQL,
catalogName,
JdbcUtil.namespaceToString(namespace) + "%"));
namespaces.addAll(
fetch(
row -> JdbcUtil.stringToNamespace(row.getString(JdbcUtil.NAMESPACE_NAME)),
JdbcUtil.LIST_PROPERTY_NAMESPACES_SQL,
catalogName,
JdbcUtil.namespaceToString(namespace) + "%"));
int subNamespaceLevelLength = namespace.levels().length + 1;
namespaces =
namespaces.stream()
// exclude itself
.filter(n -> !n.equals(namespace))
// only get sub namespaces/children
.filter(n -> n.levels().length >= subNamespaceLevelLength)
// only get sub namespaces/children
.map(
n ->
Namespace.of(
Arrays.stream(n.levels())
.limit(subNamespaceLevelLength)
.toArray(String[]::new)))
// remove duplicates
.distinct()
.collect(Collectors.toList());
return namespaces;
}
@Override
public Map<String, String> loadNamespaceMetadata(Namespace namespace)
throws NoSuchNamespaceException {
if (!namespaceExists(namespace)) {
throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
}
Map<String, String> properties = Maps.newHashMap();
properties.putAll(fetchProperties(namespace));
properties.put("location", defaultNamespaceLocation(namespace));
properties.remove(NAMESPACE_EXISTS_PROPERTY); // do not return reserved existence property
return ImmutableMap.copyOf(properties);
}
private String defaultNamespaceLocation(Namespace namespace) {
if (namespace.isEmpty()) {
return warehouseLocation;
} else {
return SLASH.join(warehouseLocation, SLASH.join(namespace.levels()));
}
}
@Override
public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
if (!namespaceExists(namespace)) {
return false;
}
List<TableIdentifier> tableIdentifiers = listTables(namespace);
if (tableIdentifiers != null && !tableIdentifiers.isEmpty()) {
throw new NamespaceNotEmptyException(
"Namespace %s is not empty. %s tables exist.", namespace, tableIdentifiers.size());
}
int deletedRows =
execute(
JdbcUtil.DELETE_ALL_NAMESPACE_PROPERTIES_SQL,
catalogName,
JdbcUtil.namespaceToString(namespace));
return deletedRows > 0;
}
@Override
public boolean setProperties(Namespace namespace, Map<String, String> properties)
throws NoSuchNamespaceException {
if (!namespaceExists(namespace)) {
throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
}
Preconditions.checkNotNull(properties, "Invalid properties to set: null");
if (properties.isEmpty()) {
return false;
}
Preconditions.checkArgument(
!properties.containsKey(NAMESPACE_EXISTS_PROPERTY),
"Cannot set reserved property: %s",
NAMESPACE_EXISTS_PROPERTY);
Map<String, String> startingProperties = fetchProperties(namespace);
Map<String, String> inserts = Maps.newHashMap();
Map<String, String> updates = Maps.newHashMap();
for (String key : properties.keySet()) {
String value = properties.get(key);
if (startingProperties.containsKey(key)) {
updates.put(key, value);
} else {
inserts.put(key, value);
}
}
boolean hadInserts = false;
if (!inserts.isEmpty()) {
hadInserts = insertProperties(namespace, inserts);
}
boolean hadUpdates = false;
if (!updates.isEmpty()) {
hadUpdates = updateProperties(namespace, updates);
}
return hadInserts || hadUpdates;
}
@Override
public boolean removeProperties(Namespace namespace, Set<String> properties)
throws NoSuchNamespaceException {
if (!namespaceExists(namespace)) {
throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
}
Preconditions.checkNotNull(properties, "Invalid properties to remove: null");
if (properties.isEmpty()) {
return false;
}
return deleteProperties(namespace, properties);
}
@Override
public void close() {
connections.close();
}
@Override
public boolean namespaceExists(Namespace namespace) {
if (exists(
JdbcUtil.GET_NAMESPACE_SQL, catalogName, JdbcUtil.namespaceToString(namespace) + "%")) {
return true;
}
if (exists(
JdbcUtil.GET_NAMESPACE_PROPERTIES_SQL,
catalogName,
JdbcUtil.namespaceToString(namespace) + "%")) {
return true;
}
return false;
}
private int execute(String sql, String... args) {
return execute(err -> {}, sql, args);
}
private int execute(Consumer<SQLException> sqlErrorHandler, String sql, String... args) {
try {
return connections.run(
conn -> {
try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
for (int pos = 0; pos < args.length; pos += 1) {
preparedStatement.setString(pos + 1, args[pos]);
}
return preparedStatement.executeUpdate();
}
});
} catch (SQLException e) {
sqlErrorHandler.accept(e);
throw new UncheckedSQLException(e, "Failed to execute: %s", sql);
} catch (InterruptedException e) {
throw new UncheckedInterruptedException(e, "Interrupted in SQL command");
}
}
@SuppressWarnings("checkstyle:NestedTryDepth")
private boolean exists(String sql, String... args) {
try {
return connections.run(
conn -> {
try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
for (int pos = 0; pos < args.length; pos += 1) {
preparedStatement.setString(pos + 1, args[pos]);
}
try (ResultSet rs = preparedStatement.executeQuery()) {
if (rs.next()) {
return true;
}
}
}
return false;
});
} catch (SQLException e) {
throw new UncheckedSQLException(e, "Failed to execute exists query: %s", sql);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new UncheckedInterruptedException(e, "Interrupted in SQL query");
}
}
@FunctionalInterface
interface RowProducer<R> {
R apply(ResultSet result) throws SQLException;
}
@SuppressWarnings("checkstyle:NestedTryDepth")
private <R> List<R> fetch(RowProducer<R> toRow, String sql, String... args) {
try {
return connections.run(
conn -> {
List<R> result = Lists.newArrayList();
try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
for (int pos = 0; pos < args.length; pos += 1) {
preparedStatement.setString(pos + 1, args[pos]);
}
try (ResultSet rs = preparedStatement.executeQuery()) {
while (rs.next()) {
result.add(toRow.apply(rs));
}
}
}
return result;
});
} catch (SQLException e) {
throw new UncheckedSQLException(e, "Failed to execute query: %s", sql);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new UncheckedInterruptedException(e, "Interrupted in SQL query");
}
}
private Map<String, String> fetchProperties(Namespace namespace) {
if (!namespaceExists(namespace)) {
throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
}
String namespaceName = JdbcUtil.namespaceToString(namespace);
List<Map.Entry<String, String>> entries =
fetch(
row ->
new AbstractMap.SimpleImmutableEntry<>(
row.getString(JdbcUtil.NAMESPACE_PROPERTY_KEY),
row.getString(JdbcUtil.NAMESPACE_PROPERTY_VALUE)),
JdbcUtil.GET_ALL_NAMESPACE_PROPERTIES_SQL,
catalogName,
namespaceName);
return ImmutableMap.<String, String>builder().putAll(entries).build();
}
private boolean insertProperties(Namespace namespace, Map<String, String> properties) {
String namespaceName = JdbcUtil.namespaceToString(namespace);
String[] args =
properties.entrySet().stream()
.flatMap(
entry -> Stream.of(catalogName, namespaceName, entry.getKey(), entry.getValue()))
.toArray(String[]::new);
int insertedRecords = execute(JdbcUtil.insertPropertiesStatement(properties.size()), args);
if (insertedRecords == properties.size()) {
return true;
}
throw new IllegalStateException(
String.format("Failed to insert: %d of %d succeeded", insertedRecords, properties.size()));
}
private boolean updateProperties(Namespace namespace, Map<String, String> properties) {
String namespaceName = JdbcUtil.namespaceToString(namespace);
Stream<String> caseArgs =
properties.entrySet().stream()
.flatMap(entry -> Stream.of(entry.getKey(), entry.getValue()));
Stream<String> whereArgs =
Stream.concat(Stream.of(catalogName, namespaceName), properties.keySet().stream());
String[] args = Stream.concat(caseArgs, whereArgs).toArray(String[]::new);
int updatedRecords = execute(JdbcUtil.updatePropertiesStatement(properties.size()), args);
if (updatedRecords == properties.size()) {
return true;
}
throw new IllegalStateException(
String.format("Failed to update: %d of %d succeeded", updatedRecords, properties.size()));
}
private boolean deleteProperties(Namespace namespace, Set<String> properties) {
String namespaceName = JdbcUtil.namespaceToString(namespace);
String[] args =
Stream.concat(Stream.of(catalogName, namespaceName), properties.stream())
.toArray(String[]::new);
return execute(JdbcUtil.deletePropertiesStatement(properties), args) > 0;
}
}