| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| package org.apache.impala.util; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Range; |
| import com.google.errorprone.annotations.Immutable; |
| |
| import java.io.IOException; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hive.common.ValidTxnList; |
| import org.apache.hadoop.hive.common.ValidWriteIdList; |
| import org.apache.hadoop.hive.common.ValidWriteIdList.RangeResponse; |
| import org.apache.hadoop.hive.metastore.api.MetaException; |
| import org.apache.impala.catalog.CatalogException; |
| import org.apache.impala.catalog.CatalogServiceCatalog; |
| import org.apache.impala.catalog.Column; |
| import org.apache.impala.catalog.FileMetadataLoader.LoadStats; |
| import org.apache.impala.catalog.HdfsPartition.FileDescriptor; |
| import org.apache.impala.catalog.HdfsTable; |
| import org.apache.impala.catalog.ScalarType; |
| import org.apache.impala.catalog.StructField; |
| import org.apache.impala.catalog.StructType; |
| import org.apache.impala.common.FileSystemUtil; |
| import org.apache.impala.common.Pair; |
| import org.apache.impala.common.Reference; |
| import org.apache.impala.thrift.THdfsFileDesc; |
| import org.apache.impala.thrift.TPartialPartitionInfo; |
| import org.apache.impala.thrift.TTransactionalType; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.util.ArrayList; |
| import java.util.Comparator; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.function.Predicate; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| |
| import javax.annotation.Nullable; |
| |
| /** |
 * Contains utility functions for working with ACID tables.
 * <p>
 * The code is mostly copy-pasted from Hive. Ideally we should use the
 * code directly from Hive.
| * </p> |
| */ |
| public class AcidUtils { |
| private final static Logger LOG = LoggerFactory.getLogger(AcidUtils.class); |
| // Constant also defined in TransactionalValidationListener |
| public static final String INSERTONLY_TRANSACTIONAL_PROPERTY = "insert_only"; |
| // Constant also defined in hive_metastoreConstants |
| public static final String TABLE_IS_TRANSACTIONAL = "transactional"; |
| public static final String TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties"; |
| |
| /** |
| * Transaction parameters needed for single table operations. |
| */ |
| public static class TblTransaction { |
| public long txnId; |
| public boolean ownsTxn; |
| public long writeId; |
| public String validWriteIds; |
| } |
| |
| // Regex pattern for files in base directories. The pattern matches strings like |
| // "base_0000005/abc.txt", |
| // "base_0000005/0000/abc.txt", |
| // "base_0000003_v0003217/000000_0" |
| private static final Pattern BASE_PATTERN = Pattern.compile( |
| "base_" + |
| "(?<writeId>\\d+)" + |
| "(?:_v(?<visibilityTxnId>\\d+))?" + |
| "(?:/.*)?"); |
| |
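  // For example (derived from BASE_PATTERN above): "base_0000005/abc.txt" yields
  // writeId=5 with no visibilityTxnId, while "base_0000003_v0003217/000000_0" yields
  // writeId=3 and visibilityTxnId=3217.
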
| // Regex pattern for files in delta directories. The pattern matches strings like |
| // "delta_0000006_0000006/000000_0", |
| // "delta_0000009_0000009_0000/0000/def.txt" |
| private static final String DELTA_STR = |
| "delta_" + |
| "(?<minWriteId>\\d+)_" + |
| "(?<maxWriteId>\\d+)" + |
      // Statement id, or visibilityTxnId
| "(?:_(?<statementId>\\d+)|_v(?<visibilityTxnId>\\d+))?" + |
| // Optional path suffix. |
| "(?:/.*)?"; |
| |
| private static final Pattern DELTA_PATTERN = Pattern.compile(DELTA_STR); |
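  // For example (derived from DELTA_STR): "delta_0000006_0000006/000000_0" yields
  // minWriteId=6 and maxWriteId=6, "delta_0000009_0000009_0000/0000/def.txt" also
  // carries statementId=0, and a compacted directory name such as
  // "delta_0000001_0000002_v0001234" (hypothetical) yields visibilityTxnId=1234.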
| |
| // Regex pattern for files in delete delta directories. The pattern is similar to |
| // the 'DELTA_PATTERN', but starts with "delete_". |
| private static final Pattern DELETE_DELTA_PATTERN = Pattern.compile( |
| "delete_" + DELTA_STR); |
| |
| @VisibleForTesting |
| static final long SENTINEL_BASE_WRITE_ID = Long.MIN_VALUE; |
| |
  // The code is the same as what exists in AcidUtils.java in hive-exec.
| // Ideally we should move the AcidUtils code from hive-exec into |
| // hive-standalone-metastore or some other jar and use it here. |
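  // For illustration: a table whose properties contain 'transactional'='true' and
  // 'transactional_properties'='insert_only' is insert-only, while 'transactional'='true'
  // alone (with no 'transactional_properties', or with a value other than 'insert_only')
  // denotes a full ACID table.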
| private static boolean isInsertOnlyTable(Map<String, String> props) { |
| Preconditions.checkNotNull(props); |
| if (!isTransactionalTable(props)) { |
| return false; |
| } |
| String transactionalProp = props.get(TABLE_TRANSACTIONAL_PROPERTIES); |
| return transactionalProp != null && INSERTONLY_TRANSACTIONAL_PROPERTY. |
| equalsIgnoreCase(transactionalProp); |
| } |
| |
| public static boolean isTransactionalTable(Map<String, String> props) { |
| Preconditions.checkNotNull(props); |
| String tableIsTransactional = props.get(TABLE_IS_TRANSACTIONAL); |
| if (tableIsTransactional == null) { |
| tableIsTransactional = props.get(TABLE_IS_TRANSACTIONAL.toUpperCase()); |
| } |
| return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true"); |
| } |
| |
| public static boolean isFullAcidTable(Map<String, String> props) { |
| return isTransactionalTable(props) && !isInsertOnlyTable(props); |
| } |
| |
| public static Column getRowIdColumnType(int position) { |
| StructType row__id = new StructType(); |
| row__id.addField(new StructField("operation", ScalarType.INT, "")); |
| row__id.addField(new StructField("originaltransaction", ScalarType.BIGINT, "")); |
| row__id.addField(new StructField("bucket", ScalarType.INT, "")); |
| row__id.addField(new StructField("rowid", ScalarType.BIGINT, "")); |
| row__id.addField(new StructField("currenttransaction", ScalarType.BIGINT, "")); |
| return new Column("row__id", row__id, "", position); |
| } |
| |
  // Sets transaction-related table properties for new tables based on manually
  // set table properties and the default transactional type.
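  // For example (illustrative): with defaultTransactionalType == INSERT_ONLY and no
  // manually set transactional properties, this adds 'transactional'='true' and
  // 'transactional_properties'='insert_only' to 'props'.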
| public static void setTransactionalProperties(Map<String, String> props, |
| TTransactionalType defaultTransactionalType) { |
| Preconditions.checkNotNull(props); |
| if (props.get(TABLE_IS_TRANSACTIONAL) != null |
| || props.get(TABLE_TRANSACTIONAL_PROPERTIES) != null) { |
| // Table properties are set manually, ignore default. |
| return; |
| } |
| |
| switch (defaultTransactionalType) { |
| case NONE: break; |
| case INSERT_ONLY: |
| props.put(TABLE_IS_TRANSACTIONAL, "true"); |
| props.put(TABLE_TRANSACTIONAL_PROPERTIES, INSERTONLY_TRANSACTIONAL_PROPERTY); |
| break; |
| } |
| } |
| |
| /** |
| * This method is copied from Hive's org.apache.hadoop.hive.ql.io.AcidUtils.java |
| * (commit hash 17ac1d9f230b8d663c09c22016753012a9b91edf). It is used to generate |
| * the ACID directory names to be added to the insert events on the transactional |
| * tables. |
| */ |
  // Get the first-level ACID directory (if any) from a given path.
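  // For example (illustrative paths): for a data file
  // '<table>/delta_0000005_0000005/000000_0' this returns 'delta_0000005_0000005', and
  // for a nested directory '<table>/base_0000003/0000' it returns 'base_0000003/0000'.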
| public static String getFirstLevelAcidDirPath(Path dataPath, FileSystem fileSystem) |
| throws IOException { |
| if (dataPath == null) { |
| return null; |
| } |
| String firstLevelAcidDir = getAcidSubDir(dataPath); |
| if (firstLevelAcidDir != null) { |
| return firstLevelAcidDir; |
| } |
| |
| String acidDirPath = getFirstLevelAcidDirPath(dataPath.getParent(), fileSystem); |
| if (acidDirPath == null) { |
| return null; |
| } |
| |
    // We need the path of the directory, so there is no need to append the file name.
| if (fileSystem.isDirectory(dataPath)) { |
| return acidDirPath + Path.SEPARATOR + dataPath.getName(); |
| } |
| return acidDirPath; |
| } |
| |
| private static String getAcidSubDir(Path dataPath) { |
| String dataDir = dataPath.getName(); |
| if (dataDir.startsWith("base_") |
| || dataDir.startsWith("delta_") |
| || dataDir.startsWith("delete_delta_")) { |
| return dataDir; |
| } |
| return null; |
| } |
| |
| /** |
   * Predicate that checks if a file or directory is relevant for a given WriteId list.
   * The class does not implement the Predicate interface since we want to support a
   * strict mode which throws exceptions in certain cases.
   * <p>
   * <b>Must be called only for ACID tables.</b>
   * Checks that the path conforms to the ACID table directory structure, and includes
   * only directories that correspond to valid committed transactions.
   * </p>
| */ |
| private static class WriteListBasedPredicate { |
| |
| @Nullable |
| private final ValidTxnList validTxnList; |
| private final ValidWriteIdList writeIdList; |
    // When strict mode is turned on, check() throws an exception when a given base
    // file is invalid or a compacted delta file has some open writeIds.
| private final boolean doStrictCheck; |
| |
| /** |
     * Creates a predicate based only on a ValidWriteIdList. This is used to filter
     * already cached file descriptors, where it is guaranteed that files related to
     * invalid transactions were not loaded.
| */ |
| WriteListBasedPredicate(ValidWriteIdList writeIdList, boolean strictMode) { |
| this.validTxnList = null; |
| this.writeIdList = Preconditions.checkNotNull(writeIdList); |
| this.doStrictCheck = strictMode; |
| } |
| |
| /** |
     * Creates a predicate based on a ValidTxnList and a ValidWriteIdList. Useful when
     * filtering a file listing obtained directly from the file system, which may
     * include directories produced by in-progress compactions.
| */ |
| WriteListBasedPredicate(ValidTxnList validTxnList, ValidWriteIdList writeIdList) { |
| this.validTxnList = Preconditions.checkNotNull(validTxnList); |
| this.writeIdList = Preconditions.checkNotNull(writeIdList); |
| this.doStrictCheck = false; |
| } |
| |
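    // As an illustrative example: with a ValidWriteIdList whose high watermark is 5
    // and no open or aborted writeIds, "base_0000003/f1" is accepted,
    // "delta_0000006_0000006/f1" is rejected (writeId 6 is not valid yet), and a
    // post-upgrade file such as "000000_0" is accepted.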
| public boolean check(String dirPath) throws CatalogException { |
| ParsedBase parsedBase = parseBase(dirPath); |
| if (parsedBase.writeId != SENTINEL_BASE_WRITE_ID) { |
| boolean isValid = writeIdList.isValidBase(parsedBase.writeId) && |
| isTxnValid(parsedBase.visibilityTxnId); |
| if (doStrictCheck && !isValid) { |
| throw new CatalogException("Invalid base file found " + dirPath); |
| } |
| return isValid; |
| } else { |
| ParsedDelta pd = parseDelta(dirPath); |
| if (pd == null) pd = parseDeleteDelta(dirPath); |
| if (pd != null) { |
| if (!isTxnValid(pd.visibilityTxnId)) return false; |
| ValidWriteIdList.RangeResponse rr = |
| writeIdList.isWriteIdRangeValid(pd.minWriteId, pd.maxWriteId); |
| if (rr == RangeResponse.ALL) return true; |
| if (rr == RangeResponse.NONE) return false; |
        // Either this is a compacted delta file whose visibility transaction id is
        // valid, or a delta file generated by the Hive Streaming engine.
        // We allow streaming engine delta files that have open writeIds since the
        // backend code handles such writeIds appropriately.
| if (!pd.isCompactedDeltaFile()) return true; |
| // This is a compacted delta file and has some writeIds which are not valid. |
| // We allow only aborted and committed writeIds in compacted files (no open |
| // writeIds) |
        for (long writeId = pd.minWriteId; writeId <= pd.maxWriteId; writeId++) {
| if (!writeIdList.isWriteIdValid(writeId) && !writeIdList |
| .isWriteIdAborted(writeId)) { |
| if (doStrictCheck) { |
| throw new CatalogException( |
| "Open writeId " + writeId + " found in compacted delta file " |
| + dirPath); |
| } |
| return false; |
| } |
| } |
| return true; |
| } |
| } |
| // If it wasn't in a base or a delta directory, we should include it. |
| // This allows post-upgrade tables to be read. |
| // TODO(todd) add an e2e test for this. |
| return true; |
| } |
| |
| /** |
     * The ACID compactor process does not change the writeIds of the compacted files
     * (e.g. delta_0001 and delta_0002 will be compacted to delta_0001_0002_v0123 where
     * v0123 is the visibility txn id of the compaction itself). While such a compaction
     * is in progress, we need to make sure that we ignore the files it produces. This
     * is where the TxnList is useful: we use the validTxnList to determine whether the
     * compaction process has committed. If it has not, we ignore the files which are
     * being compacted.
     *
     * @param visibilityTxnId Transaction id derived from the directory name on the
     * filesystem.
     * @return true if the given visibilityTxnId is valid, or if the visibilityTxnId is
     * -1 or validTxnList is null; false otherwise.
| */ |
| private boolean isTxnValid(long visibilityTxnId) { |
| return validTxnList == null || |
| visibilityTxnId == -1 || validTxnList.isTxnValid(visibilityTxnId); |
| } |
| } |
| |
| @Immutable |
| private static final class ParsedBase { |
| final long writeId; |
| final long visibilityTxnId; |
| |
| ParsedBase(long writeId, long visibilityTxnId) { |
| this.writeId = writeId; |
| this.visibilityTxnId = visibilityTxnId; |
| } |
| } |
| |
| @VisibleForTesting |
| static ParsedBase parseBase(String relPath) { |
| Matcher baseMatcher = BASE_PATTERN.matcher(relPath); |
| if (baseMatcher.matches()) { |
| long writeId = Long.valueOf(baseMatcher.group("writeId")); |
| long visibilityTxnId = -1; |
| String visibilityTxnIdStr = baseMatcher.group("visibilityTxnId"); |
| if (visibilityTxnIdStr != null) { |
| visibilityTxnId = Long.valueOf(visibilityTxnIdStr); |
| } |
| return new ParsedBase(writeId, visibilityTxnId); |
| } |
| return new ParsedBase(SENTINEL_BASE_WRITE_ID, -1); |
| } |
| |
| @VisibleForTesting |
| static long getBaseWriteId(String relPath) { |
| return parseBase(relPath).writeId; |
| } |
| |
| @Immutable |
| private static final class ParsedDelta { |
| final long minWriteId; |
| final long maxWriteId; |
| /// Value -1 means there is no statement id. |
| final long statementId; |
| /// Value -1 means there is no visibility txn id. |
| final long visibilityTxnId; |
| |
| ParsedDelta(long minWriteId, long maxWriteId, long statementId, |
| long visibilityTxnId) { |
| this.minWriteId = minWriteId; |
| this.maxWriteId = maxWriteId; |
| this.statementId = statementId; |
| this.visibilityTxnId = visibilityTxnId; |
| } |
| |
| private boolean isCompactedDeltaFile() { |
| return visibilityTxnId != -1; |
| } |
| } |
| |
| private static ParsedDelta matcherToParsedDelta(Matcher deltaMatcher) { |
| if (!deltaMatcher.matches()) { |
| return null; |
| } |
| long minWriteId = Long.valueOf(deltaMatcher.group("minWriteId")); |
| long maxWriteId = Long.valueOf(deltaMatcher.group("maxWriteId")); |
| String statementIdStr = deltaMatcher.group("statementId"); |
| long statementId = statementIdStr != null ? Long.valueOf(statementIdStr) : -1; |
| String visibilityTxnIdStr = deltaMatcher.group("visibilityTxnId"); |
| long visibilityTxnId = visibilityTxnIdStr != null ? |
| Long.valueOf(visibilityTxnIdStr) : -1; |
| return new ParsedDelta(minWriteId, maxWriteId, statementId, visibilityTxnId); |
| } |
| |
| private static ParsedDelta parseDelta(String dirPath) { |
| return matcherToParsedDelta(DELTA_PATTERN.matcher(dirPath)); |
| } |
| |
| private static ParsedDelta parseDeleteDelta(String dirPath) { |
| return matcherToParsedDelta(DELETE_DELTA_PATTERN.matcher(dirPath)); |
| } |
| |
| private static String getFirstDirName(String relPath) { |
| int slashIdx = relPath.indexOf("/"); |
| if (slashIdx != -1) { |
| return relPath.substring(0, slashIdx); |
| } else { |
| return null; |
| } |
| } |
| |
| /** |
| * Returns true if 'fd' refers to a delete delta file. |
| */ |
| public static boolean isDeleteDeltaFd(FileDescriptor fd) { |
| return fd.getRelativePath().startsWith("delete_delta_"); |
| } |
| |
| /** |
| * This method is similar to {@link AcidUtils#filterFilesForAcidState} with the |
| * difference that it expects input to be valid file descriptors from a loaded table. |
| * This means that file descriptors are already pre-vetted and are consistent with |
| * respect to some ValidWriteIdList and ValidTxnList. All this method does is to try to |
| * filter such file descriptors for a different ValidWriteIdList. |
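   *
   * An illustrative usage sketch ('cachedFds' and 'validWriteIdList' are hypothetical
   * stand-ins for the descriptors and write id list of an already loaded table or
   * partition):
   * <pre>{@code
   *   List<FileDescriptor> fds = new ArrayList<>(cachedFds);
   *   int numRemoved = AcidUtils.filterFdsForAcidState(fds, validWriteIdList);
   * }</pre>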
| * |
| * @param fds Input list of File descriptors to be filtered in-place. |
| * @param validWriteIdList The ValidWriteIdList for which we filter the fds. |
| * @return The number of file descriptors which were filtered out. |
   * @throws CatalogException if any of the provided FileDescriptors refers to an
   * invalid base directory or to a compacted delta directory that contains open
   * writeIds for the given ValidWriteIdList.
| */ |
| public static int filterFdsForAcidState(List<FileDescriptor> fds, |
| ValidWriteIdList validWriteIdList) throws CatalogException { |
| Preconditions.checkNotNull(fds); |
| |
| if (validWriteIdList == null) return 0; |
| |
| WriteListBasedPredicate writeListBasedPredicate = new WriteListBasedPredicate( |
| validWriteIdList, true); |
| Iterator<FileDescriptor> it = fds.iterator(); |
| int numRemoved = 0; |
| while (it.hasNext()) { |
| if (!writeListBasedPredicate.check(it.next().getRelativePath())) { |
| it.remove(); |
| numRemoved++; |
| } |
| } |
| return numRemoved; |
| } |
| |
| /** |
| * Filters the files based on Acid state. |
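   * <p>
   * For example (illustrative): given files under base_0000005/, delta_0000001_0000005/
   * and delta_0000006_0000006/, with write ids valid up to 6, only the files under
   * base_0000005/ and delta_0000006_0000006/ are returned; delta_0000001_0000005/ is
   * superseded by the base.
   * </p>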
| * @param stats the FileStatuses obtained from recursively listing the directory |
| * @param baseDir the base directory for the partition (or table, in the case of |
| * unpartitioned tables) |
   * @param validTxnList the valid transaction list, used to check the visibility of
   * directories produced by compactions
   * @param writeIds the valid write IDs for the table
   * @param loadStats stats to add counts of skipped files to. May be null.
   * @return the subset of the passed-in FileStatuses that should be used.
| * @throws CatalogException on ACID error. TODO: Remove throws clause once IMPALA-9042 |
| * is resolved. |
| */ |
| public static List<FileStatus> filterFilesForAcidState(List<FileStatus> stats, |
| Path baseDir, ValidTxnList validTxnList, ValidWriteIdList writeIds, |
| @Nullable LoadStats loadStats) throws CatalogException { |
    // First filter out any paths that do not correspond to valid write IDs.
| // At the same time, calculate the max valid base write ID and collect the names of |
| // the delta directories. |
| WriteListBasedPredicate pred = new WriteListBasedPredicate(validTxnList, writeIds); |
| long maxBaseWriteId = Long.MIN_VALUE; |
| Set<String> deltaDirNames = new HashSet<>(); |
| for (Iterator<FileStatus> it = stats.iterator(); it.hasNext();) { |
| FileStatus stat = it.next(); |
| String relPath = FileSystemUtil.relativizePath(stat.getPath(), baseDir); |
| if (!pred.check(relPath)) { |
| it.remove(); |
| if (loadStats != null) loadStats.uncommittedAcidFilesSkipped++; |
| continue; |
| } |
| maxBaseWriteId = Math.max(getBaseWriteId(relPath), maxBaseWriteId); |
| String dirName = getFirstDirName(relPath); |
| if (dirName != null && (dirName.startsWith("delta_") || |
| dirName.startsWith("delete_delta_"))) { |
| deltaDirNames.add(dirName); |
| } |
| } |
| // Get a list of all valid delta directories. |
| List<Pair<String, ParsedDelta>> deltas = |
| getValidDeltaDirsOrdered(deltaDirNames, maxBaseWriteId); |
    // Filter out delta directories superseded by major/minor compactions.
| Set<String> filteredDeltaDirs = |
| getFilteredDeltaDirs(deltas, maxBaseWriteId, writeIds); |
    // Filter out any files that are superseded by the latest valid base or not located
| // in 'filteredDeltaDirs'. |
| return filterFilesForAcidState(stats, baseDir, maxBaseWriteId, filteredDeltaDirs, |
| loadStats); |
| } |
| |
| private static List<FileStatus> filterFilesForAcidState(List<FileStatus> stats, |
| Path baseDir, long maxBaseWriteId, Set<String> deltaDirs, |
| @Nullable LoadStats loadStats) throws CatalogException { |
| List<FileStatus> validStats = new ArrayList<>(stats); |
| for (Iterator<FileStatus> it = validStats.iterator(); it.hasNext();) { |
| FileStatus stat = it.next(); |
| |
| if (stat.isDirectory()) { |
| it.remove(); |
| continue; |
| } |
| |
| String relPath = FileSystemUtil.relativizePath(stat.getPath(), baseDir); |
| if (relPath.startsWith("delta_") || |
| relPath.startsWith("delete_delta_")) { |
| String dirName = getFirstDirName(relPath); |
| if (dirName != null && !deltaDirs.contains(dirName)) { |
| it.remove(); |
| if (loadStats != null) loadStats.filesSupersededByAcidState++; |
| } |
| if (relPath.endsWith("_flush_length")) { |
| throw new CatalogException("Found Hive Streaming side-file: " + |
            stat.getPath() + ". It means that the contents of the directory are " +
| "currently being written, therefore Impala is not able to read it. " + |
| "Please try to load the table again once Hive Streaming commits " + |
| "the transaction."); |
| } |
| continue; |
| } |
| long baseWriteId = getBaseWriteId(relPath); |
| if (baseWriteId != SENTINEL_BASE_WRITE_ID) { |
| if (baseWriteId < maxBaseWriteId) { |
| it.remove(); |
| if (loadStats != null) loadStats.filesSupersededByAcidState++; |
| } |
| continue; |
| } |
| |
| // Not in a base or a delta directory. In that case, it's probably a |
| // post-upgrade file. |
| // If there is no valid base: we should read the file (assuming that |
| // hive.mm.allow.originals == true) |
      // If there is a valid base: the file should have been merged into the base by
      // compaction, so we can assume that the file is no longer valid and is just
      // waiting to be deleted.
| if (maxBaseWriteId != SENTINEL_BASE_WRITE_ID) it.remove(); |
| } |
| return validStats; |
| } |
| |
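  // As an illustrative example: with no base (baseWriteId == SENTINEL_BASE_WRITE_ID),
  // the directory names {delta_0000001_0000001, delta_0000001_0000002,
  // delta_0000001_0000002_v0000100, delta_0000002_0000002} are ordered by the
  // comparator below as delta_0000001_0000002_v0000100, delta_0000001_0000002,
  // delta_0000001_0000001, delta_0000002_0000002.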
| private static List<Pair<String, ParsedDelta>> getValidDeltaDirsOrdered( |
| Set<String> deltaDirNames, long baseWriteId) |
| throws CatalogException { |
    List<Pair<String, ParsedDelta>> deltas = new ArrayList<>();
| for (Iterator<String> it = deltaDirNames.iterator(); it.hasNext();) { |
| String dirname = it.next(); |
| ParsedDelta parsedDelta = parseDelta(dirname); |
| if (parsedDelta == null) parsedDelta = parseDeleteDelta(dirname); |
| if (parsedDelta != null) { |
| if (parsedDelta.minWriteId <= baseWriteId) { |
| Preconditions.checkState(parsedDelta.maxWriteId <= baseWriteId); |
| it.remove(); |
| continue; |
| } |
| deltas.add(new Pair<String, ParsedDelta>(dirname, parsedDelta)); |
| continue; |
| } |
| } |
| |
| deltas.sort(new Comparator<Pair<String, ParsedDelta>>() { |
| // This compare method is based on Hive (d6ad73c3615) |
| // AcidUtils.ParsedDeltaLight.compareTo() |
      // One addition to it is to take the visibilityTxnId into consideration. Hence if
      // there are delta_N_M and delta_N_M_v001234 then delta_N_M_v001234 must be
      // ordered first.
| @Override |
| public int compare(Pair<String, ParsedDelta> o1, Pair<String, ParsedDelta> o2) { |
| ParsedDelta pd1 = o1.second; |
| ParsedDelta pd2 = o2.second; |
| if (pd1.minWriteId != pd2.minWriteId) { |
| if (pd1.minWriteId < pd2.minWriteId) { |
| return -1; |
| } else { |
| return 1; |
| } |
| } else if (pd1.maxWriteId != pd2.maxWriteId) { |
| if (pd1.maxWriteId < pd2.maxWriteId) { |
| return 1; |
| } else { |
| return -1; |
| } |
| } else if (pd1.statementId != pd2.statementId) { |
| /** |
           * We want deltas after minor compaction (w/o statementId) to sort earlier so
           * that getAcidState() considers the files compacted into larger ones obsolete.
           * Before compaction, include deltas with all statementIds for a given writeId.
| */ |
| if (pd1.statementId < pd2.statementId) { |
| return -1; |
| } else { |
| return 1; |
| } |
| } else if (pd1.visibilityTxnId != pd2.visibilityTxnId) { |
          // This is an alteration from Hive's algorithm. If everything else is the same
          // then the higher visibilityTxnId wins (a missing visibilityTxnId is -1).
          // Currently this cannot happen since Hive doesn't minor compact standalone
          // delta directories of streaming ingestion, i.e. the following cannot happen:
          // delta_1_5 => delta_1_5_v01234
          // However, it'd make sense because streaming ingested ORC files don't use
          // advanced features like dictionary encoding or statistics. Hence Hive might
          // do that in the future and that would make Impala see duplicate rows.
          // So we are cautious here in case they forget to tell us.
| if (pd1.visibilityTxnId < pd2.visibilityTxnId) { |
| return 1; |
| } else { |
| return -1; |
| } |
| } else { |
| return o1.first.compareTo(o2.first); |
| } |
| } |
| }); |
| return deltas; |
| } |
| |
| /** |
| * The algorithm is copied from Hive's (d6ad73c3615) |
   * org.apache.hadoop.hive.ql.io.AcidUtils.getAcidState().
   * One addition to it is to take the visibilityTxnId into consideration. Hence if
   * there are delta_N_M_v001234 and delta_N_M then it ignores delta_N_M.
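   * <p>
   * For example (illustrative): with no base, all write ids committed and the ordered
   * input {delta_0000001_0000002_v0000100, delta_0000001_0000002, delta_0000001_0000001,
   * delta_0000002_0000002}, only delta_0000001_0000002_v0000100 is kept; the other
   * three directories are covered by that compacted delta.
   * </p>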
| */ |
| private static Set<String> getFilteredDeltaDirs(List<Pair<String, ParsedDelta>> deltas, |
| long baseWriteId, ValidWriteIdList writeIds) { |
| long current = baseWriteId; |
| long lastStmtId = -1; |
| ParsedDelta prev = null; |
| Set<String> filteredDeltaDirs = new HashSet<>(); |
| for (Pair<String, ParsedDelta> pathDelta : deltas) { |
| ParsedDelta next = pathDelta.second; |
| if (next.maxWriteId > current) { |
| // are any of the new transactions ones that we care about? |
| if (writeIds.isWriteIdRangeValid(current + 1, next.maxWriteId) != |
| ValidWriteIdList.RangeResponse.NONE) { |
| filteredDeltaDirs.add(pathDelta.first); |
| current = next.maxWriteId; |
| lastStmtId = next.statementId; |
| prev = next; |
| } |
| } else if (next.maxWriteId == current && lastStmtId >= 0) { |
        // make sure to get all deltas within a single transaction; multi-statement txns
        // generate multiple delta files with the same txnId range.
        // Of course, if maxWriteId has already been minor compacted, all per-statement
        // deltas are obsolete.
| filteredDeltaDirs.add(pathDelta.first); |
| prev = next; |
| } else if (prev != null && next.maxWriteId == prev.maxWriteId && |
| next.minWriteId == prev.minWriteId && |
| next.statementId == prev.statementId && |
| // If visibilityTxnId differs, then 'pathDelta' is probably a streaming ingested |
| // delta directory and 'prev' is the compacted version of it. |
| next.visibilityTxnId == prev.visibilityTxnId) { |
        // The 'next' parsedDelta may have everything equal to the 'prev' parsedDelta,
        // except the path. This may happen when we have split update and we have two
        // types of delta directories - 'delta_x_y' and 'delete_delta_x_y' for the SAME
        // txn range.

        // Also note that any delete_deltas in between a given delta_x_y range would be
        // made obsolete. For example, a delta_30_50 would make delete_delta_40_40
        // obsolete.
| // This is valid because minor compaction always compacts the normal deltas and |
| // the delete deltas for the same range. That is, if we had 3 directories, |
| // delta_30_30, delete_delta_40_40 and delta_50_50, then running minor compaction |
| // would produce delta_30_50 and delete_delta_30_50. |
| filteredDeltaDirs.add(pathDelta.first); |
| prev = next; |
| } |
| } |
| return filteredDeltaDirs; |
| } |
| |
| /** |
   * This method compares the writeIdList of the given table, if it is loaded and is a
   * transactional table, with the given ValidWriteIdList. If the table metadata is a
   * superset of the metadata view represented by the given validWriteIdList this
   * method returns a value greater than 0. If they are an exact match of each other,
   * it returns 0, and if the table's ValidWriteIdList is behind the provided
   * validWriteIdList it returns -1. This information is useful to determine if the
   * cached table can be used to construct a consistent snapshot corresponding to the
   * given validWriteIdList. The ValidWriteIdList is compared only if the table id
   * matches the given tableId.
| */ |
| public static int compare(HdfsTable tbl, ValidWriteIdList validWriteIdList, |
| long tableId) { |
| Preconditions.checkState(tbl != null && tbl.getMetaStoreTable() != null); |
    // if tbl is not transactional, there is nothing to compare against and we return 0
| if (!isTransactionalTable(tbl.getMetaStoreTable().getParameters())) return 0; |
| Preconditions.checkNotNull(tbl.getValidWriteIds()); |
    // if the provided table id does not match what the CatalogService has, we return
    // -1 indicating that the cached table is stale.
| if (tableId != CatalogServiceCatalog.TABLE_ID_UNAVAILABLE |
| && tbl.getMetaStoreTable().getId() != tableId) { |
| return -1; |
| } |
| return compare(tbl.getValidWriteIds(), validWriteIdList); |
| } |
| |
  /*** This method is mostly copied from {@link org.apache.hive.common.util.TxnIdUtils}
   * (e649562) with the exception that the table names of both input writeIdLists
   * must be the same to have a valid comparison.
   * TODO: source this directly from hive-exec so that future changes to this are
   * automatically imported.
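   *
   * As an illustrative example: a list with high watermark 10 and invalid writeIds
   * {9, 10} compares as equivalent (returns 0) to a list with high watermark 8 and no
   * invalid writeIds, since both expose exactly writes 1..8 as committed.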
| * |
| * @param a |
| * @param b |
| * @return 0, if a and b are equivalent |
| * 1, if a is more recent |
| * -1, if b is more recent |
| ***/ |
| @VisibleForTesting |
| public static int compare(ValidWriteIdList a, ValidWriteIdList b) { |
| Preconditions.checkState(a.getTableName().equalsIgnoreCase(b.getTableName())); |
    // The algorithm assumes invalidWriteIds are sorted and values are less than or
    // equal to hwm; here is how the algorithm works:
    // 1. Compare the two invalidWriteIds lists until one of them ends; a difference
    // means the mismatched writeId is committed in one ValidWriteIdList but not the
    // other, and the comparison ends.
    // 2. Every writeId from the last writeId in the shorter invalidWriteIds list up to
    // its hwm should be committed in the other ValidWriteIdList, otherwise the
    // comparison ends.
    // 3. Every writeId from the lower hwm to the higher hwm should be invalid,
    // otherwise the comparison ends.
| int minLen = Math.min(a.getInvalidWriteIds().length, b.getInvalidWriteIds().length); |
| for (int i = 0; i < minLen; i++) { |
| if (a.getInvalidWriteIds()[i] == b.getInvalidWriteIds()[i]) { |
| continue; |
| } |
| return a.getInvalidWriteIds()[i] > b.getInvalidWriteIds()[i] ? 1 : -1; |
| } |
| if (a.getInvalidWriteIds().length == b.getInvalidWriteIds().length) { |
| return Long.signum(a.getHighWatermark() - b.getHighWatermark()); |
| } |
| if (a.getInvalidWriteIds().length == minLen) { |
| if (a.getHighWatermark() != b.getInvalidWriteIds()[minLen] - 1) { |
| return Long.signum(a.getHighWatermark() - (b.getInvalidWriteIds()[minLen] - 1)); |
| } |
| if (allInvalidFrom(b.getInvalidWriteIds(), minLen, b.getHighWatermark())) { |
| return 0; |
| } else { |
| return -1; |
| } |
| } else { |
| if (b.getHighWatermark() != a.getInvalidWriteIds()[minLen] - 1) { |
| return Long.signum(b.getHighWatermark() - (a.getInvalidWriteIds()[minLen] - 1)); |
| } |
| if (allInvalidFrom(a.getInvalidWriteIds(), minLen, a.getHighWatermark())) { |
| return 0; |
| } else { |
| return 1; |
| } |
| } |
| } |

  private static boolean allInvalidFrom(long[] invalidIds, int start, long hwm) {
    for (int i = start + 1; i < invalidIds.length; i++) {
      if (invalidIds[i] != (invalidIds[i - 1] + 1)) {
        return false;
      }
    }
    return invalidIds[invalidIds.length - 1] == hwm;
  }
| } |