blob: 3b856ad0fc9395c75dbe2a942cd84a88ac0c5632 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.util;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.Immutable;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.impala.catalog.FileMetadataLoader.LoadStats;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.thrift.TTransactionalType;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
/**
* Contains utility functions for working with Acid tables.
* <p>
* The code is mostly copy pasted from Hive. Ideally we should use the
* code directly from Hive.
* </p>
*/
public class AcidUtils {
  // Constant also defined in TransactionalValidationListener
  public static final String INSERTONLY_TRANSACTIONAL_PROPERTY = "insert_only";
  // Constant also defined in hive_metastoreConstants
  public static final String TABLE_IS_TRANSACTIONAL = "transactional";
  public static final String TABLE_TRANSACTIONAL_PROPERTIES = "transactional_properties";
  /**
   * Transaction parameters needed for single table operations.
   */
  public static class TblTransaction {
    public long txnId;
    public boolean ownsTxn;
    public long writeId;
    public String validWriteIds;
  }
  // Regex pattern for files in base directories. The pattern matches strings like
  // "base_0000005/abc.txt",
  // "base_0000005/0000/abc.txt",
  // "base_0000003_v0003217/000000_0"
  private static final Pattern BASE_PATTERN = Pattern.compile(
      "base_" +
      "(?<writeId>\\d+)" +
      "(?:_v(?<visibilityTxnId>\\d+))?" +
      // Optional path suffix.
      "(?:/.*)?");
  // Regex pattern for files in delta directories. The pattern matches strings like
  // "delta_0000006_0000006/000000_0",
  // "delta_0000009_0000009_0000/0000/def.txt"
  private static final Pattern DELTA_PATTERN = Pattern.compile(
      "delta_" +
      "(?<minWriteId>\\d+)_" +
      "(?<maxWriteId>\\d+)" +
      "(?:_(?<optionalStatementId>\\d+))?" +
      // Optional path suffix.
      "(?:/.*)?");
  // Write id returned by parseBase()/getBaseWriteId() for paths that are not inside
  // a base directory.
  @VisibleForTesting
  static final long SENTINEL_BASE_WRITE_ID = Long.MIN_VALUE;

  /**
   * Returns true if the given table properties describe an insert-only transactional
   * table, i.e. the table is transactional and 'transactional_properties' is set to
   * "insert_only".
   *
   * The code is same as what exists in AcidUtils.java in hive-exec.
   * Ideally we should move the AcidUtils code from hive-exec into
   * hive-standalone-metastore or some other jar and use it here.
   */
  private static boolean isInsertOnlyTable(Map<String, String> props) {
    Preconditions.checkNotNull(props);
    if (!isTransactionalTable(props)) {
      return false;
    }
    // equalsIgnoreCase() on a non-null constant receiver is null-safe, so no
    // explicit null check of the property value is needed.
    return INSERTONLY_TRANSACTIONAL_PROPERTY.equalsIgnoreCase(
        props.get(TABLE_TRANSACTIONAL_PROPERTIES));
  }

  /**
   * Returns true if the 'transactional' (or 'TRANSACTIONAL') table property is set
   * to "true" (case-insensitively).
   */
  public static boolean isTransactionalTable(Map<String, String> props) {
    Preconditions.checkNotNull(props);
    String tableIsTransactional = props.get(TABLE_IS_TRANSACTIONAL);
    if (tableIsTransactional == null) {
      // Fall back to the upper-cased key. Use Locale.ROOT for a locale-independent
      // result: e.g. in the Turkish locale toUpperCase() maps 'i' to a dotted
      // capital I, which would silently break the lookup.
      tableIsTransactional = props.get(TABLE_IS_TRANSACTIONAL.toUpperCase(Locale.ROOT));
    }
    // String.equalsIgnoreCase() returns false for a null argument.
    return "true".equalsIgnoreCase(tableIsTransactional);
  }

  /**
   * Returns true if the table properties describe a full-ACID table, i.e. it is
   * transactional but not insert-only.
   */
  public static boolean isFullAcidTable(Map<String, String> props) {
    return isTransactionalTable(props) && !isInsertOnlyTable(props);
  }

