blob: 3b856ad0fc9395c75dbe2a942cd84a88ac0c5632 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.util;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.impala.catalog.FileMetadataLoader.LoadStats;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TTransactionalType;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
/**
 * Contains utility functions for working with Acid tables.
 * <p>
 * The code is mostly copy-pasted from Hive. Ideally we should use the
 * code directly from Hive.
 * </p>
 */
public class AcidUtils {
// Value of the "transactional_properties" table property that identifies an
// insert-only transactional table (see isInsertOnlyTable()).
// Constant also defined in TransactionalValidationListener
public static final String INSERTONLY_TRANSACTIONAL_PROPERTY = "insert_only";
// Table property key; a value of "true" (compared case-insensitively) marks the
// table as transactional (see isTransactionalTable()).
// Constant also defined in hive_metastoreConstants
public static final String TABLE_IS_TRANSACTIONAL = "transactional";
// Table property key whose value is compared against INSERTONLY_TRANSACTIONAL_PROPERTY
// to tell insert-only tables apart from other transactional tables.
public static final String TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties";
* Transaction parameters needed for single table operations.
public static class TblTransaction {
public long txnId;
public boolean ownsTxn;
public long writeId;
public String validWriteIds;
// Regex pattern for files in base directories. The pattern matches strings like
// "base_0000005/abc.txt",
// "base_0000005/0000/abc.txt",
// "base_0000003_v0003217/000000_0"
private static final Pattern BASE_PATTERN = Pattern.compile(
"base_" +
"(?<writeId>\\d+)" +
"(?:_v(?<visibilityTxnId>\\d+))?" +
// Regex pattern for files in delta directories. The pattern matches strings like
// "delta_0000006_0000006/000000_0",
// "delta_0000009_0000009_0000/0000/def.txt"
private static final Pattern DELTA_PATTERN = Pattern.compile(
"delta_" +
"(?<minWriteId>\\d+)_" +
"(?<maxWriteId>\\d+)" +
"(?:_(?<optionalStatementId>\\d+))?" +
// Optional path suffix.
// Write ID that parseBase() puts into its result when the path is not inside a base
// directory; callers compare against it to detect the "no base" case.
static final long SENTINEL_BASE_WRITE_ID = Long.MIN_VALUE;
// The code is same as what exists in in hive-exec.
// Ideally we should move the AcidUtils code from hive-exec into
// hive-standalone-metastore or some other jar and use it here.
private static boolean isInsertOnlyTable(Map<String, String> props) {
if (!isTransactionalTable(props)) {
return false;
String transactionalProp = props.get(TABLE_TRANSACTIONAL_PROPERTIES);
return transactionalProp != null && INSERTONLY_TRANSACTIONAL_PROPERTY.
public static boolean isTransactionalTable(Map<String, String> props) {
String tableIsTransactional = props.get(TABLE_IS_TRANSACTIONAL);
if (tableIsTransactional == null) {
tableIsTransactional = props.get(TABLE_IS_TRANSACTIONAL.toUpperCase());
return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
public static boolean isFullAcidTable(Map<String, String> props) {
return isTransactionalTable(props) && !isInsertOnlyTable(props);
// Sets transaction related table properties for new tables based on manually
// set table properties and default transactional type.
// 'props' is modified in place.
// NOTE(review): this method is incomplete in this view -- the 'if' condition below is
// never closed, the switch shows only the NONE case followed by an unlabelled put(),
// and no closing braces are visible. Restore the missing lines from the complete file
// before relying on the comments here.
public static void setTransactionalProperties(Map<String, String> props,
TTransactionalType defaultTransactionalType) {
// A non-null "transactional" property counts as a manual setting.
// NOTE(review): the remainder of this condition and the body of the 'if' are not
// visible.
if (props.get(TABLE_IS_TRANSACTIONAL) != null
// Table properties are set manually, ignore default.
switch (defaultTransactionalType) {
// NONE adds no properties.
case NONE: break;
// Marks the table as transactional.
// NOTE(review): the case label this statement belongs to is not visible.
props.put(TABLE_IS_TRANSACTIONAL, "true");
* Predicate that checks if the file or directory is relevant for a given WriteId list.
* <p>
* <b>Must be called only for ACID table.</b>
* Checks that the path conforms to ACID table dir structure, and includes only
* directories that correspond to valid committed transactions.
* </p>
private static class WriteListBasedPredicate implements Predicate<String> {
private final ValidTxnList validTxnList;
private final ValidWriteIdList writeIdList;
WriteListBasedPredicate(ValidTxnList validTxnList, ValidWriteIdList writeIdList) {
this.validTxnList = Preconditions.checkNotNull(validTxnList);
this.writeIdList = Preconditions.checkNotNull(writeIdList);
public boolean test(String dirPath) {
ParsedBase parsedBase = parseBase(dirPath);
if (parsedBase.writeId != SENTINEL_BASE_WRITE_ID) {
return writeIdList.isValidBase(parsedBase.writeId) &&
} else {
ParsedDelta pd = parseDelta(dirPath);
if (pd != null) {
ValidWriteIdList.RangeResponse rr =
writeIdList.isWriteIdRangeValid(pd.minWriteId, pd.maxWriteId);
return rr.equals(ValidWriteIdList.RangeResponse.ALL);
// If it wasn't in a base or a delta directory, we should include it.
// This allows post-upgrade tables to be read.
// TODO(todd) add an e2e test for this.
return true;
private boolean isTxnValid(long visibilityTxnId) {
return visibilityTxnId == -1 || validTxnList.isTxnValid(visibilityTxnId);
private static final class ParsedBase {
final long writeId;
final long visibilityTxnId;
ParsedBase(long writeId, long visibilityTxnId) {
this.writeId = writeId;
this.visibilityTxnId = visibilityTxnId;
static ParsedBase parseBase(String relPath) {
Matcher baseMatcher = BASE_PATTERN.matcher(relPath);
if (baseMatcher.matches()) {
long writeId = Long.valueOf("writeId"));
long visibilityTxnId = -1;
String visibilityTxnIdStr ="visibilityTxnId");
if (visibilityTxnIdStr != null) {
visibilityTxnId = Long.valueOf(visibilityTxnIdStr);
return new ParsedBase(writeId, visibilityTxnId);
return new ParsedBase(SENTINEL_BASE_WRITE_ID, -1);
static long getBaseWriteId(String relPath) {
return parseBase(relPath).writeId;
private static final class ParsedDelta {
final long minWriteId;
final long maxWriteId;
* Negative value indicates there was no statement id.
final long statementId;
ParsedDelta(long minWriteId, long maxWriteId, long statementId) {
this.minWriteId = minWriteId;
this.maxWriteId = maxWriteId;
this.statementId = statementId;
private static ParsedDelta parseDelta(String dirPath) {
Matcher deltaMatcher = DELTA_PATTERN.matcher(dirPath);
if (!deltaMatcher.matches()) {
return null;
long minWriteId = Long.valueOf("minWriteId"));
long maxWriteId = Long.valueOf("maxWriteId"));
String statementIdStr ="optionalStatementId");
long statementId = statementIdStr != null ? Long.valueOf(statementIdStr) : -1;
return new ParsedDelta(minWriteId, maxWriteId, statementId);
* Filters the files based on Acid state.
* @param stats the FileStatuses obtained from recursively listing the directory
* @param baseDir the base directory for the partition (or table, in the case of
* unpartitioned tables)
* @param writeIds the valid write IDs for the table
* @param loadStats stats to add counts of skipped files to. May be null.
* @return the FileStatuses that is a subset of passed in descriptors that
* must be used.
public static List<FileStatus> filterFilesForAcidState(List<FileStatus> stats,
Path baseDir, ValidTxnList validTxnList, ValidWriteIdList writeIds,
@Nullable LoadStats loadStats) {
List<FileStatus> validStats = new ArrayList<>(stats);
// First filter out any paths that are not considered valid write IDs.
// At the same time, calculate the max valid base write ID.
Predicate<String> pred = new WriteListBasedPredicate(validTxnList, writeIds);
long maxBaseWriteId = Long.MIN_VALUE;
for (Iterator<FileStatus> it = validStats.iterator(); it.hasNext(); ) {
FileStatus stat =;
String relPath = FileSystemUtil.relativizePath(stat.getPath(), baseDir);
if (!pred.test(relPath)) {
if (loadStats != null) loadStats.uncommittedAcidFilesSkipped++;
maxBaseWriteId = Math.max(getBaseWriteId(relPath), maxBaseWriteId);
// Filter out any files that are superceded by the latest valid base,
// as well as any directories.
for (Iterator<FileStatus> it = validStats.iterator(); it.hasNext(); ) {
FileStatus stat =;
if (stat.isDirectory()) {
String relPath = FileSystemUtil.relativizePath(stat.getPath(), baseDir);
long baseWriteId = getBaseWriteId(relPath);
if (baseWriteId != SENTINEL_BASE_WRITE_ID) {
if (baseWriteId < maxBaseWriteId) {
if (loadStats != null) loadStats.filesSupercededByNewerBase++;
ParsedDelta parsedDelta = parseDelta(relPath);
if (parsedDelta != null) {
if (parsedDelta.minWriteId <= maxBaseWriteId) {
if (loadStats != null) loadStats.filesSupercededByNewerBase++;
// Not in a base or a delta directory. In that case, it's probably a post-upgrade
// file.
// If there is no valid base: we should read the file (assuming that
// == true)
// If there is a valid base: the file should be merged to the base by the
// compaction, so we can assume that the file is no longer valid and just
// waits to be deleted.
if (maxBaseWriteId != SENTINEL_BASE_WRITE_ID) it.remove();
return validStats;