| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hudi.dla; |
| |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hudi.common.fs.FSUtils; |
| import org.apache.hudi.common.util.Option; |
| import org.apache.hudi.common.util.StringUtils; |
| import org.apache.hudi.common.util.ValidationUtils; |
| import org.apache.hudi.exception.HoodieException; |
| import org.apache.hudi.hive.HiveSyncConfig; |
| import org.apache.hudi.hive.HoodieHiveSyncException; |
| import org.apache.hudi.hive.PartitionValueExtractor; |
| import org.apache.hudi.hive.SchemaDifference; |
| import org.apache.hudi.hive.util.HiveSchemaUtil; |
| import org.apache.hudi.sync.common.AbstractSyncHoodieClient; |
| import org.apache.log4j.LogManager; |
| import org.apache.log4j.Logger; |
| import org.apache.parquet.schema.MessageType; |
| |
| import java.io.IOException; |
| import java.sql.Connection; |
| import java.sql.DriverManager; |
| import java.sql.DatabaseMetaData; |
| import java.sql.ResultSet; |
| import java.sql.SQLException; |
| import java.sql.Statement; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.stream.Collectors; |
| |
| public class HoodieDLAClient extends AbstractSyncHoodieClient { |
| private static final Logger LOG = LogManager.getLogger(HoodieDLAClient.class); |
| private static final String HOODIE_LAST_COMMIT_TIME_SYNC = "hoodie_last_sync"; |
| // Make sure we have the dla JDBC driver in classpath |
| private static final String DRIVER_NAME = "com.mysql.jdbc.Driver"; |
| private static final String DLA_ESCAPE_CHARACTER = ""; |
| private static final String TBL_PROPERTIES_STR = "TBLPROPERTIES"; |
| |
| static { |
| try { |
| Class.forName(DRIVER_NAME); |
| } catch (ClassNotFoundException e) { |
| throw new IllegalStateException("Could not find " + DRIVER_NAME + " in classpath. ", e); |
| } |
| } |
| |
| private Connection connection; |
| private DLASyncConfig dlaConfig; |
| private PartitionValueExtractor partitionValueExtractor; |
| |
| public HoodieDLAClient(DLASyncConfig syncConfig, FileSystem fs) { |
| super(syncConfig.basePath, syncConfig.assumeDatePartitioning, syncConfig.useFileListingFromMetadata, |
| syncConfig.verifyMetadataFileListing, fs); |
| this.dlaConfig = syncConfig; |
| try { |
| this.partitionValueExtractor = |
| (PartitionValueExtractor) Class.forName(dlaConfig.partitionValueExtractorClass).newInstance(); |
| } catch (Exception e) { |
| throw new HoodieException( |
| "Failed to initialize PartitionValueExtractor class " + dlaConfig.partitionValueExtractorClass, e); |
| } |
| createDLAConnection(); |
| } |
| |
| private void createDLAConnection() { |
| if (connection == null) { |
| try { |
| Class.forName(DRIVER_NAME); |
| } catch (ClassNotFoundException e) { |
| LOG.error("Unable to load DLA driver class", e); |
| return; |
| } |
| try { |
| this.connection = DriverManager.getConnection(dlaConfig.jdbcUrl, dlaConfig.dlaUser, dlaConfig.dlaPass); |
| LOG.info("Successfully established DLA connection to " + dlaConfig.jdbcUrl); |
| } catch (SQLException e) { |
| throw new HoodieException("Cannot create dla connection ", e); |
| } |
| } |
| } |
| |
| @Override |
| public void createTable(String tableName, MessageType storageSchema, String inputFormatClass, |
| String outputFormatClass, String serdeClass, |
| Map<String, String> serdeProperties, Map<String, String> tableProperties) { |
| try { |
| String createSQLQuery = HiveSchemaUtil.generateCreateDDL(tableName, storageSchema, toHiveSyncConfig(), |
| inputFormatClass, outputFormatClass, serdeClass, serdeProperties, tableProperties); |
| LOG.info("Creating table with " + createSQLQuery); |
| updateDLASQL(createSQLQuery); |
| } catch (IOException e) { |
| throw new HoodieException("Failed to create table " + tableName, e); |
| } |
| } |
| |
| public Map<String, String> getTableSchema(String tableName) { |
| if (!doesTableExist(tableName)) { |
| throw new IllegalArgumentException( |
| "Failed to get schema for table " + tableName + " does not exist"); |
| } |
| Map<String, String> schema = new HashMap<>(); |
| ResultSet result = null; |
| try { |
| DatabaseMetaData databaseMetaData = connection.getMetaData(); |
| result = databaseMetaData.getColumns(dlaConfig.databaseName, dlaConfig.databaseName, tableName, null); |
| while (result.next()) { |
| String columnName = result.getString(4); |
| String columnType = result.getString(6); |
| if ("DECIMAL".equals(columnType)) { |
| int columnSize = result.getInt("COLUMN_SIZE"); |
| int decimalDigits = result.getInt("DECIMAL_DIGITS"); |
| columnType += String.format("(%s,%s)", columnSize, decimalDigits); |
| } |
| schema.put(columnName, columnType); |
| } |
| return schema; |
| } catch (SQLException e) { |
| throw new HoodieException("Failed to get table schema for " + tableName, e); |
| } finally { |
| closeQuietly(result, null); |
| } |
| } |
| |
| @Override |
| public void addPartitionsToTable(String tableName, List<String> partitionsToAdd) { |
| if (partitionsToAdd.isEmpty()) { |
| LOG.info("No partitions to add for " + tableName); |
| return; |
| } |
| LOG.info("Adding partitions " + partitionsToAdd.size() + " to table " + tableName); |
| String sql = constructAddPartitions(tableName, partitionsToAdd); |
| updateDLASQL(sql); |
| } |
| |
| public String constructAddPartitions(String tableName, List<String> partitions) { |
| return constructDLAAddPartitions(tableName, partitions); |
| } |
| |
| String generateAbsolutePathStr(Path path) { |
| String absolutePathStr = path.toString(); |
| if (path.toUri().getScheme() == null) { |
| absolutePathStr = getDefaultFs() + absolutePathStr; |
| } |
| return absolutePathStr.endsWith("/") ? absolutePathStr : absolutePathStr + "/"; |
| } |
| |
| public List<String> constructChangePartitions(String tableName, List<String> partitions) { |
| List<String> changePartitions = new ArrayList<>(); |
| String useDatabase = "USE " + DLA_ESCAPE_CHARACTER + dlaConfig.databaseName + DLA_ESCAPE_CHARACTER; |
| changePartitions.add(useDatabase); |
| String alterTable = "ALTER TABLE " + DLA_ESCAPE_CHARACTER + tableName + DLA_ESCAPE_CHARACTER; |
| for (String partition : partitions) { |
| String partitionClause = getPartitionClause(partition); |
| Path partitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, partition); |
| String fullPartitionPathStr = generateAbsolutePathStr(partitionPath); |
| String changePartition = |
| alterTable + " ADD IF NOT EXISTS PARTITION (" + partitionClause + ") LOCATION '" + fullPartitionPathStr + "'"; |
| changePartitions.add(changePartition); |
| } |
| return changePartitions; |
| } |
| |
| /** |
| * Generate Hive Partition from partition values. |
| * |
| * @param partition Partition path |
| * @return |
| */ |
| public String getPartitionClause(String partition) { |
| List<String> partitionValues = partitionValueExtractor.extractPartitionValuesInPath(partition); |
| ValidationUtils.checkArgument(dlaConfig.partitionFields.size() == partitionValues.size(), |
| "Partition key parts " + dlaConfig.partitionFields + " does not match with partition values " + partitionValues |
| + ". Check partition strategy. "); |
| List<String> partBuilder = new ArrayList<>(); |
| for (int i = 0; i < dlaConfig.partitionFields.size(); i++) { |
| partBuilder.add(dlaConfig.partitionFields.get(i) + "='" + partitionValues.get(i) + "'"); |
| } |
| return partBuilder.stream().collect(Collectors.joining(",")); |
| } |
| |
| private String constructDLAAddPartitions(String tableName, List<String> partitions) { |
| StringBuilder alterSQL = new StringBuilder("ALTER TABLE "); |
| alterSQL.append(DLA_ESCAPE_CHARACTER).append(dlaConfig.databaseName) |
| .append(DLA_ESCAPE_CHARACTER).append(".").append(DLA_ESCAPE_CHARACTER) |
| .append(tableName).append(DLA_ESCAPE_CHARACTER).append(" ADD IF NOT EXISTS "); |
| for (String partition : partitions) { |
| String partitionClause = getPartitionClause(partition); |
| Path partitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, partition); |
| String fullPartitionPathStr = generateAbsolutePathStr(partitionPath); |
| alterSQL.append(" PARTITION (").append(partitionClause).append(") LOCATION '").append(fullPartitionPathStr) |
| .append("' "); |
| } |
| return alterSQL.toString(); |
| } |
| |
| private void updateDLASQL(String sql) { |
| Statement stmt = null; |
| try { |
| stmt = connection.createStatement(); |
| LOG.info("Executing SQL " + sql); |
| stmt.execute(sql); |
| } catch (SQLException e) { |
| throw new HoodieException("Failed in executing SQL " + sql, e); |
| } finally { |
| closeQuietly(null, stmt); |
| } |
| } |
| |
| @Override |
| public boolean doesTableExist(String tableName) { |
| String sql = consutructShowCreateTableSQL(tableName); |
| Statement stmt = null; |
| ResultSet rs = null; |
| try { |
| stmt = connection.createStatement(); |
| rs = stmt.executeQuery(sql); |
| } catch (SQLException e) { |
| return false; |
| } finally { |
| closeQuietly(rs, stmt); |
| } |
| return true; |
| } |
| |
| @Override |
| public Option<String> getLastCommitTimeSynced(String tableName) { |
| String sql = consutructShowCreateTableSQL(tableName); |
| Statement stmt = null; |
| ResultSet rs = null; |
| try { |
| stmt = connection.createStatement(); |
| rs = stmt.executeQuery(sql); |
| if (rs.next()) { |
| String table = rs.getString(2); |
| Map<String, String> attr = new HashMap<>(); |
| int index = table.indexOf(TBL_PROPERTIES_STR); |
| if (index != -1) { |
| String sub = table.substring(index + TBL_PROPERTIES_STR.length()); |
| sub = sub.replaceAll("\\(", "").replaceAll("\\)", "").replaceAll("'", ""); |
| String[] str = sub.split(","); |
| |
| for (int i = 0; i < str.length; i++) { |
| String key = str[i].split("=")[0].trim(); |
| String value = str[i].split("=")[1].trim(); |
| attr.put(key, value); |
| } |
| } |
| return Option.ofNullable(attr.getOrDefault(HOODIE_LAST_COMMIT_TIME_SYNC, null)); |
| } |
| return Option.empty(); |
| } catch (Exception e) { |
| throw new HoodieHiveSyncException("Failed to get the last commit time synced from the table", e); |
| } finally { |
| closeQuietly(rs, stmt); |
| } |
| } |
| |
| @Override |
| public void updateLastCommitTimeSynced(String tableName) { |
| // TODO : dla do not support update tblproperties, so do nothing. |
| } |
| |
| @Override |
| public void updatePartitionsToTable(String tableName, List<String> changedPartitions) { |
| if (changedPartitions.isEmpty()) { |
| LOG.info("No partitions to change for " + tableName); |
| return; |
| } |
| LOG.info("Changing partitions " + changedPartitions.size() + " on " + tableName); |
| List<String> sqls = constructChangePartitions(tableName, changedPartitions); |
| for (String sql : sqls) { |
| updateDLASQL(sql); |
| } |
| } |
| |
| public Map<List<String>, String> scanTablePartitions(String tableName) { |
| String sql = constructShowPartitionSQL(tableName); |
| Statement stmt = null; |
| ResultSet rs = null; |
| Map<List<String>, String> partitions = new HashMap<>(); |
| try { |
| stmt = connection.createStatement(); |
| LOG.info("Executing SQL " + sql); |
| rs = stmt.executeQuery(sql); |
| while (rs.next()) { |
| if (rs.getMetaData().getColumnCount() > 0) { |
| String str = rs.getString(1); |
| if (!StringUtils.isNullOrEmpty(str)) { |
| List<String> values = partitionValueExtractor.extractPartitionValuesInPath(str); |
| Path storagePartitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, String.join("/", values)); |
| String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath(); |
| partitions.put(values, fullStoragePartitionPath); |
| } |
| } |
| } |
| return partitions; |
| } catch (SQLException e) { |
| throw new HoodieException("Failed in executing SQL " + sql, e); |
| } finally { |
| closeQuietly(rs, stmt); |
| } |
| } |
| |
| public List<PartitionEvent> getPartitionEvents(Map<List<String>, String> tablePartitions, List<String> partitionStoragePartitions) { |
| Map<String, String> paths = new HashMap<>(); |
| |
| for (Map.Entry<List<String>, String> entry : tablePartitions.entrySet()) { |
| List<String> partitionValues = entry.getKey(); |
| Collections.sort(partitionValues); |
| String fullTablePartitionPath = entry.getValue(); |
| paths.put(String.join(", ", partitionValues), fullTablePartitionPath); |
| } |
| List<PartitionEvent> events = new ArrayList<>(); |
| for (String storagePartition : partitionStoragePartitions) { |
| Path storagePartitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, storagePartition); |
| String fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath(); |
| // Check if the partition values or if hdfs path is the same |
| List<String> storagePartitionValues = partitionValueExtractor.extractPartitionValuesInPath(storagePartition); |
| if (dlaConfig.useDLASyncHiveStylePartitioning) { |
| String partition = String.join("/", storagePartitionValues); |
| storagePartitionPath = FSUtils.getPartitionPath(dlaConfig.basePath, partition); |
| fullStoragePartitionPath = Path.getPathWithoutSchemeAndAuthority(storagePartitionPath).toUri().getPath(); |
| } |
| Collections.sort(storagePartitionValues); |
| if (!storagePartitionValues.isEmpty()) { |
| String storageValue = String.join(", ", storagePartitionValues); |
| if (!paths.containsKey(storageValue)) { |
| events.add(PartitionEvent.newPartitionAddEvent(storagePartition)); |
| } else if (!paths.get(storageValue).equals(fullStoragePartitionPath)) { |
| events.add(PartitionEvent.newPartitionUpdateEvent(storagePartition)); |
| } |
| } |
| } |
| return events; |
| } |
| |
| public void updateTableDefinition(String tableName, SchemaDifference schemaDiff) { |
| ValidationUtils.checkArgument(schemaDiff.getDeleteColumns().size() == 0, "not support delete columns"); |
| ValidationUtils.checkArgument(schemaDiff.getUpdateColumnTypes().size() == 0, "not support alter column type"); |
| Map<String, String> columns = schemaDiff.getAddColumnTypes(); |
| for (Map.Entry<String, String> entry : columns.entrySet()) { |
| String columnName = entry.getKey(); |
| String columnType = entry.getValue(); |
| StringBuilder sqlBuilder = new StringBuilder("ALTER TABLE ").append(DLA_ESCAPE_CHARACTER) |
| .append(dlaConfig.databaseName).append(DLA_ESCAPE_CHARACTER).append(".") |
| .append(DLA_ESCAPE_CHARACTER).append(tableName) |
| .append(DLA_ESCAPE_CHARACTER).append(" ADD COLUMNS(") |
| .append(columnName).append(" ").append(columnType).append(" )"); |
| LOG.info("Updating table definition with " + sqlBuilder); |
| updateDLASQL(sqlBuilder.toString()); |
| } |
| } |
| |
| public void close() { |
| try { |
| if (connection != null) { |
| connection.close(); |
| } |
| } catch (SQLException e) { |
| LOG.error("Could not close connection ", e); |
| } |
| } |
| |
| private String constructShowPartitionSQL(String tableName) { |
| String sql = "show partitions " + dlaConfig.databaseName + "." + tableName; |
| return sql; |
| } |
| |
| private String consutructShowCreateTableSQL(String tableName) { |
| String sql = "show create table " + dlaConfig.databaseName + "." + tableName; |
| return sql; |
| } |
| |
| private String getDefaultFs() { |
| return fs.getConf().get("fs.defaultFS"); |
| } |
| |
| private HiveSyncConfig toHiveSyncConfig() { |
| HiveSyncConfig hiveSyncConfig = new HiveSyncConfig(); |
| hiveSyncConfig.partitionFields = dlaConfig.partitionFields; |
| hiveSyncConfig.databaseName = dlaConfig.databaseName; |
| Path basePath = new Path(dlaConfig.basePath); |
| hiveSyncConfig.basePath = generateAbsolutePathStr(basePath); |
| return hiveSyncConfig; |
| } |
| } |