| /* |
| * Copyright (c) 2016 Uber Technologies, Inc. (hoodie-dev-group@uber.com) |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package com.uber.hoodie.table; |
| |
| import com.google.common.collect.Maps; |
| import com.google.common.hash.Hashing; |
| import com.uber.hoodie.WriteStatus; |
| import com.uber.hoodie.avro.model.HoodieCompactionPlan; |
| import com.uber.hoodie.common.HoodieCleanStat; |
| import com.uber.hoodie.common.HoodieRollbackStat; |
| import com.uber.hoodie.common.model.HoodieCommitMetadata; |
| import com.uber.hoodie.common.model.HoodieDataFile; |
| import com.uber.hoodie.common.model.HoodieKey; |
| import com.uber.hoodie.common.model.HoodieRecord; |
| import com.uber.hoodie.common.model.HoodieRecordLocation; |
| import com.uber.hoodie.common.model.HoodieRecordPayload; |
| import com.uber.hoodie.common.model.HoodieRollingStatMetadata; |
| import com.uber.hoodie.common.model.HoodieWriteStat; |
| import com.uber.hoodie.common.table.HoodieTableMetaClient; |
| import com.uber.hoodie.common.table.HoodieTimeline; |
| import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; |
| import com.uber.hoodie.common.table.timeline.HoodieInstant; |
| import com.uber.hoodie.common.util.FSUtils; |
| import com.uber.hoodie.common.util.collection.Pair; |
| import com.uber.hoodie.common.util.queue.BoundedInMemoryExecutor; |
| import com.uber.hoodie.common.util.queue.BoundedInMemoryQueueConsumer; |
| import com.uber.hoodie.config.HoodieWriteConfig; |
| import com.uber.hoodie.exception.HoodieException; |
| import com.uber.hoodie.exception.HoodieIOException; |
| import com.uber.hoodie.exception.HoodieNotSupportedException; |
| import com.uber.hoodie.exception.HoodieUpsertException; |
| import com.uber.hoodie.func.CopyOnWriteLazyInsertIterable; |
| import com.uber.hoodie.func.ParquetReaderIterator; |
| import com.uber.hoodie.func.SparkBoundedInMemoryExecutor; |
| import com.uber.hoodie.io.HoodieCleanHelper; |
| import com.uber.hoodie.io.HoodieCreateHandle; |
| import com.uber.hoodie.io.HoodieMergeHandle; |
| import java.io.IOException; |
| import java.io.Serializable; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| import org.apache.avro.generic.GenericRecord; |
| import org.apache.avro.generic.IndexedRecord; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.PathFilter; |
| import org.apache.log4j.LogManager; |
| import org.apache.log4j.Logger; |
| import org.apache.parquet.avro.AvroParquetReader; |
| import org.apache.parquet.avro.AvroReadSupport; |
| import org.apache.parquet.hadoop.ParquetReader; |
| import org.apache.spark.Partitioner; |
| import org.apache.spark.api.java.JavaRDD; |
| import org.apache.spark.api.java.JavaSparkContext; |
| import org.apache.spark.api.java.function.Function; |
| import org.apache.spark.api.java.function.Function2; |
| import org.apache.spark.api.java.function.PairFlatMapFunction; |
| import scala.Option; |
| import scala.Tuple2; |
| |
| /** |
| * Implementation of a heavily read-optimized Hoodie table where |
| * <p> |
| * INSERTS - Produce new files, block aligned to the desired size, or merge with the smallest |
| * existing file to expand it |
| * <p> |
| * UPDATES - Produce a new version of the file, replacing just the updated records with new values |
| */ |
| public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends HoodieTable<T> { |
| |
| private static final Logger logger = LogManager.getLogger(HoodieCopyOnWriteTable.class); |
| |
| public HoodieCopyOnWriteTable(HoodieWriteConfig config, JavaSparkContext jsc) { |
| super(config, jsc); |
| } |
| |
| private static PairFlatMapFunction<Iterator<Tuple2<String, String>>, String, |
| PartitionCleanStat> deleteFilesFunc( |
| HoodieTable table) { |
| return (PairFlatMapFunction<Iterator<Tuple2<String, String>>, String, PartitionCleanStat>) |
| iter -> { |
| Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>(); |
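| // One PartitionCleanStat is accumulated per Hudi partition path seen in this Spark partition; |
| // stats for the same path across different Spark partitions are merged later via reduceByKey. |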
| |
| FileSystem fs = table.getMetaClient().getFs(); |
| while (iter.hasNext()) { |
| Tuple2<String, String> partitionDelFileTuple = iter.next(); |
| String partitionPath = partitionDelFileTuple._1(); |
| String deletePathStr = partitionDelFileTuple._2(); |
| Boolean deletedFileResult = deleteFileAndGetResult(fs, deletePathStr); |
| if (!partitionCleanStatMap.containsKey(partitionPath)) { |
| partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath)); |
| } |
| PartitionCleanStat partitionCleanStat = partitionCleanStatMap.get(partitionPath); |
| partitionCleanStat.addDeleteFilePatterns(deletePathStr); |
| partitionCleanStat.addDeletedFileResult(deletePathStr, deletedFileResult); |
| } |
| |
| return partitionCleanStatMap.entrySet().stream() |
| .map(e -> new Tuple2<>(e.getKey(), e.getValue())) |
| .collect(Collectors.toList()).iterator(); |
| }; |
| } |
| |
| private static PairFlatMapFunction<String, String, String> getFilesToDeleteFunc(HoodieTable table, |
| HoodieWriteConfig config) { |
| return (PairFlatMapFunction<String, String, String>) partitionPathToClean -> { |
| HoodieCleanHelper cleaner = new HoodieCleanHelper(table, config); |
| return cleaner.getDeletePaths(partitionPathToClean).stream() |
| .map(deleteFile -> new Tuple2<>(partitionPathToClean, deleteFile.toString())).iterator(); |
| }; |
| } |
| |
| private static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) |
| throws IOException { |
| Path deletePath = new Path(deletePathStr); |
| logger.debug("Working on delete path: " + deletePath); |
| boolean deleteResult = fs.delete(deletePath, false); |
| if (deleteResult) { |
| logger.debug("Cleaned file at path: " + deletePath); |
| } |
| return deleteResult; |
| } |
| |
| @Override |
| public Partitioner getUpsertPartitioner(WorkloadProfile profile) { |
| if (profile == null) { |
| throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner."); |
| } |
| return new UpsertPartitioner(profile); |
| } |
| |
| @Override |
| public Partitioner getInsertPartitioner(WorkloadProfile profile) { |
| return getUpsertPartitioner(profile); |
| } |
| |
| @Override |
| public boolean isWorkloadProfileNeeded() { |
| return true; |
| } |
| |
| @Override |
| public HoodieCompactionPlan scheduleCompaction(JavaSparkContext jsc, String commitTime) { |
| throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table"); |
| } |
| |
| @Override |
| public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, String compactionInstantTime, |
| HoodieCompactionPlan compactionPlan) { |
| throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table"); |
| } |
| |
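| /** |
| * Merges the given records into the latest version of the file identified by fileId, producing |
| * a new file version for this commit. |
| */ |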
| public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileId, |
| Iterator<HoodieRecord<T>> recordItr) throws IOException { |
| // these are updates |
| HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, fileId, recordItr); |
| return handleUpdateInternal(upsertHandle, commitTime, fileId); |
| } |
| |
| public Iterator<List<WriteStatus>> handleUpdate(String commitTime, String fileId, |
| Map<String, HoodieRecord<T>> keyToNewRecords, Optional<HoodieDataFile> dataFileOpt) throws IOException { |
| // these are updates |
| HoodieMergeHandle upsertHandle = getUpdateHandle(commitTime, fileId, keyToNewRecords, dataFileOpt); |
| return handleUpdateInternal(upsertHandle, commitTime, fileId); |
| } |
| |
| protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle upsertHandle, |
| String commitTime, String fileId) |
| throws IOException { |
| if (upsertHandle.getOldFilePath() == null) { |
| throw new HoodieUpsertException( |
| "Error in finding the old file path at commit " + commitTime + " for fileId: " + fileId); |
| } else { |
| AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getSchema()); |
| ParquetReader<IndexedRecord> reader = AvroParquetReader.builder(upsertHandle.getOldFilePath()) |
| .withConf(getHadoopConf()).build(); |
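| // Stream the existing parquet records through a bounded in-memory queue; the UpdateHandler |
| // consumer merges each one with any new value via the merge handle, which writes out the new |
| // file version. |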
| BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null; |
| try { |
| wrapper = new SparkBoundedInMemoryExecutor(config, new ParquetReaderIterator(reader), |
| new UpdateHandler(upsertHandle), x -> x); |
| wrapper.execute(); |
| } catch (Exception e) { |
| throw new HoodieException(e); |
| } finally { |
| reader.close(); |
| upsertHandle.close(); |
| if (null != wrapper) { |
| wrapper.shutdownNow(); |
| } |
| } |
| } |
| |
| //TODO(vc): This needs to be revisited |
| if (upsertHandle.getWriteStatus().getPartitionPath() == null) { |
| logger.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() |
| + ", " + upsertHandle.getWriteStatus()); |
| } |
| return Collections.singletonList(Collections.singletonList(upsertHandle.getWriteStatus())) |
| .iterator(); |
| } |
| |
| protected HoodieMergeHandle getUpdateHandle(String commitTime, String fileId, |
| Iterator<HoodieRecord<T>> recordItr) { |
| return new HoodieMergeHandle<>(config, commitTime, this, recordItr, fileId); |
| } |
| |
| protected HoodieMergeHandle getUpdateHandle(String commitTime, String fileId, |
| Map<String, HoodieRecord<T>> keyToNewRecords, Optional<HoodieDataFile> dataFileToBeMerged) { |
| return new HoodieMergeHandle<>(config, commitTime, this, keyToNewRecords, fileId, dataFileToBeMerged); |
| } |
| |
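| /** |
| * Writes the given records out as new data files for this commit, lazily creating files as the |
| * records are consumed. |
| */ |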
| public Iterator<List<WriteStatus>> handleInsert(String commitTime, |
| Iterator<HoodieRecord<T>> recordItr) throws Exception { |
| return new CopyOnWriteLazyInsertIterable<>(recordItr, config, commitTime, this); |
| } |
| |
| public Iterator<List<WriteStatus>> handleInsert(String commitTime, String partitionPath, String fileId, |
| Iterator<HoodieRecord<T>> recordItr) { |
| HoodieCreateHandle createHandle = new HoodieCreateHandle(config, commitTime, this, partitionPath, fileId, |
| recordItr); |
| createHandle.write(); |
| return Collections.singletonList(Collections.singletonList(createHandle.close())).iterator(); |
| } |
| |
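| /** |
| * Routes a bucket of records to either the insert or the update path, based on the bucket type |
| * assigned by the {@link UpsertPartitioner}. |
| */ |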
| @SuppressWarnings("unchecked") |
| @Override |
| public Iterator<List<WriteStatus>> handleUpsertPartition(String commitTime, Integer partition, |
| Iterator recordItr, Partitioner partitioner) { |
| UpsertPartitioner upsertPartitioner = (UpsertPartitioner) partitioner; |
| BucketInfo binfo = upsertPartitioner.getBucketInfo(partition); |
| BucketType btype = binfo.bucketType; |
| try { |
| if (btype.equals(BucketType.INSERT)) { |
| return handleInsert(commitTime, recordItr); |
| } else if (btype.equals(BucketType.UPDATE)) { |
| return handleUpdate(commitTime, binfo.fileLoc, recordItr); |
| } else { |
| throw new HoodieUpsertException( |
| "Unknown bucketType " + btype + " for partition :" + partition); |
| } |
| } catch (Throwable t) { |
| String msg = "Error upserting bucketType " + btype + " for partition :" + partition; |
| logger.error(msg, t); |
| throw new HoodieUpsertException(msg, t); |
| } |
| } |
| |
| @Override |
| public Iterator<List<WriteStatus>> handleInsertPartition(String commitTime, Integer partition, |
| Iterator recordItr, Partitioner partitioner) { |
| return handleUpsertPartition(commitTime, partition, recordItr, partitioner); |
| } |
| |
| /** |
| * Performs cleaning of partition paths according to the cleaning policy, and returns the clean |
| * stats for each partition cleaned. Handles skew in the partitions to clean by making the |
| * individual files to delete the unit of task distribution. |
| * |
| * @throws IllegalArgumentException if unknown cleaning policy is provided |
| */ |
| @Override |
| public List<HoodieCleanStat> clean(JavaSparkContext jsc) { |
| try { |
| FileSystem fs = getMetaClient().getFs(); |
| List<String> partitionsToClean = FSUtils |
| .getAllPartitionPaths(fs, getMetaClient().getBasePath(), |
| config.shouldAssumeDatePartitioning()); |
| logger.info("Partitions to clean up : " + partitionsToClean + ", with policy " + config |
| .getCleanerPolicy()); |
| if (partitionsToClean.isEmpty()) { |
| logger.info("Nothing to clean here. It is already clean"); |
| return Collections.emptyList(); |
| } |
| return cleanPartitionPaths(partitionsToClean, jsc); |
| } catch (IOException e) { |
| throw new HoodieIOException("Failed to clean up after commit", e); |
| } |
| } |
| |
| /** |
| * Deletes parquet files under the given partition path that were written by the given commits. |
| * Used while rolling back a set of commits. |
| */ |
| protected Map<FileStatus, Boolean> deleteCleanedFiles(String partitionPath, List<String> commits) |
| throws IOException { |
| Map<FileStatus, Boolean> results = Maps.newHashMap(); |
| // PathFilter to get all parquet files that need to be deleted |
| PathFilter filter = (path) -> { |
| if (path.toString().contains(".parquet")) { |
| String fileCommitTime = FSUtils.getCommitTime(path.getName()); |
| return commits.contains(fileCommitTime); |
| } |
| return false; |
| }; |
| deleteCleanedFiles(results, partitionPath, filter); |
| return results; |
| } |
| |
| /** |
| * Deletes files under the given partition path that match the given filter, recording the |
| * per-file result in the passed-in map. |
| */ |
| protected Map<FileStatus, Boolean> deleteCleanedFiles(Map<FileStatus, Boolean> results, String partitionPath, |
| PathFilter filter) |
| throws IOException { |
| logger.info("Cleaning path " + partitionPath); |
| FileSystem fs = getMetaClient().getFs(); |
| FileStatus[] toBeDeleted = fs.listStatus(new Path(config.getBasePath(), partitionPath), filter); |
| for (FileStatus file : toBeDeleted) { |
| boolean success = fs.delete(file.getPath(), false); |
| results.put(file, success); |
| logger.info("Delete file " + file.getPath() + "\t" + success); |
| } |
| return results; |
| } |
| |
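| /** |
| * Reverts the given commits: unpublishes them on the timeline, deletes the parquet files they |
| * produced in every partition, cleans temporary data files and finally removes the |
| * corresponding inflight instants. |
| */ |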
| @Override |
| public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, List<String> commits) |
| throws IOException { |
| String actionType = metaClient.getCommitActionType(); |
| HoodieActiveTimeline activeTimeline = this.getActiveTimeline(); |
| List<String> inflights = this.getInflightCommitTimeline().getInstants() |
| .map(HoodieInstant::getTimestamp).collect(Collectors.toList()); |
| |
| // Atomically unpublish all the commits |
| commits.stream().filter(s -> !inflights.contains(s)) |
| .map(s -> new HoodieInstant(false, actionType, s)) |
| .forEach(activeTimeline::revertToInflight); |
| logger.info("Unpublished " + commits); |
| |
| // delete all the data files for all these commits |
| logger.info("Clean out all parquet files generated for commits: " + commits); |
| List<HoodieRollbackStat> stats = jsc.parallelize(FSUtils |
| .getAllPartitionPaths(metaClient.getFs(), getMetaClient().getBasePath(), |
| config.shouldAssumeDatePartitioning())) |
| .map((Function<String, HoodieRollbackStat>) partitionPath -> { |
| // Scan all partitions files with this commit time |
| Map<FileStatus, Boolean> results = deleteCleanedFiles(partitionPath, commits); |
| return HoodieRollbackStat.newBuilder().withPartitionPath(partitionPath) |
| .withDeletedFileResults(results).build(); |
| }).collect(); |
| |
| // clean temporary data files |
| cleanTemporaryDataFiles(jsc); |
| |
| // Remove the rolled back inflight commits |
| commits.stream().map(s -> new HoodieInstant(true, actionType, s)) |
| .forEach(activeTimeline::deleteInflight); |
| logger.info("Deleted inflight commits " + commits); |
| return stats; |
| } |
| |
| /** |
| * Finalizes the written data files by renaming them from the temporary folder to their final |
| * location, when the temporary folder is enabled for copy-on-write |
| * |
| * @param writeStatuses list of (String, HoodieWriteStat) tuples describing the written data files |
| * @return number of files finalized, or empty if the temporary folder is not in use |
| */ |
| @Override |
| @SuppressWarnings("unchecked") |
| public Optional<Integer> finalizeWrite(JavaSparkContext jsc, List writeStatuses) { |
| if (!config.shouldUseTempFolderForCopyOnWrite()) { |
| return Optional.empty(); |
| } |
| |
| // This is to rename each data file from temporary path to its final location |
| List<Tuple2<String, Boolean>> results = jsc |
| .parallelize(writeStatuses, config.getFinalizeWriteParallelism()).map(writeStatus -> { |
| Tuple2<String, HoodieWriteStat> writeStatTuple2 = (Tuple2<String, HoodieWriteStat>) |
| writeStatus; |
| HoodieWriteStat writeStat = writeStatTuple2._2(); |
| final FileSystem fs = getMetaClient().getFs(); |
| final Path finalPath = new Path(config.getBasePath(), writeStat.getPath()); |
| |
| if (writeStat.getTempPath() != null) { |
| final Path tempPath = new Path(config.getBasePath(), writeStat.getTempPath()); |
| boolean success; |
| try { |
| logger.info("Renaming temporary file: " + tempPath + " to " + finalPath); |
| success = fs.rename(tempPath, finalPath); |
| } catch (IOException e) { |
| throw new HoodieIOException( |
| "Failed to rename file: " + tempPath + " to " + finalPath, e); |
| } |
| |
| if (!success) { |
| throw new HoodieIOException( |
| "Failed to rename file: " + tempPath + " to " + finalPath); |
| } |
| } |
| |
| return new Tuple2<>(writeStat.getPath(), true); |
| }).collect(); |
| |
| // clean temporary data files |
| cleanTemporaryDataFiles(jsc); |
| |
| return Optional.of(results.size()); |
| } |
| |
| /** |
| * Clean temporary data files that are produced from previous failed commit or retried spark |
| * stages. |
| */ |
| private void cleanTemporaryDataFiles(JavaSparkContext jsc) { |
| if (!config.shouldUseTempFolderForCopyOnWrite()) { |
| return; |
| } |
| |
| final FileSystem fs = getMetaClient().getFs(); |
| final Path temporaryFolder = new Path(config.getBasePath(), |
| HoodieTableMetaClient.TEMPFOLDER_NAME); |
| try { |
| if (!fs.exists(temporaryFolder)) { |
| logger.info("Temporary folder does not exist: " + temporaryFolder); |
| return; |
| } |
| List<FileStatus> fileStatusesList = Arrays.asList(fs.listStatus(temporaryFolder)); |
| List<Tuple2<String, Boolean>> results = jsc |
| .parallelize(fileStatusesList, config.getFinalizeWriteParallelism()).map(fileStatus -> { |
| FileSystem fs1 = getMetaClient().getFs(); |
| boolean success = fs1.delete(fileStatus.getPath(), false); |
| logger.info( |
| "Deleting file in temporary folder: " + fileStatus.getPath() + "\t" + success); |
| return new Tuple2<>(fileStatus.getPath().toString(), success); |
| }).collect(); |
| |
| for (Tuple2<String, Boolean> result : results) { |
| if (!result._2()) { |
| logger.info("Failed to delete file: " + result._1()); |
| throw new HoodieIOException("Failed to delete file in temporary folder: " + result._1()); |
| } |
| } |
| } catch (IOException e) { |
| throw new HoodieIOException( |
| "Failed to clean data files in temporary folder: " + temporaryFolder, e); |
| } |
| } |
| |
| private List<HoodieCleanStat> cleanPartitionPaths(List<String> partitionsToClean, |
| JavaSparkContext jsc) { |
| int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism()); |
| logger.info("Using cleanerParallelism: " + cleanerParallelism); |
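| // Spark pipeline: expand each partition into (partition, file-to-delete) pairs, repartition so |
| // deletions are spread evenly across tasks, delete the files per chunk, then reduce the |
| // per-chunk PartitionCleanStats back to one stat per partition. |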
| List<Tuple2<String, PartitionCleanStat>> partitionCleanStats = jsc |
| .parallelize(partitionsToClean, cleanerParallelism) |
| .flatMapToPair(getFilesToDeleteFunc(this, config)) |
| .repartition(cleanerParallelism) // repartition to remove skews |
| .mapPartitionsToPair(deleteFilesFunc(this)).reduceByKey( |
| // merge partition level clean stats below |
| (Function2<PartitionCleanStat, PartitionCleanStat, PartitionCleanStat>) (e1, e2) -> e1 |
| .merge(e2)).collect(); |
| |
| Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats.stream() |
| .collect(Collectors.toMap(e -> e._1(), e -> e._2())); |
| |
| HoodieCleanHelper cleaner = new HoodieCleanHelper(this, config); |
| // Return PartitionCleanStat for each partition passed. |
| return partitionsToClean.stream().map(partitionPath -> { |
| PartitionCleanStat partitionCleanStat = |
| (partitionCleanStatsMap.containsKey(partitionPath)) ? partitionCleanStatsMap |
| .get(partitionPath) : new PartitionCleanStat(partitionPath); |
| return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()) |
| .withPartitionPath(partitionPath) |
| .withEarliestCommitRetained(cleaner.getEarliestCommitToRetain()) |
| .withDeletePathPattern(partitionCleanStat.deletePathPatterns) |
| .withSuccessfulDeletes(partitionCleanStat.successDeleteFiles) |
| .withFailedDeletes(partitionCleanStat.failedDeleteFiles).build(); |
| }).collect(Collectors.toList()); |
| } |
| |
| enum BucketType { |
| UPDATE, INSERT |
| } |
| |
| /** |
| * Consumer that dequeues records from the bounded in-memory queue and writes them through the merge handle |
| */ |
| private static class UpdateHandler extends BoundedInMemoryQueueConsumer<GenericRecord, Void> { |
| |
| private final HoodieMergeHandle upsertHandle; |
| |
| private UpdateHandler(HoodieMergeHandle upsertHandle) { |
| this.upsertHandle = upsertHandle; |
| } |
| |
| @Override |
| protected void consumeOneRecord(GenericRecord record) { |
| upsertHandle.write(record); |
| } |
| |
| @Override |
| protected void finish() { |
| } |
| |
| @Override |
| protected Void getResult() { |
| return null; |
| } |
| } |
| |
| private static class PartitionCleanStat implements Serializable { |
| |
| private final String partitionPath; |
| private final List<String> deletePathPatterns = new ArrayList<>(); |
| private final List<String> successDeleteFiles = new ArrayList<>(); |
| private final List<String> failedDeleteFiles = new ArrayList<>(); |
| |
| private PartitionCleanStat(String partitionPath) { |
| this.partitionPath = partitionPath; |
| } |
| |
| private void addDeletedFileResult(String deletePathStr, Boolean deletedFileResult) { |
| if (deletedFileResult) { |
| successDeleteFiles.add(deletePathStr); |
| } else { |
| failedDeleteFiles.add(deletePathStr); |
| } |
| } |
| |
| private void addDeleteFilePatterns(String deletePathStr) { |
| deletePathPatterns.add(deletePathStr); |
| } |
| |
| private PartitionCleanStat merge(PartitionCleanStat other) { |
| if (!this.partitionPath.equals(other.partitionPath)) { |
| throw new RuntimeException(String |
| .format("partitionPath is not a match: (%s, %s)", partitionPath, other.partitionPath)); |
| } |
| successDeleteFiles.addAll(other.successDeleteFiles); |
| deletePathPatterns.addAll(other.deletePathPatterns); |
| failedDeleteFiles.addAll(other.failedDeleteFiles); |
| return this; |
| } |
| } |
| |
| /** |
| * Helper class for a small file's location and its actual size on disk |
| */ |
| class SmallFile implements Serializable { |
| |
| HoodieRecordLocation location; |
| long sizeBytes; |
| |
| @Override |
| public String toString() { |
| final StringBuilder sb = new StringBuilder("SmallFile {"); |
| sb.append("location=").append(location).append(", "); |
| sb.append("sizeBytes=").append(sizeBytes); |
| sb.append('}'); |
| return sb.toString(); |
| } |
| } |
| |
| /** |
| * Helper class for an insert bucket along with the weight [0.0, 1.0] that defines the fraction |
| * of incoming inserts that should be allocated to the bucket |
| */ |
| class InsertBucket implements Serializable { |
| |
| int bucketNumber; |
| // fraction of total inserts, that should go into this bucket |
| double weight; |
| |
| @Override |
| public String toString() { |
| final StringBuilder sb = new StringBuilder("InsertBucket {"); |
| sb.append("bucketNumber=").append(bucketNumber).append(", "); |
| sb.append("weight=").append(weight); |
| sb.append('}'); |
| return sb.toString(); |
| } |
| } |
| |
| /** |
| * Helper class for a bucket's type (INSERT or UPDATE) and its file location |
| */ |
| class BucketInfo implements Serializable { |
| |
| BucketType bucketType; |
| String fileLoc; |
| |
| @Override |
| public String toString() { |
| final StringBuilder sb = new StringBuilder("BucketInfo {"); |
| sb.append("bucketType=").append(bucketType).append(", "); |
| sb.append("fileLoc=").append(fileLoc); |
| sb.append('}'); |
| return sb.toString(); |
| } |
| } |
| |
| /** |
| * Packs incoming records to be upserted into buckets (1 bucket = 1 RDD partition) |
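| * <p> |
| * A rough sketch (variable names here are illustrative, not taken from this class) of how a |
| * write client would apply this partitioner to a keyed record RDD: |
| * <pre>{@code |
| * Partitioner partitioner = table.getUpsertPartitioner(workloadProfile); |
| * JavaRDD<HoodieRecord<T>> partitionedRecords = taggedRecords |
| *     .mapToPair(record -> new Tuple2<>( |
| *         new Tuple2<>(record.getKey(), Option.apply(record.getCurrentLocation())), record)) |
| *     .partitionBy(partitioner) |
| *     .map(Tuple2::_2); |
| * }</pre> |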
| */ |
| class UpsertPartitioner extends Partitioner { |
| |
| /** |
| * List of all small files to be corrected (expanded by packing incoming inserts into them) |
| */ |
| List<SmallFile> smallFiles = new ArrayList<>(); |
| /** |
| * Total number of RDD partitions, determined by the total number of buckets we want to pack |
| * the incoming workload into |
| */ |
| private int totalBuckets = 0; |
| /** |
| * Stat for the current workload. Helps in determining total inserts, upserts etc. |
| */ |
| private WorkloadStat globalStat; |
| /** |
| * Helps decide which bucket an incoming update should go to. |
| */ |
| private HashMap<String, Integer> updateLocationToBucket; |
| /** |
| * Helps us pack inserts into 1 or more buckets depending on number of incoming records. |
| */ |
| private HashMap<String, List<InsertBucket>> partitionPathToInsertBuckets; |
| /** |
| * Remembers each bucket's type, for use later when the bucket is processed. |
| */ |
| private HashMap<Integer, BucketInfo> bucketInfoMap; |
| |
| /** |
| * Rolling stats for files |
| */ |
| protected HoodieRollingStatMetadata rollingStatMetadata; |
| protected long averageRecordSize; |
| |
| UpsertPartitioner(WorkloadProfile profile) { |
| updateLocationToBucket = new HashMap<>(); |
| partitionPathToInsertBuckets = new HashMap<>(); |
| bucketInfoMap = new HashMap<>(); |
| globalStat = profile.getGlobalStat(); |
| rollingStatMetadata = getRollingStats(); |
| assignUpdates(profile); |
| assignInserts(profile); |
| |
| logger.info( |
| "Total Buckets :" + totalBuckets + ", " + "buckets info => " + bucketInfoMap + ", \n" |
| + "Partition to insert buckets => " + partitionPathToInsertBuckets + ", \n" |
| + "UpdateLocations mapped to buckets =>" + updateLocationToBucket); |
| } |
| |
| private void assignUpdates(WorkloadProfile profile) { |
| // each update location gets a partition |
| WorkloadStat gStat = profile.getGlobalStat(); |
| for (Map.Entry<String, Pair<String, Long>> updateLocEntry : gStat.getUpdateLocationToCount() |
| .entrySet()) { |
| addUpdateBucket(updateLocEntry.getKey()); |
| } |
| } |
| |
| private int addUpdateBucket(String fileLoc) { |
| int bucket = totalBuckets; |
| updateLocationToBucket.put(fileLoc, bucket); |
| BucketInfo bucketInfo = new BucketInfo(); |
| bucketInfo.bucketType = BucketType.UPDATE; |
| bucketInfo.fileLoc = fileLoc; |
| bucketInfoMap.put(totalBuckets, bucketInfo); |
| totalBuckets++; |
| return bucket; |
| } |
| |
| private void assignInserts(WorkloadProfile profile) { |
| // for new inserts, compute buckets depending on how many records we have for each partition |
| Set<String> partitionPaths = profile.getPartitionPaths(); |
| averageRecordSize = averageBytesPerRecord(); |
| logger.info("AvgRecordSize => " + averageRecordSize); |
| for (String partitionPath : partitionPaths) { |
| WorkloadStat pStat = profile.getWorkloadStat(partitionPath); |
| if (pStat.getNumInserts() > 0) { |
| |
| List<SmallFile> smallFiles = getSmallFiles(partitionPath); |
| logger.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles); |
| |
| long totalUnassignedInserts = pStat.getNumInserts(); |
| List<Integer> bucketNumbers = new ArrayList<>(); |
| List<Long> recordsPerBucket = new ArrayList<>(); |
| |
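| // Bin-packing: first top up existing small files (routing those inserts to update buckets), |
| // then spill whatever is left into brand new insert buckets sized by records-per-file. |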
| // first try packing this into one of the smallFiles |
| for (SmallFile smallFile : smallFiles) { |
| long recordsToAppend = Math |
| .min((config.getParquetMaxFileSize() - smallFile.sizeBytes) / averageRecordSize, |
| totalUnassignedInserts); |
| if (recordsToAppend > 0 && totalUnassignedInserts > 0) { |
| // create a new bucket or re-use an existing bucket |
| int bucket; |
| if (updateLocationToBucket.containsKey(smallFile.location.getFileId())) { |
| bucket = updateLocationToBucket.get(smallFile.location.getFileId()); |
| logger.info("Assigning " + recordsToAppend + " inserts to existing update bucket " |
| + bucket); |
| } else { |
| bucket = addUpdateBucket(smallFile.location.getFileId()); |
| logger.info( |
| "Assigning " + recordsToAppend + " inserts to new update bucket " + bucket); |
| } |
| bucketNumbers.add(bucket); |
| recordsPerBucket.add(recordsToAppend); |
| totalUnassignedInserts -= recordsToAppend; |
| } |
| } |
| |
| // if we have anything more, create new insert buckets, like normal |
| if (totalUnassignedInserts > 0) { |
| long insertRecordsPerBucket = config.getCopyOnWriteInsertSplitSize(); |
| if (config.shouldAutoTuneInsertSplits()) { |
| insertRecordsPerBucket = config.getParquetMaxFileSize() / averageRecordSize; |
| } |
| |
| int insertBuckets = (int) Math.max(totalUnassignedInserts / insertRecordsPerBucket, 1L); |
| logger.info( |
| "After small file assignment: unassignedInserts => " + totalUnassignedInserts |
| + ", totalInsertBuckets => " + insertBuckets + ", recordsPerBucket => " |
| + insertRecordsPerBucket); |
| for (int b = 0; b < insertBuckets; b++) { |
| bucketNumbers.add(totalBuckets); |
| recordsPerBucket.add(totalUnassignedInserts / insertBuckets); |
| BucketInfo bucketInfo = new BucketInfo(); |
| bucketInfo.bucketType = BucketType.INSERT; |
| bucketInfoMap.put(totalBuckets, bucketInfo); |
| totalBuckets++; |
| } |
| } |
| |
| // Go over all such buckets, and assign weights as per amount of incoming inserts. |
| List<InsertBucket> insertBuckets = new ArrayList<>(); |
| for (int i = 0; i < bucketNumbers.size(); i++) { |
| InsertBucket bkt = new InsertBucket(); |
| bkt.bucketNumber = bucketNumbers.get(i); |
| bkt.weight = (1.0 * recordsPerBucket.get(i)) / pStat.getNumInserts(); |
| insertBuckets.add(bkt); |
| } |
| logger.info( |
| "Total insert buckets for partition path " + partitionPath + " => " + insertBuckets); |
| partitionPathToInsertBuckets.put(partitionPath, insertBuckets); |
| } |
| } |
| } |
| |
| |
| /** |
| * Returns a list of small files in the given partition path |
| */ |
| protected List<SmallFile> getSmallFiles(String partitionPath) { |
| |
| // smallFiles only for partitionPath |
| List<SmallFile> smallFileLocations = new ArrayList<>(); |
| |
| HoodieTimeline commitTimeline = getCompletedCommitTimeline(); |
| |
| if (!commitTimeline.empty()) { // if we have some commits |
| HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); |
| List<HoodieDataFile> allFiles = getROFileSystemView() |
| .getLatestDataFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()) |
| .collect(Collectors.toList()); |
| |
| for (HoodieDataFile file : allFiles) { |
| if (file.getFileSize() < config.getParquetSmallFileLimit()) { |
| String filename = file.getFileName(); |
| SmallFile sf = new SmallFile(); |
| sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), |
| FSUtils.getFileId(filename)); |
| sf.sizeBytes = file.getFileSize(); |
| smallFileLocations.add(sf); |
| // Update the global small files list |
| smallFiles.add(sf); |
| } |
| } |
| } |
| |
| return smallFileLocations; |
| } |
| |
| /** |
| * Obtains the average record size based on the records written during the last commit. Used for |
| * estimating how many records can be packed into one file. |
| */ |
| protected long averageBytesPerRecord() { |
| long avgSize = 0L; |
| HoodieTimeline commitTimeline = metaClient.getActiveTimeline().getCommitTimeline() |
| .filterCompletedInstants(); |
| try { |
| if (!commitTimeline.empty()) { |
| HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); |
| HoodieCommitMetadata commitMetadata = HoodieCommitMetadata |
| .fromBytes(commitTimeline.getInstantDetails(latestCommitTime).get(), HoodieCommitMetadata.class); |
| avgSize = (long) Math.ceil( |
| (1.0 * commitMetadata.fetchTotalBytesWritten()) / commitMetadata |
| .fetchTotalRecordsWritten()); |
| } |
| } catch (Throwable t) { |
| // make this fail safe. |
| logger.error("Error trying to compute average bytes/record ", t); |
| } |
| return avgSize <= 0L ? config.getCopyOnWriteRecordSizeEstimate() : avgSize; |
| } |
| |
| public BucketInfo getBucketInfo(int bucketNumber) { |
| return bucketInfoMap.get(bucketNumber); |
| } |
| |
| public List<InsertBucket> getInsertBuckets(String partitionPath) { |
| return partitionPathToInsertBuckets.get(partitionPath); |
| } |
| |
| @Override |
| public int numPartitions() { |
| return totalBuckets; |
| } |
| |
| @Override |
| public int getPartition(Object key) { |
| Tuple2<HoodieKey, Option<HoodieRecordLocation>> keyLocation = (Tuple2<HoodieKey, |
| Option<HoodieRecordLocation>>) key; |
| if (keyLocation._2().isDefined()) { |
| HoodieRecordLocation location = keyLocation._2().get(); |
| return updateLocationToBucket.get(location.getFileId()); |
| } else { |
| List<InsertBucket> targetBuckets = partitionPathToInsertBuckets |
| .get(keyLocation._1().getPartitionPath()); |
| // pick the target bucket to use based on the weights. |
| double totalWeight = 0.0; |
| final long totalInserts = Math.max(1, globalStat.getNumInserts()); |
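| // Hash the record key to a stable fraction in [0, 1) so that the same key always maps to the |
| // same insert bucket, then walk the cumulative bucket weights to pick one. |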
| final long hashOfKey = Hashing.md5() |
| .hashString(keyLocation._1().getRecordKey(), StandardCharsets.UTF_8).asLong(); |
| final double r = 1.0 * Math.floorMod(hashOfKey, totalInserts) / totalInserts; |
| for (InsertBucket insertBucket : targetBuckets) { |
| totalWeight += insertBucket.weight; |
| if (r <= totalWeight) { |
| return insertBucket.bucketNumber; |
| } |
| } |
| // return first one, by default |
| return targetBuckets.get(0).bucketNumber; |
| } |
| } |
| } |
| |
| protected HoodieRollingStatMetadata getRollingStats() { |
| return null; |
| } |
| } |