| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.iceberg.jdbc; |
| |
| import java.sql.DataTruncation; |
| import java.sql.PreparedStatement; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.sql.SQLIntegrityConstraintViolationException; |
| import java.sql.SQLNonTransientConnectionException; |
| import java.sql.SQLTimeoutException; |
| import java.sql.SQLTransientConnectionException; |
| import java.sql.SQLWarning; |
| import java.util.Map; |
| import java.util.Objects; |
| import org.apache.iceberg.BaseMetastoreTableOperations; |
| import org.apache.iceberg.TableMetadata; |
| import org.apache.iceberg.catalog.TableIdentifier; |
| import org.apache.iceberg.exceptions.AlreadyExistsException; |
| import org.apache.iceberg.exceptions.CommitFailedException; |
| import org.apache.iceberg.exceptions.NoSuchTableException; |
| import org.apache.iceberg.io.FileIO; |
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; |
| import org.apache.iceberg.relocated.com.google.common.collect.Maps; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| class JdbcTableOperations extends BaseMetastoreTableOperations { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(JdbcTableOperations.class); |
| private final String catalogName; |
| private final TableIdentifier tableIdentifier; |
| private final FileIO fileIO; |
| private final JdbcClientPool connections; |
| |
| protected JdbcTableOperations( |
| JdbcClientPool dbConnPool, |
| FileIO fileIO, |
| String catalogName, |
| TableIdentifier tableIdentifier) { |
| this.catalogName = catalogName; |
| this.tableIdentifier = tableIdentifier; |
| this.fileIO = fileIO; |
| this.connections = dbConnPool; |
| } |
| |
| @Override |
| public void doRefresh() { |
| Map<String, String> table; |
| |
| try { |
| table = getTable(); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new UncheckedInterruptedException(e, "Interrupted during refresh"); |
| } catch (SQLException e) { |
| // SQL exception happened when getting table from catalog |
| throw new UncheckedSQLException( |
| e, "Failed to get table %s from catalog %s", tableIdentifier, catalogName); |
| } |
| |
| if (table.isEmpty()) { |
| if (currentMetadataLocation() != null) { |
| throw new NoSuchTableException( |
| "Failed to load table %s from catalog %s: dropped by another process", |
| tableIdentifier, catalogName); |
| } else { |
| this.disableRefresh(); |
| return; |
| } |
| } |
| |
| String newMetadataLocation = table.get(JdbcUtil.METADATA_LOCATION); |
| Preconditions.checkState( |
| newMetadataLocation != null, |
| "Invalid table %s: metadata location is null", |
| tableIdentifier); |
| refreshFromMetadataLocation(newMetadataLocation); |
| } |
| |
| @Override |
| public void doCommit(TableMetadata base, TableMetadata metadata) { |
| String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 1); |
| try { |
| Map<String, String> table = getTable(); |
| |
| if (base != null) { |
| validateMetadataLocation(table, base); |
| String oldMetadataLocation = base.metadataFileLocation(); |
| // Start atomic update |
| LOG.debug("Committing existing table: {}", tableName()); |
| updateTable(newMetadataLocation, oldMetadataLocation); |
| } else { |
| // table not exists create it |
| LOG.debug("Committing new table: {}", tableName()); |
| createTable(newMetadataLocation); |
| } |
| |
| } catch (SQLIntegrityConstraintViolationException e) { |
| |
| if (currentMetadataLocation() == null) { |
| throw new AlreadyExistsException(e, "Table already exists: %s", tableIdentifier); |
| } else { |
| throw new UncheckedSQLException(e, "Table already exists: %s", tableIdentifier); |
| } |
| |
| } catch (SQLTimeoutException e) { |
| throw new UncheckedSQLException(e, "Database Connection timeout"); |
| } catch (SQLTransientConnectionException | SQLNonTransientConnectionException e) { |
| throw new UncheckedSQLException(e, "Database Connection failed"); |
| } catch (DataTruncation e) { |
| throw new UncheckedSQLException(e, "Database data truncation error"); |
| } catch (SQLWarning e) { |
| throw new UncheckedSQLException(e, "Database warning"); |
| } catch (SQLException e) { |
| // SQLite doesn't set SQLState or throw SQLIntegrityConstraintViolationException |
| if (e.getMessage().contains("constraint failed")) { |
| throw new AlreadyExistsException("Table already exists: %s", tableIdentifier); |
| } |
| throw new UncheckedSQLException(e, "Unknown failure"); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new UncheckedInterruptedException(e, "Interrupted during commit"); |
| } |
| } |
| |
| private void updateTable(String newMetadataLocation, String oldMetadataLocation) |
| throws SQLException, InterruptedException { |
| int updatedRecords = |
| connections.run( |
| conn -> { |
| try (PreparedStatement sql = conn.prepareStatement(JdbcUtil.DO_COMMIT_SQL)) { |
| // UPDATE |
| sql.setString(1, newMetadataLocation); |
| sql.setString(2, oldMetadataLocation); |
| // WHERE |
| sql.setString(3, catalogName); |
| sql.setString(4, JdbcUtil.namespaceToString(tableIdentifier.namespace())); |
| sql.setString(5, tableIdentifier.name()); |
| sql.setString(6, oldMetadataLocation); |
| return sql.executeUpdate(); |
| } |
| }); |
| |
| if (updatedRecords == 1) { |
| LOG.debug("Successfully committed to existing table: {}", tableIdentifier); |
| } else { |
| throw new CommitFailedException( |
| "Failed to update table %s from catalog %s", tableIdentifier, catalogName); |
| } |
| } |
| |
| private void createTable(String newMetadataLocation) throws SQLException, InterruptedException { |
| int insertRecord = |
| connections.run( |
| conn -> { |
| try (PreparedStatement sql = |
| conn.prepareStatement(JdbcUtil.DO_COMMIT_CREATE_TABLE_SQL)) { |
| sql.setString(1, catalogName); |
| sql.setString(2, JdbcUtil.namespaceToString(tableIdentifier.namespace())); |
| sql.setString(3, tableIdentifier.name()); |
| sql.setString(4, newMetadataLocation); |
| return sql.executeUpdate(); |
| } |
| }); |
| |
| if (insertRecord == 1) { |
| LOG.debug("Successfully committed to new table: {}", tableIdentifier); |
| } else { |
| throw new CommitFailedException( |
| "Failed to create table %s in catalog %s", tableIdentifier, catalogName); |
| } |
| } |
| |
| private void validateMetadataLocation(Map<String, String> table, TableMetadata base) { |
| String catalogMetadataLocation = table.get(JdbcUtil.METADATA_LOCATION); |
| String baseMetadataLocation = base != null ? base.metadataFileLocation() : null; |
| |
| if (!Objects.equals(baseMetadataLocation, catalogMetadataLocation)) { |
| throw new CommitFailedException( |
| "Cannot commit %s: metadata location %s has changed from %s", |
| tableIdentifier, baseMetadataLocation, catalogMetadataLocation); |
| } |
| } |
| |
| @Override |
| public FileIO io() { |
| return fileIO; |
| } |
| |
| @Override |
| protected String tableName() { |
| return tableIdentifier.toString(); |
| } |
| |
| private Map<String, String> getTable() |
| throws UncheckedSQLException, SQLException, InterruptedException { |
| return connections.run( |
| conn -> { |
| Map<String, String> table = Maps.newHashMap(); |
| |
| try (PreparedStatement sql = conn.prepareStatement(JdbcUtil.GET_TABLE_SQL)) { |
| sql.setString(1, catalogName); |
| sql.setString(2, JdbcUtil.namespaceToString(tableIdentifier.namespace())); |
| sql.setString(3, tableIdentifier.name()); |
| ResultSet rs = sql.executeQuery(); |
| |
| if (rs.next()) { |
| table.put(JdbcUtil.CATALOG_NAME, rs.getString(JdbcUtil.CATALOG_NAME)); |
| table.put(JdbcUtil.TABLE_NAMESPACE, rs.getString(JdbcUtil.TABLE_NAMESPACE)); |
| table.put(JdbcUtil.TABLE_NAME, rs.getString(JdbcUtil.TABLE_NAME)); |
| table.put(JdbcUtil.METADATA_LOCATION, rs.getString(JdbcUtil.METADATA_LOCATION)); |
| table.put( |
| JdbcUtil.PREVIOUS_METADATA_LOCATION, |
| rs.getString(JdbcUtil.PREVIOUS_METADATA_LOCATION)); |
| } |
| |
| rs.close(); |
| } |
| |
| return table; |
| }); |
| } |
| } |