  /**
   * Sets transaction related table properties for new tables based on manually
   * set table properties and the default transactional type.
   *
   * @param props table properties to update in place
   * @param defaultTransactionalType default to apply when the user did not set any
   *        transactional property explicitly
   */
  public static void setTransactionalProperties(Map<String, String> props,
      TTransactionalType defaultTransactionalType) {
    Preconditions.checkNotNull(props);
    if (props.get(TABLE_IS_TRANSACTIONAL) != null
        || props.get(TABLE_TRANSACTIONAL_PROPERTIES) != null) {
      // Table properties are set manually, ignore default.
      return;
    }
    switch (defaultTransactionalType) {
      case NONE: break;
      case INSERT_ONLY:
        props.put(TABLE_IS_TRANSACTIONAL, "true");
        props.put(TABLE_TRANSACTIONAL_PROPERTIES, INSERTONLY_TRANSACTIONAL_PROPERTY);
        break;
    }
  }

  /**
   * Predicate that checks if the file or directory is relevant for a given WriteId list.
   * <p>
   * <b>Must be called only for ACID table.</b>
   * Checks that the path conforms to ACID table dir structure, and includes only
   * directories that correspond to valid committed transactions.
   * </p>
   */
  private static class WriteListBasedPredicate implements Predicate<String> {
    private final ValidTxnList validTxnList;
    private final ValidWriteIdList writeIdList;

    WriteListBasedPredicate(ValidTxnList validTxnList, ValidWriteIdList writeIdList) {
      this.validTxnList = Preconditions.checkNotNull(validTxnList);
      this.writeIdList = Preconditions.checkNotNull(writeIdList);
    }

    @Override
    public boolean test(String dirPath) {
      ParsedBase parsedBase = parseBase(dirPath);
      if (parsedBase.writeId != SENTINEL_BASE_WRITE_ID) {
        // Base directory: valid if the base write id is committed and the compaction
        // that produced it (if any) is visible.
        return writeIdList.isValidBase(parsedBase.writeId) &&
               isTxnValid(parsedBase.visibilityTxnId);
      } else {
        ParsedDelta pd = parseDelta(dirPath);
        if (pd != null) {
          // Delta directory: include only if the whole write id range is committed.
          ValidWriteIdList.RangeResponse rr =
              writeIdList.isWriteIdRangeValid(pd.minWriteId, pd.maxWriteId);
          return rr.equals(ValidWriteIdList.RangeResponse.ALL);
        }
      }
      // If it wasn't in a base or a delta directory, we should include it.
      // This allows post-upgrade tables to be read.
      // TODO(todd) add an e2e test for this.
      return true;
    }

    // A visibilityTxnId of -1 means the directory name carried no "_v" suffix, so
    // there is no compactor transaction to validate.
    private boolean isTxnValid(long visibilityTxnId) {
      return visibilityTxnId == -1 || validTxnList.isTxnValid(visibilityTxnId);
    }
  }

  /**
   * Components parsed from a base directory name ("base_<writeId>[_v<txnId>]").
   */
  @Immutable
  private static final class ParsedBase {
    final long writeId;
    // -1 when the directory name had no "_v<txnId>" suffix.
    final long visibilityTxnId;

    ParsedBase(long writeId, long visibilityTxnId) {
      this.writeId = writeId;
      this.visibilityTxnId = visibilityTxnId;
    }
  }

  /**
   * Parses a path relative to the partition/table root. Returns the write id and
   * visibility transaction id of the containing base directory, or a ParsedBase with
   * writeId == SENTINEL_BASE_WRITE_ID when the path is not inside a base directory.
   */
  @VisibleForTesting
  static ParsedBase parseBase(String relPath) {
    Matcher baseMatcher = BASE_PATTERN.matcher(relPath);
    if (baseMatcher.matches()) {
      // Long.parseLong() avoids the needless boxing of Long.valueOf().
      long writeId = Long.parseLong(baseMatcher.group("writeId"));
      long visibilityTxnId = -1;
      String visibilityTxnIdStr = baseMatcher.group("visibilityTxnId");
      if (visibilityTxnIdStr != null) {
        visibilityTxnId = Long.parseLong(visibilityTxnIdStr);
      }
      return new ParsedBase(writeId, visibilityTxnId);
    }
    return new ParsedBase(SENTINEL_BASE_WRITE_ID, -1);
  }

  /**
   * Returns the write id of the base directory containing 'relPath', or
   * SENTINEL_BASE_WRITE_ID if the path is not inside a base directory.
   */
  @VisibleForTesting
  static long getBaseWriteId(String relPath) {
    return parseBase(relPath).writeId;
  }

  /**
   * Components parsed from a delta directory name
   * ("delta_<minWriteId>_<maxWriteId>[_<statementId>]").
   */
  @Immutable
  private static final class ParsedDelta {
    final long minWriteId;
    final long maxWriteId;
    /**
     * Negative value indicates there was no statement id.
     */
    final long statementId;

    ParsedDelta(long minWriteId, long maxWriteId, long statementId) {
      this.minWriteId = minWriteId;
      this.maxWriteId = maxWriteId;
      this.statementId = statementId;
    }
  }

  /**
   * Parses a path relative to the partition/table root. Returns the write id range
   * and statement id of the containing delta directory, or null if the path is not
   * inside a delta directory.
   */
  private static ParsedDelta parseDelta(String dirPath) {
    Matcher deltaMatcher = DELTA_PATTERN.matcher(dirPath);
    if (!deltaMatcher.matches()) {
      return null;
    }
    long minWriteId = Long.parseLong(deltaMatcher.group("minWriteId"));
    long maxWriteId = Long.parseLong(deltaMatcher.group("maxWriteId"));
    String statementIdStr = deltaMatcher.group("optionalStatementId");
    long statementId = statementIdStr != null ? Long.parseLong(statementIdStr) : -1;
    return new ParsedDelta(minWriteId, maxWriteId, statementId);
  }

  /**
   * Filters the files based on Acid state.
   * @param stats the FileStatuses obtained from recursively listing the directory
   * @param baseDir the base directory for the partition (or table, in the case of
   *   unpartitioned tables)
   * @param validTxnList the valid transactions, used to check compactor visibility
   * @param writeIds the valid write IDs for the table
   * @param loadStats stats to add counts of skipped files to. May be null.
   * @return the FileStatuses that is a subset of passed in descriptors that
   *   must be used.
   */
  public static List<FileStatus> filterFilesForAcidState(List<FileStatus> stats,
      Path baseDir, ValidTxnList validTxnList, ValidWriteIdList writeIds,
      @Nullable LoadStats loadStats) {
    List<FileStatus> validStats = new ArrayList<>(stats);
    // First filter out any paths that are not considered valid write IDs.
    // At the same time, calculate the max valid base write ID.
    Predicate<String> pred = new WriteListBasedPredicate(validTxnList, writeIds);
    // SENTINEL_BASE_WRITE_ID (== Long.MIN_VALUE) doubles as "no base seen yet".
    long maxBaseWriteId = SENTINEL_BASE_WRITE_ID;
    for (Iterator<FileStatus> it = validStats.iterator(); it.hasNext(); ) {
      FileStatus stat = it.next();
      String relPath = FileSystemUtil.relativizePath(stat.getPath(), baseDir);
      if (!pred.test(relPath)) {
        it.remove();
        if (loadStats != null) loadStats.uncommittedAcidFilesSkipped++;
        continue;
      }
      maxBaseWriteId = Math.max(getBaseWriteId(relPath), maxBaseWriteId);
    }
    // Filter out any files that are superseded by the latest valid base,
    // as well as any directories.
    for (Iterator<FileStatus> it = validStats.iterator(); it.hasNext(); ) {
      FileStatus stat = it.next();
      if (stat.isDirectory()) {
        it.remove();
        continue;
      }
      String relPath = FileSystemUtil.relativizePath(stat.getPath(), baseDir);
      long baseWriteId = getBaseWriteId(relPath);
      if (baseWriteId != SENTINEL_BASE_WRITE_ID) {
        // File lives in a base directory: drop it if a newer base exists.
        if (baseWriteId < maxBaseWriteId) {
          it.remove();
          if (loadStats != null) loadStats.filesSupercededByNewerBase++;
        }
        continue;
      }
      ParsedDelta parsedDelta = parseDelta(relPath);
      if (parsedDelta != null) {
        // Delta directories fully covered by the latest base are superseded.
        if (parsedDelta.minWriteId <= maxBaseWriteId) {
          it.remove();
          if (loadStats != null) loadStats.filesSupercededByNewerBase++;
        }
        continue;
      }
      // Not in a base or a delta directory. In that case, it's probably a post-upgrade
      // file.
      // If there is no valid base: we should read the file (assuming that
      // hive.mm.allow.originals == true)
      // If there is a valid base: the file should be merged to the base by the
      // compaction, so we can assume that the file is no longer valid and is just
      // waiting to be deleted.
      if (maxBaseWriteId != SENTINEL_BASE_WRITE_ID) it.remove();
    }
    return validStats;
  }
}