blob: 0cb4af9abd2b1e9e09aef97a4857e9f5051a239e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.io;
import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD;
import static org.apache.hadoop.hive.ql.parse.CalcitePlanner.ASTSearcher;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.google.common.base.Strings;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hive.common.*;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.LockComponentBuilder;
import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
import org.apache.hadoop.hive.metastore.api.LockComponent;
import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.ddl.table.create.CreateTableDesc;
import org.apache.hadoop.hive.ql.ddl.view.create.CreateMaterializedViewDesc;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.orc.Writer;
import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hive.common.util.Ref;
import org.apache.orc.FileFormatException;
import org.apache.orc.impl.OrcAcidUtils;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import javax.annotation.concurrent.Immutable;
import java.nio.charset.Charset;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Utilities that are shared by all of the ACID input and output formats. They
* are used by the compactor and cleaner and thus must be format agnostic.
*/
public class AcidUtils {
// This key will be put in the conf file when planning an acid operation
public static final String CONF_ACID_KEY = "hive.doing.acid";
public static final String BASE_PREFIX = "base_";
public static final String COMPACTOR_TABLE_PROPERTY = "compactiontable";
public static final PathFilter baseFileFilter = new PathFilter() {
@Override
public boolean accept(Path path) {
return path.getName().startsWith(BASE_PREFIX);
}
};
public static final String DELTA_PREFIX = "delta_";
public static final String DELETE_DELTA_PREFIX = "delete_delta_";
/**
* Acid Streaming Ingest writes multiple transactions to the same file. It also maintains a
* {@link org.apache.orc.impl.OrcAcidUtils#getSideFile(Path)} side file which stores the length of
* the primary file as of the last commit ({@link OrcRecordUpdater#flush()}). That is the 'logical length'.
* Once the primary is closed, the side file is deleted (logical length = actual length) but if
* the writer dies or the primary file is being read while its still being written to, anything
* past the logical length should be ignored.
*
* @see org.apache.orc.impl.OrcAcidUtils#DELTA_SIDE_FILE_SUFFIX
* @see org.apache.orc.impl.OrcAcidUtils#getLastFlushLength(FileSystem, Path)
* @see #getLogicalLength(FileSystem, FileStatus)
*/
public static final String DELTA_SIDE_FILE_SUFFIX = "_flush_length";
public static final PathFilter deltaFileFilter = new PathFilter() {
@Override
public boolean accept(Path path) {
return path.getName().startsWith(DELTA_PREFIX);
}
};
public static final PathFilter deleteEventDeltaDirFilter = new PathFilter() {
@Override
public boolean accept(Path path) {
return path.getName().startsWith(DELETE_DELTA_PREFIX);
}
};
public static final String BUCKET_PREFIX = "bucket_";
public static final PathFilter bucketFileFilter = new PathFilter() {
@Override
public boolean accept(Path path) {
return path.getName().startsWith(BUCKET_PREFIX) &&
!path.getName().endsWith(DELTA_SIDE_FILE_SUFFIX);
}
};
public static final String BUCKET_DIGITS = "%05d";
public static final String LEGACY_FILE_BUCKET_DIGITS = "%06d";
public static final String DELTA_DIGITS = "%07d";
/**
* 10K statements per tx. Probably overkill ... since that many delta files
* would not be good for performance
*/
public static final String STATEMENT_DIGITS = "%04d";
/**
* This must be in sync with {@link #STATEMENT_DIGITS}
*/
public static final int MAX_STATEMENTS_PER_TXN = 10000;
public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{6}");
public static final Pattern BUCKET_PATTERN = Pattern.compile("bucket_([0-9]+)(_[0-9]+)?$");
private static Cache<String, DirInfoValue> dirCache;
private static AtomicBoolean dirCacheInited = new AtomicBoolean();
/**
* A write into a non-aicd table produces files like 0000_0 or 0000_0_copy_1
* (Unless via Load Data statement)
*/
public static final PathFilter originalBucketFilter = new PathFilter() {
@Override
public boolean accept(Path path) {
return ORIGINAL_PATTERN.matcher(path.getName()).matches() ||
ORIGINAL_PATTERN_COPY.matcher(path.getName()).matches();
}
};
private AcidUtils() {
// NOT USED
}
private static final Logger LOG = LoggerFactory.getLogger(AcidUtils.class);
public static final Pattern ORIGINAL_PATTERN =
Pattern.compile("[0-9]+_[0-9]+");
/**
* @see org.apache.hadoop.hive.ql.exec.Utilities#COPY_KEYWORD
*/
public static final Pattern ORIGINAL_PATTERN_COPY =
Pattern.compile("[0-9]+_[0-9]+" + COPY_KEYWORD + "[0-9]+");
public static final PathFilter hiddenFileFilter = new PathFilter(){
@Override
public boolean accept(Path p){
String name = p.getName();
return !name.startsWith("_") && !name.startsWith(".");
}
};
public static final PathFilter acidHiddenFileFilter = new PathFilter() {
@Override
public boolean accept(Path p) {
String name = p.getName();
// Don't filter out MetaDataFile.METADATA_FILE
if (name.startsWith(MetaDataFile.METADATA_FILE)) {
return true;
}
// Don't filter out OrcAcidVersion.ACID_FORMAT
if (name.startsWith(OrcAcidVersion.ACID_FORMAT)) {
return true;
}
return !name.startsWith("_") && !name.startsWith(".");
}
};
public static final PathFilter acidTempDirFilter = new PathFilter() {
@Override
public boolean accept(Path dirPath) {
String dirPathStr = dirPath.toString();
// We don't want to filter out temp tables
if (dirPathStr.contains(SessionState.TMP_PREFIX)) {
return true;
}
if ((dirPathStr.contains("/.")) || (dirPathStr.contains("/_"))) {
return false;
} else {
return true;
}
}
};
public static final String VISIBILITY_PREFIX = "_v";
public static final Pattern VISIBILITY_PATTERN = Pattern.compile(VISIBILITY_PREFIX + "\\d+");
private static final HadoopShims SHIMS = ShimLoader.getHadoopShims();
/**
* Create the bucket filename in Acid format
* @param subdir the subdirectory for the bucket.
* @param bucket the bucket number
* @return the filename
*/
public static Path createBucketFile(Path subdir, int bucket) {
return createBucketFile(subdir, bucket, null, true);
}
public static Path createBucketFile(Path subdir, int bucket, String attemptId) {
return createBucketFile(subdir, bucket, attemptId, true);
}
/**
* Create acid or original bucket name
* @param subdir the subdirectory for the bucket.
* @param bucket the bucket number
* @return the filename
*/
private static Path createBucketFile(Path subdir, int bucket, String attemptId, boolean isAcidSchema) {
if(isAcidSchema) {
String fileName = BUCKET_PREFIX + String.format(BUCKET_DIGITS, bucket);
if (attemptId != null) {
fileName = fileName + "_" + attemptId;
}
return new Path(subdir, fileName);
}
else {
return new Path(subdir,
String.format(BUCKET_DIGITS, bucket));
}
}
/**
* This is format of delta dir name prior to Hive 1.3.x
*/
public static String deltaSubdir(long min, long max) {
return DELTA_PREFIX + String.format(DELTA_DIGITS, min) + "_" +
String.format(DELTA_DIGITS, max);
}
/**
* Each write statement in a transaction creates its own delta dir.
* @since 1.3.x
*/
public static String deltaSubdir(long min, long max, int statementId) {
return deltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId);
}
/**
* This is format of delete delta dir name prior to Hive 2.2.x
*/
@VisibleForTesting
public static String deleteDeltaSubdir(long min, long max) {
return DELETE_DELTA_PREFIX + String.format(DELTA_DIGITS, min) + "_" +
String.format(DELTA_DIGITS, max);
}
/**
* Each write statement in a transaction creates its own delete delta dir,
* when split-update acid operational property is turned on.
* @since 2.2.x
*/
@VisibleForTesting
public static String deleteDeltaSubdir(long min, long max, int statementId) {
return deleteDeltaSubdir(min, max) + "_" + String.format(STATEMENT_DIGITS, statementId);
}
public static String baseDir(long writeId) {
return BASE_PREFIX + String.format(DELTA_DIGITS, writeId);
}
/**
* Return a base or delta directory string
* according to the given "baseDirRequired".
*/
public static String baseOrDeltaSubdir(boolean baseDirRequired, long min, long max, int statementId) {
if (!baseDirRequired) {
return deltaSubdir(min, max, statementId);
} else {
return baseDir(min);
}
}
/**
* Return a base or delta directory path according to the given "options".
*/
public static Path baseOrDeltaSubdirPath(Path directory, AcidOutputFormat.Options options) {
String subdir;
if (options.isWritingBase()) {
subdir = BASE_PREFIX + String.format(DELTA_DIGITS,
options.getMaximumWriteId());
} else if(options.getStatementId() == -1) {
//when minor compaction runs, we collapse per statement delta files inside a single
//transaction so we no longer need a statementId in the file name
subdir = options.isWritingDeleteDelta() ?
deleteDeltaSubdir(options.getMinimumWriteId(),
options.getMaximumWriteId())
: deltaSubdir(options.getMinimumWriteId(),
options.getMaximumWriteId());
} else {
subdir = options.isWritingDeleteDelta() ?
deleteDeltaSubdir(options.getMinimumWriteId(),
options.getMaximumWriteId(),
options.getStatementId())
: deltaSubdir(options.getMinimumWriteId(),
options.getMaximumWriteId(),
options.getStatementId());
}
subdir = addVisibilitySuffix(subdir, options.getVisibilityTxnId());
return new Path(directory, subdir);
}
/**
* Create a filename for a bucket file.
* @param directory the partition directory
* @param options the options for writing the bucket
* @return the filename that should store the bucket
*/
public static Path createFilename(Path directory,
AcidOutputFormat.Options options) {
if (options.getOldStyle()) {
return new Path(directory, String.format(LEGACY_FILE_BUCKET_DIGITS,
options.getBucketId()) + "_0");
} else {
return createBucketFile(baseOrDeltaSubdirPath(directory, options), options.getBucketId(), options.getAttemptId());
}
}
/**
* Since Hive 4.0, compactor produces directories with {@link #VISIBILITY_PATTERN} suffix.
* _v0 is equivalent to no suffix, for backwards compatibility.
*/
public static String addVisibilitySuffix(String baseOrDeltaDir, long visibilityTxnId) {
if(visibilityTxnId == 0) {
return baseOrDeltaDir;
}
return baseOrDeltaDir + VISIBILITY_PREFIX
+ String.format(DELTA_DIGITS, visibilityTxnId);
}
/**
* Represents bucketId and copy_N suffix
*/
public static final class BucketMetaData {
private static final BucketMetaData INVALID = new BucketMetaData(-1, 0);
/**
* @param bucketFileName {@link #ORIGINAL_PATTERN} or {@link #ORIGINAL_PATTERN_COPY}
*/
public static BucketMetaData parse(String bucketFileName) {
if (ORIGINAL_PATTERN.matcher(bucketFileName).matches()) {
int bucketId = Integer
.parseInt(bucketFileName.substring(0, bucketFileName.indexOf('_')));
return new BucketMetaData(bucketId, 0);
}
else if(ORIGINAL_PATTERN_COPY.matcher(bucketFileName).matches()) {
int copyNumber = Integer.parseInt(
bucketFileName.substring(bucketFileName.lastIndexOf('_') + 1));
int bucketId = Integer
.parseInt(bucketFileName.substring(0, bucketFileName.indexOf('_')));
return new BucketMetaData(bucketId, copyNumber);
}
else if (bucketFileName.startsWith(BUCKET_PREFIX)) {
return new BucketMetaData(Integer
.parseInt(bucketFileName.substring(bucketFileName.indexOf('_') + 1)), 0);
}
return INVALID;
}
public static BucketMetaData parse(Path bucketFile) {
return parse(bucketFile.getName());
}
/**
* -1 if non-standard file name
*/
public final int bucketId;
/**
* 0 means no copy_N suffix
*/
public final int copyNumber;
private BucketMetaData(int bucketId, int copyNumber) {
this.bucketId = bucketId;
this.copyNumber = copyNumber;
}
}
/**
* Determine if a table is used during query based compaction.
* @param tblProperties table properties
* @return true, if the tblProperties contains {@link AcidUtils#COMPACTOR_TABLE_PROPERTY}
*/
public static boolean isCompactionTable(Properties tblProperties) {
if (tblProperties != null && tblProperties.containsKey(COMPACTOR_TABLE_PROPERTY) && tblProperties
.getProperty(COMPACTOR_TABLE_PROPERTY).equalsIgnoreCase("true")) {
return true;
}
return false;
}
/**
* Determine if a table is used during query based compaction.
* @param parameters table properties map
* @return true, if the parameters contains {@link AcidUtils#COMPACTOR_TABLE_PROPERTY}
*/
public static boolean isCompactionTable(Map<String, String> parameters) {
return Boolean.valueOf(parameters.getOrDefault(COMPACTOR_TABLE_PROPERTY, "false"));
}
/**
* Get the bucket id from the file path
* @param bucketFile - bucket file path
* @return - bucket id
*/
public static int parseBucketId(Path bucketFile) {
String filename = bucketFile.getName();
if (ORIGINAL_PATTERN.matcher(filename).matches() || ORIGINAL_PATTERN_COPY.matcher(filename).matches()) {
return Integer.parseInt(filename.substring(0, filename.indexOf('_')));
} else if (filename.startsWith(BUCKET_PREFIX)) {
Matcher matcher = BUCKET_PATTERN.matcher(filename);
if (matcher.matches()) {
String bucketId = matcher.group(1);
filename = filename.substring(0,matcher.end(1));
if (Utilities.FILE_OP_LOGGER.isDebugEnabled()) {
Utilities.FILE_OP_LOGGER.debug("Parsing bucket ID = " + bucketId + " from file name '" + filename + "'");
}
return Integer.parseInt(bucketId);
}
}
return -1;
}
public static String parseAttemptId(Path bucketFile) {
String filename = bucketFile.getName();
Matcher matcher = BUCKET_PATTERN.matcher(filename);
String attemptId = null;
if (matcher.matches()) {
attemptId = matcher.group(2) != null ? matcher.group(2).substring(1) : null;
}
if (Utilities.FILE_OP_LOGGER.isDebugEnabled()) {
Utilities.FILE_OP_LOGGER.debug("Parsing attempt ID = " + attemptId + " from file name '" + bucketFile + "'");
}
return attemptId;
}
/**
* Parse a bucket filename back into the options that would have created
* the file.
* @param bucketFile the path to a bucket file
* @param conf the configuration
* @return the options used to create that filename
*/
public static AcidOutputFormat.Options
parseBaseOrDeltaBucketFilename(Path bucketFile,
Configuration conf) throws IOException {
AcidOutputFormat.Options result = new AcidOutputFormat.Options(conf);
String filename = bucketFile.getName();
int bucket = parseBucketId(bucketFile);
String attemptId = parseAttemptId(bucketFile);
if (ORIGINAL_PATTERN.matcher(filename).matches() || ORIGINAL_PATTERN_COPY.matcher(filename).matches()) {
long minWriteId = 0;
long maxWriteId = 0;
int statementId = -1;
if (bucketFile.getParent().getName().startsWith(DELTA_PREFIX)) {
ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), false);
minWriteId = parsedDelta.getMinWriteId();
maxWriteId = parsedDelta.getMaxWriteId();
statementId = parsedDelta.getStatementId();
}
result
.setOldStyle(true)
.minimumWriteId(minWriteId)
.maximumWriteId(maxWriteId)
.statementId(statementId)
.bucket(bucket)
.writingBase(!bucketFile.getParent().getName().startsWith(DELTA_PREFIX));
}
else if (filename.startsWith(BUCKET_PREFIX)) {
if (bucketFile.getParent().getName().startsWith(BASE_PREFIX)) {
result
.setOldStyle(false)
.minimumWriteId(0)
.maximumWriteId(ParsedBase.parseBase(bucketFile.getParent()).getWriteId())
.bucket(bucket)
.writingBase(true);
} else if (bucketFile.getParent().getName().startsWith(DELTA_PREFIX)) {
ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), false);
result
.setOldStyle(false)
.minimumWriteId(parsedDelta.minWriteId)
.maximumWriteId(parsedDelta.maxWriteId)
.statementId(parsedDelta.statementId)
.bucket(bucket)
.attemptId(attemptId);
} else if (bucketFile.getParent().getName().startsWith(DELETE_DELTA_PREFIX)) {
ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), false);
result
.setOldStyle(false)
.minimumWriteId(parsedDelta.minWriteId)
.maximumWriteId(parsedDelta.maxWriteId)
.statementId(parsedDelta.statementId)
.bucket(bucket);
}
} else {
result.setOldStyle(true).bucket(bucket).minimumWriteId(0)
.maximumWriteId(0);
}
return result;
}
public static final class DirectoryImpl implements Directory {
private final List<Path> abortedDirectories;
private final boolean isBaseInRawFormat;
private final List<HdfsFileStatusWithId> original;
private final List<Path> obsolete;
private final List<ParsedDelta> deltas;
private final Path base;
private List<HdfsFileStatusWithId> baseFiles;
public DirectoryImpl(List<Path> abortedDirectories,
boolean isBaseInRawFormat, List<HdfsFileStatusWithId> original,
List<Path> obsolete, List<ParsedDelta> deltas, Path base) {
this.abortedDirectories = abortedDirectories == null ?
Collections.emptyList() : abortedDirectories;
this.isBaseInRawFormat = isBaseInRawFormat;
this.original = original == null ? Collections.emptyList() : original;
this.obsolete = obsolete == null ? Collections.emptyList() : obsolete;
this.deltas = deltas == null ? Collections.emptyList() : deltas;
this.base = base;
}
@Override
public Path getBaseDirectory() {
return base;
}
public List<HdfsFileStatusWithId> getBaseFiles() {
return baseFiles;
}
void setBaseFiles(List<HdfsFileStatusWithId> baseFiles) {
this.baseFiles = baseFiles;
}
@Override
public boolean isBaseInRawFormat() {
return isBaseInRawFormat;
}
@Override
public List<HdfsFileStatusWithId> getOriginalFiles() {
return original;
}
@Override
public List<ParsedDelta> getCurrentDirectories() {
return deltas;
}
@Override
public List<Path> getObsolete() {
return obsolete;
}
@Override
public List<Path> getAbortedDirectories() {
return abortedDirectories;
}
@Override
public String toString() {
return "Aborted Directories: " + abortedDirectories + "; isBaseInRawFormat: " + isBaseInRawFormat + "; original: "
+ original + "; obsolete: " + obsolete + "; deltas: " + deltas + "; base: " + base;
}
}
//This is used for (full) Acid tables. InsertOnly use NOT_ACID
public enum Operation implements Serializable {
NOT_ACID, INSERT, UPDATE, DELETE;
}
/**
* Logically this should have been defined in Operation but that causes a dependency
* on metastore package from exec jar (from the cluster) which is not allowed.
* This method should only be called from client side where metastore.* classes are present.
* Not following this will not be caught by unit tests since they have all the jar loaded.
*/
public static DataOperationType toDataOperationType(Operation op) {
switch (op) {
case NOT_ACID:
return DataOperationType.UNSET;
case INSERT:
return DataOperationType.INSERT;
case UPDATE:
return DataOperationType.UPDATE;
case DELETE:
return DataOperationType.DELETE;
default:
throw new IllegalArgumentException("Unexpected Operation: " + op);
}
}
public enum AcidBaseFileType {
/**
* File w/o Acid meta columns. This this would be the case for files that were added to the table
* before it was converted to Acid but not yet major compacted. May also be the the result of
* Load Data statement on an acid table.
*/
ORIGINAL_BASE,
/**
* File that has Acid metadata columns embedded in it. Found in base_x/ or delta_x_y/.
*/
ACID_SCHEMA,
}
/**
* A simple wrapper class that stores the information about a base file and its type.
* Orc splits can be generated on three kinds of base files: an original file (non-acid converted
* files), a regular base file (created by major compaction) or an insert delta (which can be
* treated as a base when split-update is enabled for acid).
*/
public static class AcidBaseFileInfo {
final private HdfsFileStatusWithId fileId;
final private AcidBaseFileType acidBaseFileType;
public AcidBaseFileInfo(HdfsFileStatusWithId fileId, AcidBaseFileType acidBaseFileType) {
this.fileId = fileId;
this.acidBaseFileType = acidBaseFileType;
}
public boolean isOriginal() {
return this.acidBaseFileType == AcidBaseFileType.ORIGINAL_BASE;
}
public boolean isAcidSchema() {
return this.acidBaseFileType == AcidBaseFileType.ACID_SCHEMA;
}
public HdfsFileStatusWithId getHdfsFileStatusWithId() {
return this.fileId;
}
}
/**
* Current syntax for creating full acid transactional tables is any one of following 3 ways:
* create table T (a int, b int) stored as orc tblproperties('transactional'='true').
* create table T (a int, b int) stored as orc tblproperties('transactional'='true',
* 'transactional_properties'='default').
* create table T (a int, b int) stored as orc tblproperties('transactional'='true',
* 'transactional_properties'='split_update').
* These are all identical and create a table capable of insert/update/delete/merge operations
* with full ACID semantics at Snapshot Isolation. These tables require ORC input/output format.
*
* To create a 1/4 acid, aka Micro Managed table:
* create table T (a int, b int) stored as orc tblproperties('transactional'='true',
* 'transactional_properties'='insert_only').
* These tables only support insert operation (also with full ACID semantics at SI).
*
*/
public static class AcidOperationalProperties {
private int description = 0x00;
public static final int SPLIT_UPDATE_BIT = 0x01;
public static final String SPLIT_UPDATE_STRING = "split_update";
public static final int HASH_BASED_MERGE_BIT = 0x02;
public static final String HASH_BASED_MERGE_STRING = "hash_merge";
public static final int INSERT_ONLY_BIT = 0x04;
public static final String INSERT_ONLY_STRING = "insert_only";
public static final String DEFAULT_VALUE_STRING = TransactionalValidationListener.DEFAULT_TRANSACTIONAL_PROPERTY;
public static final String INSERTONLY_VALUE_STRING = TransactionalValidationListener.INSERTONLY_TRANSACTIONAL_PROPERTY;
private AcidOperationalProperties() {
}
/**
* Returns an acidOperationalProperties object that represents default ACID behavior for tables
* that do no explicitly specify/override the default behavior.
* @return the acidOperationalProperties object.
*/
public static AcidOperationalProperties getDefault() {
AcidOperationalProperties obj = new AcidOperationalProperties();
obj.setSplitUpdate(true);
obj.setHashBasedMerge(false);
obj.setInsertOnly(false);
return obj;
}
/**
* Returns an acidOperationalProperties object for tables that uses ACID framework but only
* supports INSERT operation and does not require ORC or bucketing
* @return the acidOperationalProperties object
*/
public static AcidOperationalProperties getInsertOnly() {
AcidOperationalProperties obj = new AcidOperationalProperties();
obj.setInsertOnly(true);
return obj;
}
/**
* Returns an acidOperationalProperties object that is represented by an encoded string.
* @param propertiesStr an encoded string representing the acidOperationalProperties.
* @return the acidOperationalProperties object.
*/
public static AcidOperationalProperties parseString(String propertiesStr) {
if (propertiesStr == null) {
return AcidOperationalProperties.getDefault();
}
if (propertiesStr.equalsIgnoreCase(DEFAULT_VALUE_STRING)) {
return AcidOperationalProperties.getDefault();
}
if (propertiesStr.equalsIgnoreCase(INSERTONLY_VALUE_STRING)) {
return AcidOperationalProperties.getInsertOnly();
}
AcidOperationalProperties obj = new AcidOperationalProperties();
String[] options = propertiesStr.split("\\|");
for (String option : options) {
if (option.trim().length() == 0) continue; // ignore empty strings
switch (option) {
case SPLIT_UPDATE_STRING:
obj.setSplitUpdate(true);
break;
case HASH_BASED_MERGE_STRING:
obj.setHashBasedMerge(true);
break;
default:
throw new IllegalArgumentException(
"Unexpected value " + option + " for ACID operational properties!");
}
}
return obj;
}
/**
* Returns an acidOperationalProperties object that is represented by an encoded 32-bit integer.
* @param properties an encoded 32-bit representing the acidOperationalProperties.
* @return the acidOperationalProperties object.
*/
public static AcidOperationalProperties parseInt(int properties) {
AcidOperationalProperties obj = new AcidOperationalProperties();
if ((properties & SPLIT_UPDATE_BIT) > 0) {
obj.setSplitUpdate(true);
}
if ((properties & HASH_BASED_MERGE_BIT) > 0) {
obj.setHashBasedMerge(true);
}
if ((properties & INSERT_ONLY_BIT) > 0) {
obj.setInsertOnly(true);
}
return obj;
}
/**
* Sets the split update property for ACID operations based on the boolean argument.
* When split update is turned on, an update ACID event is interpreted as a combination of
* delete event followed by an update event.
* @param isSplitUpdate a boolean property that turns on split update when true.
* @return the acidOperationalProperties object.
*/
public AcidOperationalProperties setSplitUpdate(boolean isSplitUpdate) {
description = (isSplitUpdate
? (description | SPLIT_UPDATE_BIT) : (description & ~SPLIT_UPDATE_BIT));
return this;
}
/**
* Sets the hash-based merge property for ACID operations that combines delta files using
* GRACE hash join based approach, when turned on. (Currently unimplemented!)
* @param isHashBasedMerge a boolean property that turns on hash-based merge when true.
* @return the acidOperationalProperties object.
*/
public AcidOperationalProperties setHashBasedMerge(boolean isHashBasedMerge) {
description = (isHashBasedMerge
? (description | HASH_BASED_MERGE_BIT) : (description & ~HASH_BASED_MERGE_BIT));
return this;
}
public AcidOperationalProperties setInsertOnly(boolean isInsertOnly) {
description = (isInsertOnly
? (description | INSERT_ONLY_BIT) : (description & ~INSERT_ONLY_BIT));
return this;
}
public boolean isSplitUpdate() {
return (description & SPLIT_UPDATE_BIT) > 0;
}
public boolean isHashBasedMerge() {
return (description & HASH_BASED_MERGE_BIT) > 0;
}
public boolean isInsertOnly() {
return (description & INSERT_ONLY_BIT) > 0;
}
public int toInt() {
return description;
}
@Override
public String toString() {
StringBuilder str = new StringBuilder();
if (isSplitUpdate()) {
str.append("|" + SPLIT_UPDATE_STRING);
}
if (isHashBasedMerge()) {
str.append("|" + HASH_BASED_MERGE_STRING);
}
if (isInsertOnly()) {
str.append("|" + INSERT_ONLY_STRING);
}
return str.toString();
}
}
/**
* Interface used to provide ACID directory information.
*/
public interface Directory {
/**
* Get the base directory.
* @return the base directory to read
*/
Path getBaseDirectory();
List<HdfsFileStatusWithId> getBaseFiles();
boolean isBaseInRawFormat();
/**
* Get the list of original files. Not {@code null}. Must be sorted.
* @return the list of original files (eg. 000000_0)
*/
List<HdfsFileStatusWithId> getOriginalFiles();
/**
* Get the list of base and delta directories that are valid and not
* obsolete. Not {@code null}. List must be sorted in a specific way.
* See {@link org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDeltaLight#compareTo(org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDeltaLight)}
* for details.
* @return the minimal list of current directories
*/
List<ParsedDelta> getCurrentDirectories();
/**
* Get the list of obsolete directories. After filtering out bases and
* deltas that are not selected by the valid transaction/write ids list, return the
* list of original files, bases, and deltas that have been replaced by
* more up to date ones. Not {@code null}.
*/
List<Path> getObsolete();
/**
* Get the list of directories that has nothing but aborted transactions.
* @return the list of aborted directories
*/
List<Path> getAbortedDirectories();
}
/**
* Since version 3 but prior to version 4, format of a base is "base_X" where X is a writeId.
* If this base was produced by a compactor, X is the highest writeId that the compactor included.
* If this base is produced by Insert Overwrite stmt, X is a writeId of the transaction that
* executed the insert.
* Since Hive Version 4.0, the format of a base produced by a compactor is
* base_X_vY. X is like before, i.e. the highest writeId compactor included and Y is the
* visibilityTxnId of the transaction in which the compactor ran.
* (v(isibility) is a literal to help parsing).
*/
public static final class ParsedBase {
private final long writeId;
private final long visibilityTxnId;
private final Path baseDirPath;
ParsedBase(long writeId, Path baseDirPath) {
this(writeId, 0, baseDirPath);
}
ParsedBase(long writeId, long visibilityTxnId, Path baseDirPath) {
this.writeId = writeId;
this.visibilityTxnId = visibilityTxnId;
this.baseDirPath = baseDirPath;
}
public long getWriteId() {
return writeId;
}
public long getVisibilityTxnId() {
return visibilityTxnId;
}
public Path getBaseDirPath() {
return baseDirPath;
}
public static ParsedBase parseBase(Path path) {
String filename = path.getName();
if(!filename.startsWith(BASE_PREFIX)) {
throw new IllegalArgumentException(filename + " does not start with " + BASE_PREFIX);
}
int idxOfv = filename.indexOf(VISIBILITY_PREFIX);
if(idxOfv < 0) {
return new ParsedBase(Long.parseLong(filename.substring(BASE_PREFIX.length())), path);
}
return new ParsedBase(Long.parseLong(filename.substring(BASE_PREFIX.length(), idxOfv)),
Long.parseLong(filename.substring(idxOfv + VISIBILITY_PREFIX.length())), path);
}
}
/**
* In addition to {@link ParsedDeltaLight} this knows if the data is in raw format, i.e. doesn't
* have acid metadata columns embedded in the files. To determine this in some cases
* requires looking at the footer of the data file which can be expensive so if this info is
* not needed {@link ParsedDeltaLight} should be used.
*/
@Immutable
public static final class ParsedDelta extends ParsedDeltaLight {
private final boolean isRawFormat;
private final List<HdfsFileStatusWithId> files;
/**
* for pre 1.3.x delta files
*/
private ParsedDelta(long min, long max, Path path, boolean isDeleteDelta,
boolean isRawFormat, long visibilityTxnId, List<HdfsFileStatusWithId> files) {
this(min, max, path, -1, isDeleteDelta, isRawFormat, visibilityTxnId, files);
}
private ParsedDelta(long min, long max, Path path, int statementId,
boolean isDeleteDelta, boolean isRawFormat, long visibilityTxnId, List<HdfsFileStatusWithId> files) {
super(min, max, path, statementId, isDeleteDelta, visibilityTxnId);
this.isRawFormat = isRawFormat;
this.files = files;
}
/**
* Files w/o Acid meta columns embedded in the file. See {@link AcidBaseFileType#ORIGINAL_BASE}
*/
public boolean isRawFormat() {
return isRawFormat;
}
public List<HdfsFileStatusWithId> getFiles() {
return files;
}
}
/**
* This encapsulates info obtained form the file path.
* See also {@link ParsedDelta}.
*/
@Immutable
public static class ParsedDeltaLight implements Comparable<ParsedDeltaLight> {
final long minWriteId;
final long maxWriteId;
final Path path;
//-1 is for internal (getAcidState()) purposes and means the delta dir
//had no statement ID
final int statementId;
final boolean isDeleteDelta; // records whether delta dir is of type 'delete_delta_x_y...'
/**
* transaction Id of txn which created this delta. This dir should be considered
* invisible unless this txn is committed
*
* TODO: define TransactionallyVisible interface - add getVisibilityTxnId() etc and all comments
* use in {@link ParsedBase}, {@link ParsedDelta}, {@link AcidInputFormat.Options}, AcidInputFormat.DeltaMetaData etc
*/
final long visibilityTxnId;
public static ParsedDeltaLight parse(Path deltaDir) {
//passing isRawFormat=false is bogus. This is just to parse the file name.
ParsedDelta pd = parsedDelta(deltaDir, false);
return new ParsedDeltaLight(pd.getMinWriteId(), pd.getMaxWriteId(), deltaDir,
pd.getStatementId(), pd.isDeleteDelta(), pd.getVisibilityTxnId());
}
private ParsedDeltaLight(long min, long max, Path path, int statementId,
boolean isDeleteDelta, long visibilityTxnId) {
this.minWriteId = min;
this.maxWriteId = max;
this.path = path;
this.statementId = statementId;
this.isDeleteDelta = isDeleteDelta;
this.visibilityTxnId = visibilityTxnId;
}
public long getMinWriteId() {
return minWriteId;
}
public long getMaxWriteId() {
return maxWriteId;
}
public Path getPath() {
return path;
}
public boolean hasStatementId() {
return statementId >= 0;
}
public int getStatementId() {
return hasStatementId() ? statementId : 0;
}
public boolean isDeleteDelta() {
return isDeleteDelta;
}
public long getVisibilityTxnId() {
return visibilityTxnId;
}
/**
* Only un-compacted delta_x_y (x != y) (created by streaming ingest with batch size > 1)
* may contain a {@link OrcAcidUtils#getSideFile(Path)}.
* @return
*/
boolean mayContainSideFile() {
return !isDeleteDelta() && getMinWriteId() != getMaxWriteId() && getVisibilityTxnId() <= 0;
}
/**
* Compactions (Major/Minor) merge deltas/bases but delete of old files
* happens in a different process; thus it's possible to have bases/deltas with
* overlapping writeId boundaries. The sort order helps figure out the "best" set of files
* to use to get data.
* This sorts "wider" delta before "narrower" i.e. delta_5_20 sorts before delta_5_10 (and delta_11_20)
*/
@Override
public int compareTo(ParsedDeltaLight parsedDelta) {
if (minWriteId != parsedDelta.minWriteId) {
if (minWriteId < parsedDelta.minWriteId) {
return -1;
} else {
return 1;
}
} else if (maxWriteId != parsedDelta.maxWriteId) {
if (maxWriteId < parsedDelta.maxWriteId) {
return 1;
} else {
return -1;
}
}
else if(statementId != parsedDelta.statementId) {
/**
* We want deltas after minor compaction (w/o statementId) to sort
* earlier so that getAcidState() considers compacted files (into larger ones) obsolete
* Before compaction, include deltas with all statementIds for a given writeId
* in a {@link org.apache.hadoop.hive.ql.io.AcidUtils.Directory}
*/
if(statementId < parsedDelta.statementId) {
return -1;
}
else {
return 1;
}
}
else {
return path.compareTo(parsedDelta.path);
}
}
}
/**
* Convert a list of deltas to a list of delta directories.
* @param deltas the list of deltas out of a Directory object.
* @return a list of delta directory paths that need to be read
*/
public static Path[] getPaths(List<ParsedDelta> deltas) {
Path[] result = new Path[deltas.size()];
for(int i=0; i < result.length; ++i) {
result[i] = deltas.get(i).getPath();
}
return result;
}
/**
* Convert the list of deltas into an equivalent list of begin/end
* write id pairs. Assumes {@code deltas} is sorted.
* @param deltas
* @return the list of write ids to serialize
*/
public static List<AcidInputFormat.DeltaMetaData> serializeDeleteDeltas(List<ParsedDelta> deltas) {
List<AcidInputFormat.DeltaMetaData> result = new ArrayList<>(deltas.size());
AcidInputFormat.DeltaMetaData last = null;
for (ParsedDelta parsedDelta : deltas) {
assert parsedDelta.isDeleteDelta() : "expected delete_delta, got " + parsedDelta.getPath();
final Integer stmtId = parsedDelta.statementId >= 0 ? parsedDelta.statementId : null;
if ((last != null)
&& (last.getMinWriteId() == parsedDelta.getMinWriteId())
&& (last.getMaxWriteId() == parsedDelta.getMaxWriteId())) {
if (stmtId != null) {
last.getStmtIds().add(stmtId);
}
for (HadoopShims.HdfsFileStatusWithId fileStatus : parsedDelta.getFiles()) {
last.getDeltaFiles().add(new AcidInputFormat.DeltaFileMetaData(fileStatus, stmtId, parseBucketId(fileStatus.getFileStatus().getPath())));
}
} else {
List<Integer> stmtIds = new ArrayList<>();
if (stmtId != null) {
stmtIds.add(stmtId);
}
last = new AcidInputFormat.DeltaMetaData(parsedDelta.getMinWriteId(), parsedDelta.getMaxWriteId(),
stmtIds, parsedDelta.getVisibilityTxnId(), parsedDelta.getFiles().stream()
.map(fs -> new AcidInputFormat.DeltaFileMetaData(fs, stmtId, parseBucketId(fs.getFileStatus().getPath())))
.collect(Collectors.toList()));
result.add(last);
}
}
return result;
}
/**
* Convert the list of begin/end write id pairs to a list of delete delta
* directories. Note that there may be multiple delete_delta files for the exact same txn range starting
* with 2.2.x;
* see {@link org.apache.hadoop.hive.ql.io.AcidUtils#deltaSubdir(long, long, int)}
* @param root the root directory
* @param deleteDeltas list of begin/end write id pairs
* @return the list of delta paths
*/
public static Path[] deserializeDeleteDeltas(Path root, final List<AcidInputFormat.DeltaMetaData> deleteDeltas) {
List<Path> results = new ArrayList<>(deleteDeltas.size());
for (AcidInputFormat.DeltaMetaData dmd : deleteDeltas) {
results.addAll(dmd.getPaths(root).stream().map(Pair::getLeft).collect(Collectors.toList()));
}
return results.toArray(new Path[results.size()]);
}
/**
* This will look at a footer of one of the files in the delta to see if the
* file is in Acid format, i.e. has acid metadata columns. The assumption is
* that for any dir, either all files are acid or all are not.
*/
public static ParsedDelta parsedDelta(Path deltaDir, FileSystem fs) throws IOException {
String deltaDirName = deltaDir.getName();
if (deltaDirName.startsWith(DELETE_DELTA_PREFIX)) {
return parsedDelta(deltaDir, DELETE_DELTA_PREFIX, fs, null);
}
return parsedDelta(deltaDir, DELTA_PREFIX, fs, null); // default prefix is delta_prefix
}
public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix, FileSystem fs, HdfsDirSnapshot dirSnapshot)
throws IOException {
String filename = deltaDir.getName();
boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX);
if (filename.startsWith(deltaPrefix)) {
//small optimization - delete delta can't be in raw format
boolean isRawFormat = !isDeleteDelta && MetaDataFile.isRawFormat(deltaDir, fs, dirSnapshot);
ParsedDelta p = parsedDelta(deltaDir, isRawFormat);
List<HdfsFileStatusWithId> files = null;
if (dirSnapshot != null) {
files = dirSnapshot.getFiles().stream()
.filter(fileStatus -> bucketFileFilter.accept(fileStatus.getPath()))
.map(HdfsFileStatusWithoutId::new)
.collect(Collectors.toList());
} else if (isDeleteDelta) {
// For delete deltas we need the files for AcidState
try {
files = SHIMS.listLocatedHdfsStatus(fs, deltaDir, bucketFileFilter);
} catch (UnsupportedOperationException uoe) {
files = Arrays.stream(fs.listStatus(deltaDir, bucketFileFilter))
.map(HdfsFileStatusWithoutId::new)
.collect(Collectors.toList());
}
}
return new ParsedDelta(p.getMinWriteId(), p.getMaxWriteId(), deltaDir, p.statementId, isDeleteDelta, p.isRawFormat(),
p.visibilityTxnId, files);
}
throw new IllegalArgumentException(deltaDir + " does not start with " + deltaPrefix);
}
public static ParsedDelta parsedDelta(Path deltaDir, boolean isRawFormat) {
return parsedDelta(deltaDir, isRawFormat, null);
}
/**
* This method just parses the file name. It relies on caller to figure if
* the file is in Acid format (i.e. has acid metadata columns) or not.
* {@link #parsedDelta(Path, FileSystem)}
*/
public static ParsedDelta parsedDelta(Path deltaDir, boolean isRawFormat, List<HdfsFileStatusWithId> files) {
String filename = deltaDir.getName();
int idxOfVis = filename.indexOf(VISIBILITY_PREFIX);
long visibilityTxnId = 0;//visibilityTxnId:0 is always visible
if(idxOfVis >= 0) {
visibilityTxnId = Long.parseLong(filename.substring(idxOfVis + VISIBILITY_PREFIX.length()));
filename = filename.substring(0, idxOfVis);
}
boolean isDeleteDelta = filename.startsWith(DELETE_DELTA_PREFIX);
//make sure it's null for delete delta no matter what was passed in - this
//doesn't apply to delete deltas
isRawFormat = isDeleteDelta ? false : isRawFormat;
String rest = filename.substring((isDeleteDelta ?
DELETE_DELTA_PREFIX : DELTA_PREFIX).length());
int split = rest.indexOf('_');
//split2 may be -1 if no statementId
int split2 = rest.indexOf('_', split + 1);
long min = Long.parseLong(rest.substring(0, split));
long max = split2 == -1 ?
Long.parseLong(rest.substring(split + 1)) :
Long.parseLong(rest.substring(split + 1, split2));
if(split2 == -1) {
return new ParsedDelta(min, max, null, isDeleteDelta, isRawFormat, visibilityTxnId, files);
}
int statementId = Integer.parseInt(rest.substring(split2 + 1));
return new ParsedDelta(min, max, null, statementId, isDeleteDelta, isRawFormat, visibilityTxnId, files);
}
/**
* Is the given directory in ACID format?
* @param directory the partition directory to check
* @param conf the query configuration
* @return true, if it is an ACID directory
* @throws IOException
*/
public static boolean isAcid(Path directory, Configuration conf) throws IOException {
return isAcid(null, directory, conf);
}
public static boolean isAcid(FileSystem fileSystem, Path directory,
Configuration conf) throws IOException {
FileSystem fs = fileSystem == null ? directory.getFileSystem(conf) : fileSystem;
for(FileStatus file: fs.listStatus(directory)) {
String filename = file.getPath().getName();
if (filename.startsWith(BASE_PREFIX) ||
filename.startsWith(DELTA_PREFIX) ||
filename.startsWith(DELETE_DELTA_PREFIX)) {
if (file.isDir()) {
return true;
}
}
}
return false;
}
/** State class for getChildState; cannot modify 2 things in a method. */
private static class TxnBase {
private Path basePath;
private long writeId = 0;
private long oldestBaseWriteId = Long.MAX_VALUE;
private Path oldestBase = null;
}
/**
* Get the ACID state of the given directory. It finds the minimal set of
* base and diff directories. Note that because major compactions don't
* preserve the history, we can't use a base directory that includes a
* write id that we must exclude.
* @param fileSystem optional, it it is not provided, it will be derived from the candidateDirectory
* @param candidateDirectory the partition directory to analyze
* @param conf the configuration
* @param writeIdList the list of write ids that we are reading
* @param useFileIds It will be set to true, if the FileSystem supports listing with fileIds
* @param ignoreEmptyFiles Ignore files with 0 length
* @return the state of the directory
* @throws IOException on filesystem errors
*/
public static Directory getAcidState(FileSystem fileSystem, Path candidateDirectory, Configuration conf,
ValidWriteIdList writeIdList, Ref<Boolean> useFileIds, boolean ignoreEmptyFiles) throws IOException {
return getAcidState(fileSystem, candidateDirectory, conf, writeIdList, useFileIds, ignoreEmptyFiles, null);
}
/**
* GetAcidState implementation which uses the provided dirSnapshot.
* Generates a new one if needed and the provided one is null.
* @param fileSystem optional, it it is not provided, it will be derived from the candidateDirectory
* @param candidateDirectory the partition directory to analyze
* @param conf the configuration
* @param writeIdList the list of write ids that we are reading
* @param useFileIds It will be set to true, if the FileSystem supports listing with fileIds
* @param ignoreEmptyFiles Ignore files with 0 length
* @param dirSnapshots The listed directory snapshot, if null new will be generated
* @return the state of the directory
* @throws IOException on filesystem errors
*/
private static Directory getAcidState(FileSystem fileSystem, Path candidateDirectory, Configuration conf,
ValidWriteIdList writeIdList, Ref<Boolean> useFileIds, boolean ignoreEmptyFiles, Map<Path,
HdfsDirSnapshot> dirSnapshots) throws IOException {
ValidTxnList validTxnList = null;
String s = conf.get(ValidTxnList.VALID_TXNS_KEY);
if(!Strings.isNullOrEmpty(s)) {
/*
* getAcidState() is sometimes called on non-transactional tables, e.g.
* OrcInputFileFormat.FileGenerator.callInternal(). e.g. orc_merge3.q In that case
* writeIdList is bogus - doesn't even have a table name.
* see https://issues.apache.org/jira/browse/HIVE-20856.
*
* For now, assert that ValidTxnList.VALID_TXNS_KEY is set only if this is really a read
* of a transactional table.
* see {@link #getChildState(FileStatus, HdfsFileStatusWithId, ValidWriteIdList, List, List, List, List, TxnBase, boolean, List, Map, FileSystem, ValidTxnList)}
*/
validTxnList = new ValidReadTxnList();
validTxnList.readFromString(s);
}
FileSystem fs = fileSystem == null ? candidateDirectory.getFileSystem(conf) : fileSystem;
// The following 'deltas' includes all kinds of delta files including insert & delete deltas.
final List<ParsedDelta> deltas = new ArrayList<>();
List<ParsedDelta> working = new ArrayList<>();
List<Path> originalDirectories = new ArrayList<>();
final List<Path> obsolete = new ArrayList<>();
final List<Path> abortedDirectories = new ArrayList<>();
TxnBase bestBase = new TxnBase();
final List<HdfsFileStatusWithId> original = new ArrayList<>();
List<HdfsFileStatusWithId> childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, candidateDirectory);
if (childrenWithId != null) {
for (HdfsFileStatusWithId child : childrenWithId) {
getChildState(child, writeIdList, working, originalDirectories, original, obsolete,
bestBase, ignoreEmptyFiles, abortedDirectories, fs, validTxnList);
}
} else {
if (dirSnapshots == null) {
dirSnapshots = getHdfsDirSnapshots(fs, candidateDirectory);
}
getChildState(candidateDirectory, dirSnapshots, writeIdList, working, originalDirectories, original, obsolete,
bestBase, ignoreEmptyFiles, abortedDirectories, fs, validTxnList);
}
// If we have a base, the original files are obsolete.
if (bestBase.basePath != null) {
// Add original files to obsolete list if any
for (HdfsFileStatusWithId fswid : original) {
obsolete.add(fswid.getFileStatus().getPath());
}
// Add original directories to obsolete list if any
obsolete.addAll(originalDirectories);
// remove the entries so we don't get confused later and think we should
// use them.
original.clear();
originalDirectories.clear();
} else {
// Okay, we're going to need these originals.
// Recurse through them and figure out what we really need.
// If we already have the original list, do nothing
// If childrenWithId != null, we would have already populated "original"
if (childrenWithId != null) {
for (Path origDir : originalDirectories) {
findOriginals(fs, origDir, original, useFileIds, ignoreEmptyFiles, true);
}
}
}
Collections.sort(working);
//so now, 'working' should be sorted like delta_5_20 delta_5_10 delta_11_20 delta_51_60 for example
//and we want to end up with the best set containing all relevant data: delta_5_20 delta_51_60,
//subject to list of 'exceptions' in 'writeIdList' (not show in above example).
long current = bestBase.writeId;
int lastStmtId = -1;
ParsedDelta prev = null;
for(ParsedDelta next: working) {
if (next.maxWriteId > current) {
// are any of the new transactions ones that we care about?
if (writeIdList.isWriteIdRangeValid(current+1, next.maxWriteId) !=
ValidWriteIdList.RangeResponse.NONE) {
deltas.add(next);
current = next.maxWriteId;
lastStmtId = next.statementId;
prev = next;
}
}
else if(next.maxWriteId == current && lastStmtId >= 0) {
//make sure to get all deltas within a single transaction; multi-statement txn
//generate multiple delta files with the same txnId range
//of course, if maxWriteId has already been minor compacted, all per statement deltas are obsolete
deltas.add(next);
prev = next;
}
else if (prev != null && next.maxWriteId == prev.maxWriteId
&& next.minWriteId == prev.minWriteId
&& next.statementId == prev.statementId) {
// The 'next' parsedDelta may have everything equal to the 'prev' parsedDelta, except
// the path. This may happen when we have split update and we have two types of delta
// directories- 'delta_x_y' and 'delete_delta_x_y' for the SAME txn range.
// Also note that any delete_deltas in between a given delta_x_y range would be made
// obsolete. For example, a delta_30_50 would make delete_delta_40_40 obsolete.
// This is valid because minor compaction always compacts the normal deltas and the delete
// deltas for the same range. That is, if we had 3 directories, delta_30_30,
// delete_delta_40_40 and delta_50_50, then running minor compaction would produce
// delta_30_50 and delete_delta_30_50.
deltas.add(next);
prev = next;
}
else {
obsolete.add(next.path);
}
}
if(bestBase.oldestBase != null && bestBase.basePath == null &&
isCompactedBase(ParsedBase.parseBase(bestBase.oldestBase), fs, dirSnapshots)) {
/*
* If here, it means there was a base_x (> 1 perhaps) but none were suitable for given
* {@link writeIdList}. Note that 'original' files are logically a base_Long.MIN_VALUE and thus
* cannot have any data for an open txn. We could check {@link deltas} has files to cover
* [1,n] w/o gaps but this would almost never happen...
*
* We only throw for base_x produced by Compactor since that base erases all history and
* cannot be used for a client that has a snapshot in which something inside this base is
* open. (Nor can we ignore this base of course) But base_x which is a result of IOW,
* contains all history so we treat it just like delta wrt visibility. Imagine, IOW which
* aborts. It creates a base_x, which can and should just be ignored.*/
long[] exceptions = writeIdList.getInvalidWriteIds();
String minOpenWriteId = exceptions != null && exceptions.length > 0 ?
Long.toString(exceptions[0]) : "x";
throw new IOException(ErrorMsg.ACID_NOT_ENOUGH_HISTORY.format(
Long.toString(writeIdList.getHighWatermark()),
minOpenWriteId, bestBase.oldestBase.toString()));
}
Path base = null;
boolean isBaseInRawFormat = false;
if (bestBase.basePath != null) {
base = bestBase.basePath;
isBaseInRawFormat = MetaDataFile.isRawFormat(base, fs, dirSnapshots != null ? dirSnapshots.get(base) : null);
}
LOG.debug("in directory " + candidateDirectory.toUri().toString() + " base = " + base + " deltas = " +
deltas.size());
/*
* If this sort order is changed and there are tables that have been converted to transactional
* and have had any update/delete/merge operations performed but not yet MAJOR compacted, it
* may result in data loss since it may change how
* {@link org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair} assigns
* {@link RecordIdentifier#rowId} for read (that have happened) and compaction (yet to happen).
*/
// this does "Path.uri.compareTo(that.uri)"
original.sort(Comparator.comparing(HdfsFileStatusWithId::getFileStatus));
return new DirectoryImpl(abortedDirectories, isBaseInRawFormat, original, obsolete, deltas, base);
}
public static Map<Path, HdfsDirSnapshot> getHdfsDirSnapshots(final FileSystem fs, final Path path)
throws IOException {
Map<Path, HdfsDirSnapshot> dirToSnapshots = new HashMap<>();
RemoteIterator<LocatedFileStatus> itr = fs.listFiles(path, true);
while (itr.hasNext()) {
FileStatus fStatus = itr.next();
Path fPath = fStatus.getPath();
if (acidHiddenFileFilter.accept(fPath)) {
if (fStatus.isDirectory() && acidTempDirFilter.accept(fPath)) {
HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(fPath);
if (dirSnapshot == null) {
dirSnapshot = new HdfsDirSnapshotImpl(fPath);
dirToSnapshots.put(fPath, dirSnapshot);
}
} else {
Path parentDirPath = fPath.getParent();
if (acidTempDirFilter.accept(parentDirPath)) {
while (isChildOfDelta(parentDirPath, path)) {
// Some cases there are other directory layers between the delta and the datafiles
// (export-import mm table, insert with union all to mm table, skewed tables).
// But it does not matter for the AcidState, we just need the deltas and the data files
// So build the snapshot with the files inside the delta directory
parentDirPath = parentDirPath.getParent();
}
HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(parentDirPath);
if (dirSnapshot == null) {
dirSnapshot = new HdfsDirSnapshotImpl(parentDirPath);
dirToSnapshots.put(parentDirPath, dirSnapshot);
}
// We're not filtering out the metadata file and acid format file,
// as they represent parts of a valid snapshot
// We're not using the cached values downstream, but we can potentially optimize more in a follow-up task
if (fStatus.getPath().toString().contains(MetaDataFile.METADATA_FILE)) {
dirSnapshot.addMetadataFile(fStatus);
} else if (fStatus.getPath().toString().contains(OrcAcidVersion.ACID_FORMAT)) {
dirSnapshot.addOrcAcidFormatFile(fStatus);
} else {
dirSnapshot.addFile(fStatus);
}
}
}
}
}
return dirToSnapshots;
}
private static boolean isChildOfDelta(Path childDir, Path rootPath) {
if (childDir.toUri().toString().length() <= rootPath.toUri().toString().length()) {
return false;
}
// We do not want to look outside the original directory
String fullName = childDir.toUri().toString().substring(rootPath.toUri().toString().length() + 1);
String dirName = childDir.getName();
return (fullName.startsWith(BASE_PREFIX) && !dirName.startsWith(BASE_PREFIX)) ||
(fullName.startsWith(DELTA_PREFIX) && !dirName.startsWith(DELTA_PREFIX)) ||
(fullName.startsWith(DELETE_DELTA_PREFIX) && !dirName.startsWith(DELETE_DELTA_PREFIX));
}
/**
* DFS dir listing.
* Captures a dir and the corresponding list of files it contains,
* with additional properties about the dir (like isBase etc)
*
*/
public interface HdfsDirSnapshot {
public Path getPath();
public void addOrcAcidFormatFile(FileStatus fStatus);
public FileStatus getOrcAcidFormatFile();
public void addMetadataFile(FileStatus fStatus);
public FileStatus getMetadataFile(FileStatus fStatus);
// Get the list of files if any within this directory
public List<FileStatus> getFiles();
public void addFile(FileStatus file);
// File id or null
public Long getFileId();
public Boolean isRawFormat();
public void setIsRawFormat(boolean isRawFormat);
public Boolean isBase();
public void setIsBase(boolean isBase);
Boolean isValidBase();
public void setIsValidBase(boolean isValidBase);
Boolean isCompactedBase();
public void setIsCompactedBase(boolean isCompactedBase);
boolean contains(Path path);
}
public static class HdfsDirSnapshotImpl implements HdfsDirSnapshot {
private Path dirPath;
private FileStatus metadataFStatus = null;
private FileStatus orcAcidFormatFStatus = null;
private List<FileStatus> files = new ArrayList<FileStatus>();
private Long fileId = null;
private Boolean isRawFormat = null;
private Boolean isBase = null;
private Boolean isValidBase = null;
private Boolean isCompactedBase = null;
public HdfsDirSnapshotImpl(Path path, List<FileStatus> files) {
this.dirPath = path;
this.files = files;
}
public HdfsDirSnapshotImpl(Path path) {
this.dirPath = path;
}
@Override
public Path getPath() {
return dirPath;
}
@Override
public List<FileStatus> getFiles() {
return files;
}
@Override
public void addFile(FileStatus file) {
files.add(file);
}
@Override
public Long getFileId() {
return fileId;
}
@Override
public Boolean isRawFormat() {
return isRawFormat;
}
@Override
public void setIsRawFormat(boolean isRawFormat) {
this.isRawFormat = isRawFormat;
}
@Override
public Boolean isBase() {
return isBase;
}
@Override
public Boolean isValidBase() {
return isValidBase;
}
@Override
public Boolean isCompactedBase() {
return isCompactedBase;
}
@Override
public void setIsBase(boolean isBase) {
this.isBase = isBase;
}
@Override
public void setIsValidBase(boolean isValidBase) {
this.isValidBase = isValidBase;
}
@Override
public void setIsCompactedBase(boolean isCompactedBase) {
this.isCompactedBase = isCompactedBase;
}
@Override
public void addOrcAcidFormatFile(FileStatus fStatus) {
this.orcAcidFormatFStatus = fStatus;
}
@Override
public FileStatus getOrcAcidFormatFile() {
return orcAcidFormatFStatus;
}
@Override
public void addMetadataFile(FileStatus fStatus) {
this.metadataFStatus = fStatus;
}
@Override
public FileStatus getMetadataFile(FileStatus fStatus) {
return metadataFStatus;
}
@Override
public boolean contains(Path path) {
for (FileStatus fileStatus: getFiles()) {
if (fileStatus.getPath().equals(path)) {
return true;
}
}
return false;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Path: " + dirPath);
sb.append("; ");
sb.append("Files: { ");
for (FileStatus fstatus : files) {
sb.append(fstatus);
sb.append(", ");
}
sb.append(" }");
return sb.toString();
}
}
/**
* We can only use a 'base' if it doesn't have an open txn (from specific reader's point of view)
* A 'base' with open txn in its range doesn't have 'enough history' info to produce a correct
* snapshot for this reader.
* Note that such base is NOT obsolete. Obsolete files are those that are "covered" by other
* files within the snapshot.
* A base produced by Insert Overwrite is different. Logically it's a delta file but one that
* causes anything written previously to be ignored (hence the overwrite). In this case, base_x
* is visible if writeid:x is committed for current reader.
*/
private static boolean isValidBase(ParsedBase parsedBase, ValidWriteIdList writeIdList, FileSystem fs,
HdfsDirSnapshot dirSnapshot) throws IOException {
boolean isValidBase;
if (dirSnapshot != null && dirSnapshot.isValidBase() != null) {
isValidBase = dirSnapshot.isValidBase();
} else {
if (parsedBase.getWriteId() == Long.MIN_VALUE) {
//such base is created by 1st compaction in case of non-acid to acid table conversion
//By definition there are no open txns with id < 1.
isValidBase = true;
} else if (writeIdList.getMinOpenWriteId() != null && parsedBase.getWriteId() <= writeIdList
.getMinOpenWriteId()) {
isValidBase = true;
} else if (isCompactedBase(parsedBase, fs, dirSnapshot)) {
isValidBase = writeIdList.isValidBase(parsedBase.getWriteId());
} else {
// if here, it's a result of IOW
isValidBase = writeIdList.isWriteIdValid(parsedBase.getWriteId());
}
if (dirSnapshot != null) {
dirSnapshot.setIsValidBase(isValidBase);
}
}
return isValidBase;
}
/**
* Returns {@code true} if {@code parsedBase} was created by compaction.
* As of Hive 4.0 we can tell if a directory is a result of compaction based on the
* presence of {@link AcidUtils#VISIBILITY_PATTERN} suffix. Base directories written prior to
* that, have to rely on the {@link MetaDataFile} in the directory. So look at the filename first
* since that is the cheaper test.*/
private static boolean isCompactedBase(ParsedBase parsedBase, FileSystem fs,
Map<Path, HdfsDirSnapshot> snapshotMap) throws IOException {
return isCompactedBase(parsedBase, fs, snapshotMap != null ? snapshotMap.get(parsedBase.getBaseDirPath()) : null);
}
private static boolean isCompactedBase(ParsedBase parsedBase, FileSystem fs,
HdfsDirSnapshot snapshot) throws IOException {
return parsedBase.getVisibilityTxnId() > 0 || MetaDataFile.isCompacted(parsedBase.getBaseDirPath(), fs, snapshot);
}
private static void getChildState(HdfsFileStatusWithId childWithId, ValidWriteIdList writeIdList,
List<ParsedDelta> working, List<Path> originalDirectories, List<HdfsFileStatusWithId> original,
List<Path> obsolete, TxnBase bestBase, boolean ignoreEmptyFiles, List<Path> aborted, FileSystem fs,
ValidTxnList validTxnList) throws IOException {
Path childPath = childWithId.getFileStatus().getPath();
String fn = childPath.getName();
if (!childWithId.getFileStatus().isDirectory()) {
if (!ignoreEmptyFiles || childWithId.getFileStatus().getLen() != 0) {
original.add(childWithId);
}
} else if (fn.startsWith(BASE_PREFIX)) {
processBaseDir(childPath, writeIdList, obsolete, bestBase, aborted, fs, validTxnList, null);
} else if (fn.startsWith(DELTA_PREFIX) || fn.startsWith(DELETE_DELTA_PREFIX)) {
processDeltaDir(childPath, writeIdList, working, aborted, fs, validTxnList, null);
} else {
// This is just the directory. We need to recurse and find the actual files. But don't
// do this until we have determined there is no base. This saves time. Plus,
// it is possible that the cleaner is running and removing these original files,
// in which case recursing through them could cause us to get an error.
originalDirectories.add(childPath);
}
}
private static void getChildState(Path candidateDirectory, Map<Path, HdfsDirSnapshot> dirSnapshots,
ValidWriteIdList writeIdList, List<ParsedDelta> working, List<Path> originalDirectories,
List<HdfsFileStatusWithId> original, List<Path> obsolete, TxnBase bestBase, boolean ignoreEmptyFiles,
List<Path> aborted, FileSystem fs, ValidTxnList validTxnList) throws IOException {
for (HdfsDirSnapshot dirSnapshot : dirSnapshots.values()) {
Path dirPath = dirSnapshot.getPath();
String dirName = dirPath.getName();
// dirPath may contains the filesystem prefix
if (dirPath.toString().endsWith(candidateDirectory.toString())) {
// if the candidateDirectory is itself a delta directory, we need to add originals in that directory
// and return. This is the case when compaction thread calls getChildState.
for (FileStatus fileStatus : dirSnapshot.getFiles()) {
if (!ignoreEmptyFiles || fileStatus.getLen() != 0) {
original.add(createOriginalObj(null, fileStatus));
}
}
} else if (dirName.startsWith(BASE_PREFIX)) {
processBaseDir(dirPath, writeIdList, obsolete, bestBase, aborted, fs, validTxnList, dirSnapshot);
} else if (dirName.startsWith(DELTA_PREFIX) || dirName.startsWith(DELETE_DELTA_PREFIX)) {
processDeltaDir(dirPath, writeIdList, working, aborted, fs, validTxnList, dirSnapshot);
} else {
originalDirectories.add(dirPath);
for (FileStatus stat : dirSnapshot.getFiles()) {
if ((!ignoreEmptyFiles) || (stat.getLen() != 0)) {
original.add(createOriginalObj(null, stat));
}
}
}
}
}
private static void processBaseDir(Path baseDir, ValidWriteIdList writeIdList, List<Path> obsolete, TxnBase bestBase,
List<Path> aborted, FileSystem fs, ValidTxnList validTxnList, AcidUtils.HdfsDirSnapshot dirSnapshot)
throws IOException {
ParsedBase parsedBase = ParsedBase.parseBase(baseDir);
if (!isDirUsable(baseDir, parsedBase.getVisibilityTxnId(), aborted, validTxnList)) {
return;
}
final long writeId = parsedBase.getWriteId();
if (bestBase.oldestBaseWriteId > writeId) {
// keep track for error reporting
bestBase.oldestBase = baseDir;
bestBase.oldestBaseWriteId = writeId;
}
if (bestBase.basePath == null) {
if (isValidBase(parsedBase, writeIdList, fs, dirSnapshot)) {
bestBase.basePath = baseDir;
bestBase.writeId = writeId;
}
} else if (bestBase.writeId < writeId) {
if (isValidBase(parsedBase, writeIdList, fs, dirSnapshot)) {
obsolete.add(bestBase.basePath);
bestBase.basePath = baseDir;
bestBase.writeId = writeId;
}
} else {
obsolete.add(baseDir);
}
}
private static void processDeltaDir(Path deltadir, ValidWriteIdList writeIdList, List<ParsedDelta> working,
List<Path> aborted, FileSystem fs, ValidTxnList validTxnList, AcidUtils.HdfsDirSnapshot dirSnapshot)
throws IOException {
String dirName = deltadir.getName();
String deltaPrefix = dirName.startsWith(DELTA_PREFIX) ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
ParsedDelta delta = parsedDelta(deltadir, deltaPrefix, fs, dirSnapshot);
if (!isDirUsable(deltadir, delta.getVisibilityTxnId(), aborted, validTxnList)) {
return;
}
if (ValidWriteIdList.RangeResponse.ALL == writeIdList.isWriteIdRangeAborted(delta.minWriteId, delta.maxWriteId)) {
aborted.add(deltadir);
} else if (writeIdList.isWriteIdRangeValid(delta.minWriteId, delta.maxWriteId)
!= ValidWriteIdList.RangeResponse.NONE) {
working.add(delta);
}
}
/**
* checks {@code visibilityTxnId} to see if {@code child} is committed in current snapshot
*/
private static boolean isDirUsable(Path child, long visibilityTxnId, List<Path> aborted, ValidTxnList validTxnList) {
if (validTxnList == null) {
throw new IllegalArgumentException("No ValidTxnList for " + child);
}
if (!validTxnList.isTxnValid(visibilityTxnId)) {
boolean isAborted = validTxnList.isTxnAborted(visibilityTxnId);
if (isAborted) {
aborted.add(child);// so we can clean it up
}
LOG.debug("getChildState() ignoring(" + aborted + ") " + child);
return false;
}
return true;
}
public static HdfsFileStatusWithId createOriginalObj(HdfsFileStatusWithId childWithId, FileStatus child) {
return childWithId != null ? childWithId : new HdfsFileStatusWithoutId(child);
}
public static class HdfsFileStatusWithoutId implements HdfsFileStatusWithId {
private final FileStatus fs;
public HdfsFileStatusWithoutId(FileStatus fs) {
this.fs = fs;
}
@Override
public FileStatus getFileStatus() {
return fs;
}
@Override
public Long getFileId() {
return null;
}
}
/**
* Find the original files (non-ACID layout) recursively under the partition directory.
* @param fs the file system
* @param dir the directory to add
* @param original the list of original files
* @throws IOException
*/
public static void findOriginals(FileSystem fs, Path dir,
List<HdfsFileStatusWithId> original, Ref<Boolean> useFileIds,
boolean ignoreEmptyFiles, boolean recursive) throws IOException {
List<HdfsFileStatusWithId> childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, dir);
if (childrenWithId != null) {
for (HdfsFileStatusWithId child : childrenWithId) {
if (child.getFileStatus().isDirectory()) {
if (recursive) {
findOriginals(fs, child.getFileStatus().getPath(), original, useFileIds,
ignoreEmptyFiles, true);
}
} else {
if(!ignoreEmptyFiles || child.getFileStatus().getLen() > 0) {
original.add(child);
}
}
}
} else {
List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, dir, hiddenFileFilter);
for (FileStatus child : children) {
if (child.isDirectory()) {
if (recursive) {
findOriginals(fs, child.getPath(), original, useFileIds, ignoreEmptyFiles, true);
}
} else {
if(!ignoreEmptyFiles || child.getLen() > 0) {
original.add(createOriginalObj(null, child));
}
}
}
}
}
private static List<HdfsFileStatusWithId> tryListLocatedHdfsStatus(Ref<Boolean> useFileIds, FileSystem fs,
Path directory) {
if (useFileIds == null) {
return null;
}
List<HdfsFileStatusWithId> childrenWithId = null;
final Boolean val = useFileIds.value;
if (val == null || val) {
try {
childrenWithId = SHIMS.listLocatedHdfsStatus(fs, directory, hiddenFileFilter);
if (val == null) {
useFileIds.value = true;
}
} catch (UnsupportedOperationException uoe) {
LOG.info("Failed to get files with ID; using regular API: " + uoe.getMessage());
if (val == null) {
useFileIds.value = false;
}
} catch (IOException ioe) {
LOG.info("Failed to get files with ID; using regular API: " + ioe.getMessage());
LOG.debug("Failed to get files with ID", ioe);
}
}
return childrenWithId;
}
public static boolean isTablePropertyTransactional(Properties props) {
String resultStr = props.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
if (resultStr == null) {
resultStr = props.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase());
}
return resultStr != null && resultStr.equalsIgnoreCase("true");
}
public static boolean isTablePropertyTransactional(Map<String, String> parameters) {
String resultStr = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
if (resultStr == null) {
resultStr = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase());
}
return resultStr != null && resultStr.equalsIgnoreCase("true");
}
public static boolean isTablePropertyTransactional(Configuration conf) {
String resultStr = conf.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
if (resultStr == null) {
resultStr = conf.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase());
}
return resultStr != null && resultStr.equalsIgnoreCase("true");
}
/**
* @param p - not null
*/
public static boolean isDeleteDelta(Path p) {
return p.getName().startsWith(DELETE_DELTA_PREFIX);
}
public static boolean isInsertDelta(Path p) {
return p.getName().startsWith(DELTA_PREFIX);
}
public static boolean isTransactionalTable(CreateTableDesc table) {
if (table == null || table.getTblProps() == null) {
return false;
}
return isTransactionalTable(table.getTblProps());
}
public static boolean isTransactionalTable(Map<String, String> props) {
String tableIsTransactional = props.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
if (tableIsTransactional == null) {
tableIsTransactional = props.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase());
}
return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true");
}
public static boolean isTransactionalView(CreateMaterializedViewDesc view) {
if (view == null || view.getTblProps() == null) {
return false;
}
return isTransactionalTable(view.getTblProps());
}
/**
* Should produce the same result as
* {@link org.apache.hadoop.hive.metastore.txn.TxnUtils#isAcidTable(org.apache.hadoop.hive.metastore.api.Table)}
*/
public static boolean isFullAcidTable(Table table) {
return isFullAcidTable(table == null ? null : table.getTTable());
}
public static boolean isTransactionalTable(Table table) {
return isTransactionalTable(table == null ? null : table.getTTable());
}
/**
* Should produce the same result as
* {@link org.apache.hadoop.hive.metastore.txn.TxnUtils#isAcidTable(org.apache.hadoop.hive.metastore.api.Table)}
*/
public static boolean isFullAcidTable(org.apache.hadoop.hive.metastore.api.Table table) {
return isTransactionalTable(table) &&
!isInsertOnlyTable(table.getParameters());
}
public static boolean isFullAcidTable(Map<String, String> params) {
return isTransactionalTable(params) && !isInsertOnlyTable(params);
}
public static boolean isTransactionalTable(org.apache.hadoop.hive.metastore.api.Table table) {
return table != null && table.getParameters() != null &&
isTablePropertyTransactional(table.getParameters());
}
public static boolean isFullAcidTable(CreateTableDesc td) {
if (td == null || td.getTblProps() == null) {
return false;
}
String tableIsTransactional = td.getTblProps().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
if (tableIsTransactional == null) {
tableIsTransactional = td.getTblProps().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase());
}
return tableIsTransactional != null && tableIsTransactional.equalsIgnoreCase("true") &&
!AcidUtils.isInsertOnlyTable(td.getTblProps());
}
public static boolean isFullAcidScan(Configuration conf) {
if (!HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN)) {
return false;
}
int propInt = conf.getInt(ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, -1);
if (propInt == -1) {
return true;
}
AcidOperationalProperties props = AcidOperationalProperties.parseInt(propInt);
return !props.isInsertOnly();
}
/**
* Sets the acidOperationalProperties in the configuration object argument.
* @param conf Mutable configuration object
* @param properties An acidOperationalProperties object to initialize from. If this is null,
* we assume this is a full transactional table.
*/
public static void setAcidOperationalProperties(
Configuration conf, boolean isTxnTable, AcidOperationalProperties properties) {
if (isTxnTable) {
HiveConf.setBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, isTxnTable);
if (properties != null) {
HiveConf.setIntVar(conf, ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES, properties.toInt());
}
} else {
conf.unset(ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname);
conf.unset(ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname);
}
}
/**
* Sets the acidOperationalProperties in the map object argument.
* @param parameters Mutable map object
* @param properties An acidOperationalProperties object to initialize from.
*/
public static void setAcidOperationalProperties(Map<String, String> parameters,
boolean isTxnTable, AcidOperationalProperties properties) {
parameters.put(ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN.varname, Boolean.toString(isTxnTable));
if (properties != null) {
parameters.put(ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES.varname, properties.toString());
}
}
/**
* Returns the acidOperationalProperties for a given table.
* @param table A table object
* @return the acidOperationalProperties object for the corresponding table.
*/
public static AcidOperationalProperties getAcidOperationalProperties(Table table) {
String transactionalProperties = table.getProperty(
hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
if (transactionalProperties == null) {
// If the table does not define any transactional properties, we return a default type.
return AcidOperationalProperties.getDefault();
}
return AcidOperationalProperties.parseString(transactionalProperties);
}
/**
* Returns the acidOperationalProperties for a given configuration.
* @param conf A configuration object
* @return the acidOperationalProperties object for the corresponding configuration.
*/
public static AcidOperationalProperties getAcidOperationalProperties(Configuration conf) {
// If the conf does not define any transactional properties, the parseInt() should receive
// a value of 1, which will set AcidOperationalProperties to a default type and return that.
return AcidOperationalProperties.parseInt(
HiveConf.getIntVar(conf, ConfVars.HIVE_TXN_OPERATIONAL_PROPERTIES));
}
/**
* Returns the acidOperationalProperties for a given set of properties.
* @param props A properties object
* @return the acidOperationalProperties object for the corresponding properties.
*/
public static AcidOperationalProperties getAcidOperationalProperties(Properties props) {
String resultStr = props.getProperty(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
if (resultStr == null) {
// If the properties does not define any transactional properties, we return a default type.
return AcidOperationalProperties.getDefault();
}
return AcidOperationalProperties.parseString(resultStr);
}
/**
* Returns the acidOperationalProperties for a given map.
* @param parameters A parameters object
* @return the acidOperationalProperties object for the corresponding map.
*/
public static AcidOperationalProperties getAcidOperationalProperties(
Map<String, String> parameters) {
String resultStr = parameters.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
if (resultStr == null) {
// If the parameters does not define any transactional properties, we return a default type.
return AcidOperationalProperties.getDefault();
}
return AcidOperationalProperties.parseString(resultStr);
}
/**
* See comments at {@link AcidUtils#DELTA_SIDE_FILE_SUFFIX}.
*
* Returns the logical end of file for an acid data file.
*
* This relies on the fact that if delta_x_y has no committed transactions it wil be filtered out
* by {@link #getAcidState(FileSystem, Path, Configuration, ValidWriteIdList, Ref, boolean)}
* and so won't be read at all.
* @param file - data file to read/compute splits on
*/
public static long getLogicalLength(FileSystem fs, FileStatus file) throws IOException {
Path acidDir = file.getPath().getParent(); //should be base_x or delta_x_y_
if(AcidUtils.isInsertDelta(acidDir)) {
ParsedDeltaLight pd = ParsedDeltaLight.parse(acidDir);
if(!pd.mayContainSideFile()) {
return file.getLen();
}
}
else {
return file.getLen();
}
return getLastFlushLength(fs, file);
}
/**
* Read the side file to get the last flush length, or file length if no side file found.
* @param fs the file system to use
* @param deltaFile the delta file
* @return length as stored in the side file, or file length if no side file found
* @throws IOException if problems reading the side file
*/
private static long getLastFlushLength(FileSystem fs, FileStatus deltaFile) throws IOException {
Path sideFile = OrcAcidUtils.getSideFile(deltaFile.getPath());
try (FSDataInputStream stream = fs.open(sideFile)) {
long result = -1;
while (stream.available() > 0) {
result = stream.readLong();
}
if (result < 0) {
/* side file is there but we couldn't read it. We want to avoid a read where
* (file.getLen() < 'value from side file' which may happen if file is not closed)
* because this means some committed data from 'file' would be skipped. This should be very
* unusual.
*/
throw new IOException(sideFile + " found but is not readable. Consider waiting or "
+ "orcfiledump --recover");
}
return result;
} catch (FileNotFoundException e) {
return deltaFile.getLen();
}
}
/**
* Checks if a table is a transactional table that only supports INSERT, but not UPDATE/DELETE
* @param params table properties
* @return true if table is an INSERT_ONLY table, false otherwise
*/
public static boolean isInsertOnlyTable(Map<String, String> params) {
return isInsertOnlyTable(params, false);
}
public static boolean isInsertOnlyTable(Table table) {
return isTransactionalTable(table) && getAcidOperationalProperties(table).isInsertOnly();
}
// TODO [MM gap]: CTAS may currently be broken. It used to work. See the old code, and why isCtas isn't used?
public static boolean isInsertOnlyTable(Map<String, String> params, boolean isCtas) {
String transactionalProp = params.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
return (transactionalProp != null && "insert_only".equalsIgnoreCase(transactionalProp));
}
public static boolean isInsertOnlyTable(Properties params) {
String transactionalProp = params.getProperty(
hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
return (transactionalProp != null && "insert_only".equalsIgnoreCase(transactionalProp));
}
/**
* The method for altering table props; may set the table to MM, non-MM, or not affect MM.
* todo: All such validation logic should be TransactionValidationListener
* @param tbl object image before alter table command (or null if not retrieved yet).
* @param props prop values set in this alter table command
*/
public static Boolean isToInsertOnlyTable(Table tbl, Map<String, String> props) {
// Note: Setting these separately is a very hairy issue in certain combinations, since we
// cannot decide what type of table this becomes without taking both into account, and
// in many cases the conversion might be illegal.
// The only thing we allow is tx = true w/o tx-props, for backward compat.
String transactional = props.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
String transactionalProp = props.get(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
if (transactional == null && transactionalProp == null) {
// Not affected or the op is not about transactional.
return null;
}
if (transactional == null && tbl != null) {
transactional = tbl.getParameters().get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
}
boolean isSetToTxn = "true".equalsIgnoreCase(transactional);
if (transactionalProp == null) {
if (isSetToTxn || tbl == null) return false; // Assume the full ACID table.
throw new RuntimeException("Cannot change '" + hive_metastoreConstants.TABLE_IS_TRANSACTIONAL
+ "' without '" + hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES + "'");
}
if (!"insert_only".equalsIgnoreCase(transactionalProp)) return false; // Not MM.
if (!isSetToTxn) {
if (tbl == null) return true; // No table information yet; looks like it could be valid.
throw new RuntimeException("Cannot set '"
+ hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES + "' to 'insert_only' without "
+ "setting '" + hive_metastoreConstants.TABLE_IS_TRANSACTIONAL + "' to 'true'");
}
return true;
}
public static boolean isRemovedInsertOnlyTable(Set<String> removedSet) {
boolean hasTxn = removedSet.contains(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL),
hasProps = removedSet.contains(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
return hasTxn || hasProps;
}
/**
* Get the ValidTxnWriteIdList saved in the configuration.
*/
public static ValidTxnWriteIdList getValidTxnWriteIdList(Configuration conf) {
String txnString = conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY);
ValidTxnWriteIdList validTxnList = new ValidTxnWriteIdList(txnString);
return validTxnList;
}
/**
* Extract the ValidWriteIdList for the given table from the list of tables' ValidWriteIdList.
*/
public static ValidWriteIdList getTableValidWriteIdList(Configuration conf, String fullTableName) {
ValidTxnWriteIdList validTxnList = getValidTxnWriteIdList(conf);
return validTxnList.getTableValidWriteIdList(fullTableName);
}
/**
* Set the valid write id list for the current table scan.
*/
public static void setValidWriteIdList(Configuration conf, ValidWriteIdList validWriteIds) {
conf.set(ValidWriteIdList.VALID_WRITEIDS_KEY, validWriteIds.toString());
LOG.debug("Setting ValidWriteIdList: " + validWriteIds.toString()
+ " isAcidTable: " + HiveConf.getBoolVar(conf, ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, false)
+ " acidProperty: " + getAcidOperationalProperties(conf));
}
/**
* Set the valid write id list for the current table scan.
*/
public static void setValidWriteIdList(Configuration conf, TableScanDesc tsDesc) {
if (tsDesc.isTranscationalTable()) {
String dbName = tsDesc.getDatabaseName();
String tableName = tsDesc.getTableName();
ValidWriteIdList validWriteIdList = getTableValidWriteIdList(conf,
AcidUtils.getFullTableName(dbName, tableName));
if (validWriteIdList != null) {
setValidWriteIdList(conf, validWriteIdList);
} else {
// Log error if the acid table is missing from the ValidWriteIdList conf
LOG.error("setValidWriteIdList on table: " + AcidUtils.getFullTableName(dbName, tableName)
+ " isAcidTable: " + true
+ " acidProperty: " + getAcidOperationalProperties(conf)
+ " couldn't find the ValidWriteId list from ValidTxnWriteIdList: "
+ conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
throw new IllegalStateException("ACID table: " + AcidUtils.getFullTableName(dbName, tableName)
+ " is missing from the ValidWriteIdList config: "
+ conf.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
}
}
}
public static class TableSnapshot {
private long writeId;
private String validWriteIdList;
public TableSnapshot() {
}
public TableSnapshot(long writeId, String validWriteIdList) {
this.writeId = writeId;
this.validWriteIdList = validWriteIdList;
}
public String getValidWriteIdList() {
return validWriteIdList;
}
public long getWriteId() {
return writeId;
}
public void setWriteId(long writeId) {
this.writeId = writeId;
}
public void setValidWriteIdList(String validWriteIdList) {
this.validWriteIdList = validWriteIdList;
}
@Override
public String toString() {
return "[validWriteIdList=" + validWriteIdList + ", writeId=" + writeId + "]";
}
}
public static TableSnapshot getTableSnapshot(
Configuration conf,
Table tbl) throws LockException {
return getTableSnapshot(conf, tbl, false);
}
/** Note: this is generally called in Hive.java; so, the callers of Hive.java make sure
* to set up the acid state during compile, and Hive.java retrieves it if needed. */
public static TableSnapshot getTableSnapshot(
Configuration conf, Table tbl, boolean isStatsUpdater) throws LockException {
return getTableSnapshot(conf, tbl, tbl.getDbName(), tbl.getTableName(), isStatsUpdater);
}
/** Note: this is generally called in Hive.java; so, the callers of Hive.java make sure
* to set up the acid state during compile, and Hive.java retrieves it if needed. */
public static TableSnapshot getTableSnapshot(Configuration conf,
Table tbl, String dbName, String tblName, boolean isStatsUpdater)
throws LockException, AssertionError {
if (!isTransactionalTable(tbl)) {
return null;
}
if (dbName == null) {
dbName = tbl.getDbName();
}
if (tblName == null) {
tblName = tbl.getTableName();
}
long writeId = -1;
ValidWriteIdList validWriteIdList = null;
if (SessionState.get() != null) {
HiveTxnManager sessionTxnMgr = SessionState.get().getTxnMgr();
String fullTableName = getFullTableName(dbName, tblName);
if (sessionTxnMgr != null && sessionTxnMgr.getCurrentTxnId() > 0) {
validWriteIdList = getTableValidWriteIdList(conf, fullTableName);
if (isStatsUpdater) {
writeId = sessionTxnMgr.getAllocatedTableWriteId(dbName, tblName);
if (writeId < 1) {
// TODO: this is not ideal... stats updater that doesn't have write ID is currently
// "create table"; writeId would be 0/-1 here. No need to call this w/true.
LOG.debug("Stats updater for {}.{} doesn't have a write ID ({})", dbName, tblName, writeId);
}
}
if (HiveConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) && conf.get(ValidTxnList.VALID_TXNS_KEY) == null) {
return null;
}
if (validWriteIdList == null) {
validWriteIdList = getTableValidWriteIdListWithTxnList(conf, dbName, tblName);
}
if (validWriteIdList == null) {
throw new AssertionError("Cannot find valid write ID list for " + tblName);
}
}
}
return new TableSnapshot(writeId,
validWriteIdList != null ? validWriteIdList.toString() : null);
}
/**
* Returns ValidWriteIdList for the table with the given "dbName" and "tableName".
* This is called when HiveConf has no list for the table.
* Otherwise use getTableSnapshot().
* @param conf Configuration
* @param dbName
* @param tableName
* @return ValidWriteIdList on success, null on failure to get a list.
* @throws LockException
*/
public static ValidWriteIdList getTableValidWriteIdListWithTxnList(
Configuration conf, String dbName, String tableName) throws LockException {
HiveTxnManager sessionTxnMgr = SessionState.get().getTxnMgr();
if (sessionTxnMgr == null) {
return null;
}
ValidWriteIdList validWriteIdList = null;
ValidTxnWriteIdList validTxnWriteIdList = null;
String validTxnList = conf.get(ValidTxnList.VALID_TXNS_KEY);
List<String> tablesInput = new ArrayList<>();
String fullTableName = getFullTableName(dbName, tableName);
tablesInput.add(fullTableName);
validTxnWriteIdList = sessionTxnMgr.getValidWriteIds(tablesInput, validTxnList);
return validTxnWriteIdList != null ?
validTxnWriteIdList.getTableValidWriteIdList(fullTableName) : null;
}
public static String getFullTableName(String dbName, String tableName) {
return TableName.fromString(tableName, null, dbName).getNotEmptyDbTable().toLowerCase();
}
/**
* General facility to place a metadta file into a dir created by acid/compactor write.
*
* Load Data commands against Acid tables write {@link AcidBaseFileType#ORIGINAL_BASE} type files
* into delta_x_x/ (or base_x in case there is Overwrite clause). {@link MetaDataFile} is a
* small JSON file in this directory that indicates that these files don't have Acid metadata
* columns and so the values for these columns need to be assigned at read time/compaction.
*/
public static class MetaDataFile {
//export command uses _metadata....
public static final String METADATA_FILE = "_metadata_acid";
private static final String CURRENT_VERSION = "0";
//todo: enums? that have both field name and value list
private interface Field {
String VERSION = "thisFileVersion";
String DATA_FORMAT = "dataFormat";
}
private interface Value {
//written by Major compaction
String COMPACTED = "compacted";
}
static boolean isCompacted(Path baseOrDeltaDir, FileSystem fs, HdfsDirSnapshot dirSnapshot) throws IOException {
/**
* this file was written by Hive versions before 4.0 into a base_x/ dir
* created by compactor so that it can be distinguished from the one
* created by Insert Overwrite
*/
Path formatFile = new Path(baseOrDeltaDir, METADATA_FILE);
if (dirSnapshot != null && !dirSnapshot.contains(formatFile)) {
return false;
}
try (FSDataInputStream strm = fs.open(formatFile)) {
Map<String, String> metaData = new ObjectMapper().readValue(strm, Map.class);
if(!CURRENT_VERSION.equalsIgnoreCase(metaData.get(Field.VERSION))) {
throw new IllegalStateException("Unexpected Meta Data version: "
+ metaData.get(Field.VERSION));
}
String dataFormat = metaData.getOrDefault(Field.DATA_FORMAT, "null");
switch (dataFormat) {
case Value.COMPACTED:
return true;
default:
throw new IllegalArgumentException("Unexpected value for " + Field.DATA_FORMAT
+ ": " + dataFormat);
}
} catch (FileNotFoundException e) {
return false;
} catch(IOException e) {
String msg = "Failed to read " + baseOrDeltaDir + "/" + METADATA_FILE
+ ": " + e.getMessage();
LOG.error(msg, e);
throw e;
}
}
/**
* Chooses 1 representative file from {@code baseOrDeltaDir}
* This assumes that all files in the dir are of the same type: either written by an acid
* write or Load Data. This should always be the case for an Acid table.
*/
private static Path chooseFile(Path baseOrDeltaDir, FileSystem fs) throws IOException {
if(!(baseOrDeltaDir.getName().startsWith(BASE_PREFIX) ||
baseOrDeltaDir.getName().startsWith(DELTA_PREFIX))) {
throw new IllegalArgumentException(baseOrDeltaDir + " is not a base/delta");
}
FileStatus[] dataFiles;
try {
dataFiles = fs.listStatus(baseOrDeltaDir, originalBucketFilter);
} catch (FileNotFoundException e) {
// HIVE-22001: If the file was not found, this means that baseOrDeltaDir (which was listed
// earlier during AcidUtils.getAcidState()) was removed sometime between the FS list call
// and now. In the case of ACID tables the file would only have been removed by the transactional
// cleaner thread, in which case this is currently an old base/delta which has already been
// compacted. So a new set of base files from the compaction should exist which
// the current call to AcidUtils.getAcidState() would use rather than this old baes/delta.
// It should be ok to ignore this FileNotFound error and skip processing of this file - the list
// of files for this old base/delta will be incomplete, but it will not matter since this base/delta
// would be ignored (in favor of the new base files) by the selection logic in AcidUtils.getAcidState().
dataFiles = null;
}
return dataFiles != null && dataFiles.length > 0 ? dataFiles[0].getPath() : null;
}
/**
* Checks if the files in base/delta dir are a result of Load Data/Add Partition statement
* and thus do not have ROW_IDs embedded in the data.
* This is only meaningful for full CRUD tables - Insert-only tables have all their data
* in raw format by definition.
* @param baseOrDeltaDir base or delta file.
* @param dirSnapshot
*/
public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs, HdfsDirSnapshot dirSnapshot) throws IOException {
//todo: this could be optimized - for full CRUD table only base_x and delta_x_x could have
// files in raw format delta_x_y (x != y) whether from streaming ingested or compaction
// must be native Acid format by definition
if(isDeleteDelta(baseOrDeltaDir)) {
return false;
}
if(isInsertDelta(baseOrDeltaDir)) {
ParsedDeltaLight pd = ParsedDeltaLight.parse(baseOrDeltaDir);
if(pd.getMinWriteId() != pd.getMaxWriteId()) {
//must be either result of streaming or compaction
return false;
}
}
else {
//must be base_x
if(isCompactedBase(ParsedBase.parseBase(baseOrDeltaDir), fs, dirSnapshot)) {
return false;
}
}
//if here, have to check the files
Path dataFile = null;
if ((dirSnapshot != null) && (dirSnapshot.getFiles() != null) && (dirSnapshot.getFiles().size() > 0)) {
for (FileStatus fileStatus: dirSnapshot.getFiles()) {
if (originalBucketFilter.accept(fileStatus.getPath())) {
dataFile = fileStatus.getPath();
}
}
} else {
dataFile = chooseFile(baseOrDeltaDir, fs);
}
if (dataFile == null) {
//directory is empty or doesn't have any that could have been produced by load data
return false;
}
return isRawFormatFile(dataFile, fs);
}
public static boolean isRawFormatFile(Path dataFile, FileSystem fs) throws IOException {
try {
Reader reader = OrcFile.createReader(dataFile, OrcFile.readerOptions(fs.getConf()));
/*
acid file would have schema like <op, owid, writerId, rowid, cwid, <f1, ... fn>> so could
check it this way once/if OrcRecordUpdater.ACID_KEY_INDEX_NAME is removed
TypeDescription schema = reader.getSchema();
List<String> columns = schema.getFieldNames();
*/
return OrcInputFormat.isOriginal(reader);
} catch (FileFormatException ex) {
//We may be parsing a delta for Insert-only table which may not even be an ORC file so
//cannot have ROW_IDs in it.
LOG.debug("isRawFormat() called on " + dataFile + " which is not an ORC file: " +
ex.getMessage());
return true;
}
}
}
/**
* Logic related to versioning acid data format. An {@code ACID_FORMAT} file is written to each
* base/delta/delete_delta dir written by a full acid write or compaction. This is the primary
* mechanism for versioning acid data.
*
* Each individual ORC file written stores the current version as a user property in ORC footer.
* All data files produced by Acid write should have this (starting with Hive 3.0), including
* those written by compactor. This is more for sanity checking in case someone moved the files
* around or something like that.
*
* Methods for getting/reading the version from files were moved to test class TestTxnCommands
* which is the only place they are used, in order to keep devs out of temptation, since they
* access the FileSystem which is expensive.
*/
public static final class OrcAcidVersion {
public static final String ACID_VERSION_KEY = "hive.acid.version";
public static final String ACID_FORMAT = "_orc_acid_version";
private static final Charset UTF8 = Charset.forName("UTF-8");
/**
* 2 is the version of Acid released in Hive 3.0.
*/
public static final int ORC_ACID_VERSION = 2;
/**
* Inlucde current acid version in file footer.
* @param writer - file written
*/
public static void setAcidVersionInDataFile(Writer writer) {
//so that we know which version wrote the file
writer.addUserMetadata(ACID_VERSION_KEY, UTF8.encode(String.valueOf(ORC_ACID_VERSION)));
}
/**
* This creates a version file in {@code deltaOrBaseDir}
* @param deltaOrBaseDir - where to create the version file
*/
public static void writeVersionFile(Path deltaOrBaseDir, FileSystem fs) throws IOException {
Path formatFile = getVersionFilePath(deltaOrBaseDir);
if(!fs.isFile(formatFile)) {
try (FSDataOutputStream strm = fs.create(formatFile, false)) {
strm.write(UTF8.encode(String.valueOf(ORC_ACID_VERSION)).array());
} catch (IOException ioe) {
LOG.error("Failed to create " + formatFile + " due to: " + ioe.getMessage(), ioe);
throw ioe;
}
}
}
public static Path getVersionFilePath(Path deltaOrBase) {
return new Path(deltaOrBase, ACID_FORMAT);
}
}
public static List<FileStatus> getAcidFilesForStats(
Table table, Path dir, Configuration jc, FileSystem fs) throws IOException {
List<FileStatus> fileList = new ArrayList<>();
ValidWriteIdList idList = AcidUtils.getTableValidWriteIdList(jc,
AcidUtils.getFullTableName(table.getDbName(), table.getTableName()));
if (idList == null) {
LOG.warn("Cannot get ACID state for " + table.getDbName() + "." + table.getTableName()
+ " from " + jc.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
return null;
}
if (fs == null) {
fs = dir.getFileSystem(jc);
}
// Collect the all of the files/dirs
Map<Path, HdfsDirSnapshot> hdfsDirSnapshots = AcidUtils.getHdfsDirSnapshots(fs, dir);
Directory acidInfo = AcidUtils.getAcidState(fs, dir, jc, idList, null, false, hdfsDirSnapshots);
// Assume that for an MM table, or if there's only the base directory, we are good.
if (!acidInfo.getCurrentDirectories().isEmpty() && AcidUtils.isFullAcidTable(table)) {
Utilities.FILE_OP_LOGGER.warn(
"Computing stats for an ACID table; stats may be inaccurate");
}
for (HdfsFileStatusWithId hfs : acidInfo.getOriginalFiles()) {
fileList.add(hfs.getFileStatus());
}
for (ParsedDelta delta : acidInfo.getCurrentDirectories()) {
fileList.addAll(hdfsDirSnapshots.get(delta.getPath()).getFiles());
}
if (acidInfo.getBaseDirectory() != null) {
fileList.addAll(hdfsDirSnapshots.get(acidInfo.getBaseDirectory()).getFiles());
}
return fileList;
}
public static List<Path> getValidDataPaths(Path dataPath, Configuration conf, String validWriteIdStr)
throws IOException {
List<Path> pathList = new ArrayList<>();
if ((validWriteIdStr == null) || validWriteIdStr.isEmpty()) {
// If Non-Acid case, then all files would be in the base data path. So, just return it.
pathList.add(dataPath);
return pathList;
}
// If ACID/MM tables, then need to find the valid state wrt to given ValidWriteIdList.
ValidWriteIdList validWriteIdList = new ValidReaderWriteIdList(validWriteIdStr);
Directory acidInfo = AcidUtils.getAcidState(dataPath.getFileSystem(conf), dataPath, conf, validWriteIdList, null,
false);
for (HdfsFileStatusWithId hfs : acidInfo.getOriginalFiles()) {
pathList.add(hfs.getFileStatus().getPath());
}
for (ParsedDelta delta : acidInfo.getCurrentDirectories()) {
pathList.add(delta.getPath());
}
if (acidInfo.getBaseDirectory() != null) {
pathList.add(acidInfo.getBaseDirectory());
}
return pathList;
}
public static String getAcidSubDir(Path dataPath) {
String dataDir = dataPath.getName();
if (dataDir.startsWith(AcidUtils.BASE_PREFIX)
|| dataDir.startsWith(AcidUtils.DELTA_PREFIX)
|| dataDir.startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
return dataDir;
}
return null;
}
//Get the first level acid directory (if any) from a given path
public static String getFirstLevelAcidDirPath(Path dataPath, FileSystem fileSystem) throws IOException {
if (dataPath == null) {
return null;
}
String firstLevelAcidDir = getAcidSubDir(dataPath);
if (firstLevelAcidDir != null) {
return firstLevelAcidDir;
}
String acidDirPath = getFirstLevelAcidDirPath(dataPath.getParent(), fileSystem);
if (acidDirPath == null) {
return null;
}
// We need the path for directory so no need to append file name
if (fileSystem.isDirectory(dataPath)) {
return acidDirPath + Path.SEPARATOR + dataPath.getName();
}
return acidDirPath;
}
public static boolean isAcidEnabled(HiveConf hiveConf) {
String txnMgr = hiveConf.getVar(ConfVars.HIVE_TXN_MANAGER);
boolean concurrency = hiveConf.getBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY);
String dbTxnMgr = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
if (txnMgr.equals(dbTxnMgr) && concurrency) {
return true;
}
return false;
}
public static class AnyIdDirFilter implements PathFilter {
@Override
public boolean accept(Path path) {
return extractWriteId(path) != null;
}
}
public static class IdPathFilter implements PathFilter {
private String baseDirName, deltaDirName;
private final boolean isDeltaPrefix;
private final Set<String> dpSpecs;
private final int dpLevel;
public IdPathFilter(long writeId, int stmtId) {
this(writeId, stmtId, null, 0);
}
public IdPathFilter(long writeId, int stmtId, Set<String> dpSpecs, int dpLevel) {
String deltaDirName = null;
deltaDirName = DELTA_PREFIX + String.format(DELTA_DIGITS, writeId) + "_" +
String.format(DELTA_DIGITS, writeId);
isDeltaPrefix = (stmtId < 0);
if (!isDeltaPrefix) {
deltaDirName += "_" + String.format(STATEMENT_DIGITS, stmtId);
}
this.baseDirName = BASE_PREFIX + String.format(DELTA_DIGITS, writeId);
this.deltaDirName = deltaDirName;
this.dpSpecs = dpSpecs;
this.dpLevel = dpLevel;
}
@Override
public boolean accept(Path path) {
String name = path.getName();
// Extending the path filter with optional dynamic partition specifications.
// This is needed for the use case when doing multi-statement insert overwrite with
// dynamic partitioning with direct insert or with insert-only tables.
// In this use-case, each FileSinkOperator should only clean-up the directories written
// by the same FileSinkOperator and do not clean-up the partition directories
// written by the other FileSinkOperators. (For further details please see HIVE-23114.)
if (dpLevel > 0 && dpSpecs != null && !dpSpecs.isEmpty()) {
Path parent = path.getParent();
String partitionSpec = parent.getName();
for (int i = 1; i < dpLevel; i++) {
parent = parent.getParent();
partitionSpec = parent.getName() + "/" + partitionSpec;
}
return (name.equals(baseDirName) && dpSpecs.contains(partitionSpec));
}
else {
return name.equals(baseDirName) || (isDeltaPrefix && name.startsWith(deltaDirName))
|| (!isDeltaPrefix && name.equals(deltaDirName));
}
}
}
public static Long extractWriteId(Path file) {
String fileName = file.getName();
if (!fileName.startsWith(DELTA_PREFIX) && !fileName.startsWith(BASE_PREFIX)) {
LOG.trace("Cannot extract write ID for a MM table: {}", file);
return null;
}
String[] parts = fileName.split("_", 4); // e.g. delta_0000001_0000001_0000 or base_0000022
if (parts.length < 2) {
LOG.debug("Cannot extract write ID for a MM table: " + file
+ " (" + Arrays.toString(parts) + ")");
return null;
}
long writeId = -1;
try {
writeId = Long.parseLong(parts[1]);
} catch (NumberFormatException ex) {
LOG.debug("Cannot extract write ID for a MM table: " + file
+ "; parsing " + parts[1] + " got " + ex.getMessage());
return null;
}
return writeId;
}
public static void setNonTransactional(Map<String, String> tblProps) {
tblProps.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "false");
tblProps.remove(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES);
}
private static boolean needsLock(Entity entity) {
switch (entity.getType()) {
case TABLE:
return isLockableTable(entity.getTable());
case PARTITION:
return isLockableTable(entity.getPartition().getTable());
default:
return true;
}
}
private static Table getTable(WriteEntity we) {
Table t = we.getTable();
if (t == null) {
throw new IllegalStateException("No table info for " + we);
}
return t;
}
private static boolean isLockableTable(Table t) {
if (t.isTemporary()) {
return false;
}
switch (t.getTableType()) {
case MANAGED_TABLE:
case MATERIALIZED_VIEW:
return true;
default:
return false;
}
}
/**
* Create lock components from write/read entities.
* @param outputs write entities
* @param inputs read entities
* @param conf
* @return list with lock components
*/
public static List<LockComponent> makeLockComponents(Set<WriteEntity> outputs, Set<ReadEntity> inputs,
Context.Operation operation, HiveConf conf) {
List<LockComponent> lockComponents = new ArrayList<>();
boolean skipReadLock = !conf.getBoolVar(ConfVars.HIVE_TXN_READ_LOCKS);
boolean skipNonAcidReadLock = !conf.getBoolVar(ConfVars.HIVE_TXN_NONACID_READ_LOCKS);
boolean sharedWrite = !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK);
boolean isMerge = operation == Context.Operation.MERGE;
// We don't want to acquire read locks during update or delete as we'll be acquiring write
// locks instead. Also, there's no need to lock temp tables since they're session wide
List<ReadEntity> readEntities = inputs.stream()
.filter(input -> !input.isDummy()
&& input.needsLock()
&& !input.isUpdateOrDelete()
&& AcidUtils.needsLock(input)
&& !skipReadLock)
.collect(Collectors.toList());
Set<Table> fullTableLock = getFullTableLock(readEntities, conf);
// For each source to read, get a shared_read lock
for (ReadEntity input : readEntities) {
LockComponentBuilder compBuilder = new LockComponentBuilder();
compBuilder.setSharedRead();
compBuilder.setOperationType(DataOperationType.SELECT);
Table t = null;
switch (input.getType()) {
case DATABASE:
compBuilder.setDbName(input.getDatabase().getName());
break;
case TABLE:
t = input.getTable();
if (!fullTableLock.contains(t)) {
continue;
}
compBuilder.setDbName(t.getDbName());
compBuilder.setTableName(t.getTableName());
break;
case PARTITION:
case DUMMYPARTITION:
compBuilder.setPartitionName(input.getPartition().getName());
t = input.getPartition().getTable();
if (fullTableLock.contains(t)) {
continue;
}
compBuilder.setDbName(t.getDbName());
compBuilder.setTableName(t.getTableName());
break;
default:
// This is a file or something we don't hold locks for.
continue;
}
if (skipNonAcidReadLock && !AcidUtils.isTransactionalTable(t)) {
// skip read-locks for non-transactional tables
// read-locks don't protect non-transactional tables data consistency
continue;
}
if (t != null) {
compBuilder.setIsTransactional(AcidUtils.isTransactionalTable(t));
}
LockComponent comp = compBuilder.build();
LOG.debug("Adding lock component to lock request {} ", comp);
lockComponents.add(comp);
}
// For each source to write to, get the appropriate lock type. If it's
// an OVERWRITE, we need to get an exclusive lock. If it's an insert (no
// overwrite) than we need a shared. If it's update or delete then we
// need a SHARED_WRITE.
for (WriteEntity output : outputs) {
LOG.debug("output is null " + (output == null));
if (output.getType() == Entity.Type.DFS_DIR || output.getType() == Entity.Type.LOCAL_DIR || !AcidUtils
.needsLock(output)) {
// We don't lock files or directories. We also skip locking temp tables.
continue;
}
LockComponentBuilder compBuilder = new LockComponentBuilder();
Table t = null;
/**
* For any insert/updates set dir cache to read-only mode, where it wouldn't
* add any new entry to cache.
* When updates are executed, delta folders are created only at the end of the statement
* and at the time of acquiring locks, there would not be any delta folders. This can cause wrong data to be reported
* when "insert" followed by "update" statements are executed. In such cases, use the cache as read only mode.
*/
HiveConf.setIntVar(conf, ConfVars.HIVE_TXN_ACID_DIR_CACHE_DURATION, 0);
switch (output.getType()) {
case DATABASE:
compBuilder.setDbName(output.getDatabase().getName());
break;
case TABLE:
case DUMMYPARTITION: // in case of dynamic partitioning lock the table
t = output.getTable();
compBuilder.setDbName(t.getDbName());
compBuilder.setTableName(t.getTableName());
break;
case PARTITION:
compBuilder.setPartitionName(output.getPartition().getName());
t = output.getPartition().getTable();
compBuilder.setDbName(t.getDbName());
compBuilder.setTableName(t.getTableName());
break;
default:
// This is a file or something we don't hold locks for.
continue;
}
switch (output.getWriteType()) {
/* base this on HiveOperation instead? this and DDL_NO_LOCK is peppered all over the code...
Seems much cleaner if each stmt is identified as a particular HiveOperation (which I'd think
makes sense everywhere). This however would be problematic for merge...*/
case DDL_EXCLUSIVE:
compBuilder.setExclusive();
compBuilder.setOperationType(DataOperationType.NO_TXN);
break;
case INSERT_OVERWRITE:
t = AcidUtils.getTable(output);
if (AcidUtils.isTransactionalTable(t)) {
if (conf.getBoolVar(HiveConf.ConfVars.TXN_OVERWRITE_X_LOCK) && !sharedWrite) {
compBuilder.setExclusive();
} else {
compBuilder.setExclWrite();
}
compBuilder.setOperationType(DataOperationType.UPDATE);
} else {
compBuilder.setExclusive();
compBuilder.setOperationType(DataOperationType.NO_TXN);
}
break;
case INSERT:
assert t != null;
if (AcidUtils.isTransactionalTable(t)) {
if (sharedWrite) {
if (conf.getBoolVar(ConfVars.TXN_MERGE_INSERT_X_LOCK) && isMerge) {
compBuilder.setExclWrite();
} else {
compBuilder.setSharedWrite();
}
} else {
if (conf.getBoolVar(ConfVars.TXN_MERGE_INSERT_X_LOCK) && isMerge) {
compBuilder.setExclusive();
} else {
compBuilder.setSharedRead();
}
}
} else if (MetaStoreUtils.isNonNativeTable(t.getTTable())) {
final HiveStorageHandler storageHandler = Preconditions.checkNotNull(t.getStorageHandler(),
"Thought all the non native tables have an instance of storage handler");
LockType lockType = storageHandler.getLockType(output);
if (null == LockType.findByValue(lockType.getValue())) {
throw new IllegalArgumentException(String
.format("Lock type [%s] for Database.Table [%s.%s] is unknown", lockType, t.getDbName(),
t.getTableName()));
}
compBuilder.setLock(lockType);
} else {
if (conf.getBoolVar(HiveConf.ConfVars.HIVE_TXN_STRICT_LOCKING_MODE)) {
compBuilder.setExclusive();
} else { // this is backward compatible for non-ACID resources, w/o ACID semantics
compBuilder.setSharedRead();
}
}
compBuilder.setOperationType(DataOperationType.INSERT);
break;
case DDL_SHARED:
compBuilder.setSharedRead();
if (output.isTxnAnalyze()) {
// Analyze needs txn components to be present, otherwise an aborted analyze write ID
// might be rolled under the watermark by compactor while stats written by it are
// still present.
continue;
}
compBuilder.setOperationType(DataOperationType.NO_TXN);
break;
case UPDATE:
case DELETE:
assert t != null;
if (AcidUtils.isTransactionalTable(t) && sharedWrite) {
compBuilder.setSharedWrite();
} else {
compBuilder.setExclWrite();
}
compBuilder.setOperationType(DataOperationType.valueOf(
output.getWriteType().name()));
break;
case DDL_NO_LOCK:
continue; // No lock required here
default:
throw new RuntimeException("Unknown write type " + output.getWriteType().toString());
}
if (t != null) {
compBuilder.setIsTransactional(AcidUtils.isTransactionalTable(t));
}
compBuilder.setIsDynamicPartitionWrite(output.isDynamicPartitionWrite());
LockComponent comp = compBuilder.build();
LOG.debug("Adding lock component to lock request " + comp.toString());
lockComponents.add(comp);
}
return lockComponents;
}
private static Set<Table> getFullTableLock(List<ReadEntity> readEntities, HiveConf conf) {
int partLocksThreshold = conf.getIntVar(HiveConf.ConfVars.HIVE_LOCKS_PARTITION_THRESHOLD);
Map<Table, Long> partLocksPerTable = readEntities.stream()
.filter(input -> input.getType() == Entity.Type.PARTITION)
.map(Entity::getPartition)
.collect(Collectors.groupingBy(Partition::getTable, Collectors.counting()));
return readEntities.stream()
.filter(input -> input.getType() == Entity.Type.TABLE)
.map(Entity::getTable)
.filter(t -> !partLocksPerTable.containsKey(t)
|| (partLocksThreshold > 0 && partLocksThreshold <= partLocksPerTable.get(t)))
.collect(Collectors.toSet());
}
/**
* Safety check to make sure a file take from one acid table is not added into another acid table
* since the ROW__IDs embedded as part a write to one table won't make sense in different
* table/cluster.
*/
public static void validateAcidFiles(Table table, FileStatus[] srcs, FileSystem fs) throws SemanticException {
if (!AcidUtils.isFullAcidTable(table)) {
return;
}
validateAcidFiles(srcs, fs);
}
private static void validateAcidFiles(FileStatus[] srcs, FileSystem fs) throws SemanticException {
try {
if (srcs == null) {
return;
}
for (FileStatus oneSrc : srcs) {
if (!AcidUtils.MetaDataFile.isRawFormatFile(oneSrc.getPath(), fs)) {
throw new SemanticException(ErrorMsg.LOAD_DATA_ACID_FILE, oneSrc.getPath().toString());
}
}
} catch (IOException ex) {
throw new SemanticException(ex);
}
}
/**
* Safety check to make sure the given location is not the location of acid table and
* all it's files will be not added into another acid table
*/
public static void validateAcidPartitionLocation(String location, Configuration conf) throws SemanticException {
try {
URI uri = new URI(location);
FileSystem fs = FileSystem.get(uri, conf);
FileStatus[] fileStatuses = LoadSemanticAnalyzer.matchFilesOrDir(fs, new Path(uri));
validateAcidFiles(fileStatuses, fs);
} catch (IOException | URISyntaxException ex) {
throw new SemanticException(ErrorMsg.INVALID_PATH.getMsg(ex.getMessage()), ex);
}
}
/**
* Determines transaction type based on query AST.
* @param tree AST
*/
public static TxnType getTxnType(Configuration conf, ASTNode tree) {
final ASTSearcher astSearcher = new ASTSearcher();
// check if read-only txn
if (HiveConf.getBoolVar(conf, ConfVars.HIVE_TXN_READONLY_ENABLED) &&
tree.getToken().getType() == HiveParser.TOK_QUERY &&
Stream.of(
new int[]{HiveParser.TOK_INSERT_INTO},
new int[]{HiveParser.TOK_INSERT, HiveParser.TOK_TAB})
.noneMatch(pattern -> astSearcher.simpleBreadthFirstSearch(tree, pattern) != null)) {
return TxnType.READ_ONLY;
}
// check if txn has a materialized view rebuild
if (tree.getToken().getType() == HiveParser.TOK_ALTER_MATERIALIZED_VIEW_REBUILD) {
return TxnType.MATER_VIEW_REBUILD;
}
// check if compaction request
if (tree.getFirstChildWithType(HiveParser.TOK_ALTERTABLE_COMPACT) != null){
return TxnType.COMPACTION;
}
return TxnType.DEFAULT;
}
public static List<HdfsFileStatusWithId> findBaseFiles(
Path base, Ref<Boolean> useFileIds, Supplier<FileSystem> fs) throws IOException {
Boolean val = useFileIds.value;
if (val == null || val) {
try {
List<HdfsFileStatusWithId> result = SHIMS.listLocatedHdfsStatus(
fs.get(), base, AcidUtils.hiddenFileFilter);
if (val == null) {
useFileIds.value = true; // The call succeeded, so presumably the API is there.
}
return result;
} catch (UnsupportedOperationException uoe) {
LOG.warn("Failed to get files with ID; using regular API: " + uoe.getMessage());
if (val == null) {
useFileIds.value = false;
}
} catch (IOException ioe) {
LOG.info("Failed to get files with ID; using regular API: " + ioe.getMessage());
LOG.debug("Failed to get files with ID", ioe);
}
}
// Fall back to regular API and create states without ID.
List<FileStatus> children = HdfsUtils.listLocatedStatus(fs.get(), base, AcidUtils.hiddenFileFilter);
List<HdfsFileStatusWithId> result = new ArrayList<>(children.size());
for (FileStatus child : children) {
result.add(AcidUtils.createOriginalObj(null, child));
}
return result;
}
private static void initDirCache(int durationInMts) {
if (dirCacheInited.get()) {
LOG.debug("DirCache got initialized already");
return;
}
dirCache = CacheBuilder.newBuilder()
.expireAfterWrite(durationInMts, TimeUnit.MINUTES)
.softValues()
.build();
dirCacheInited.set(true);
}
private static void printDirCacheEntries() {
if (dirCache != null) {
LOG.debug("Cache entries: {}", Arrays.toString(dirCache.asMap().keySet().toArray()));
}
}
/**
* Tries to get directory details from cache. For now, cache is valid only
* when base directory is available and no deltas are present. This should
* be used only in BI strategy and for ACID tables.
*
* @param fileSystem file system supplier
* @param candidateDirectory the partition directory to analyze
* @param conf the configuration
* @param writeIdList the list of write ids that we are reading
* @param useFileIds It will be set to true, if the FileSystem supports listing with fileIds
* @param ignoreEmptyFiles Ignore files with 0 length
* @return directory state
* @throws IOException on errors
*/
public static Directory getAcidStateFromCache(Supplier<FileSystem> fileSystem,
Path candidateDirectory, Configuration conf,
ValidWriteIdList writeIdList, Ref<Boolean> useFileIds, boolean ignoreEmptyFiles) throws IOException {
int dirCacheDuration = HiveConf.getIntVar(conf,
ConfVars.HIVE_TXN_ACID_DIR_CACHE_DURATION);
if (dirCacheDuration < 0) {
LOG.debug("dirCache is not enabled");
return getAcidState(fileSystem.get(), candidateDirectory, conf, writeIdList,
useFileIds, ignoreEmptyFiles);
} else {
initDirCache(dirCacheDuration);
}
/*
* Cache for single case, where base directory is there without deltas.
* In case of changes, cache would get invalidated based on
* open/aborted list
*/
//dbName + tableName + dir
String key = writeIdList.getTableName() + "_" + candidateDirectory.toString();
DirInfoValue value = dirCache.getIfPresent(key);
// in case of open/aborted txns, recompute dirInfo
long[] exceptions = writeIdList.getInvalidWriteIds();
boolean recompute = (exceptions != null && exceptions.length > 0);
if (recompute) {
LOG.info("invalidating cache entry for key: {}", key);
dirCache.invalidate(key);
value = null;
}
if (value != null) {
// double check writeIds
if (!value.getTxnString().equalsIgnoreCase(writeIdList.writeToString())) {
if (LOG.isDebugEnabled()) {
LOG.debug("writeIdList: {} from cache: {} is not matching for key: {}",
writeIdList.writeToString(), value.getTxnString(), key);
}
recompute = true;
}
}
// compute and add to cache
if (recompute || (value == null)) {
Directory dirInfo = getAcidState(fileSystem.get(), candidateDirectory, conf,
writeIdList, useFileIds, ignoreEmptyFiles);
value = new DirInfoValue(writeIdList.writeToString(), dirInfo);
if (value.dirInfo != null && value.dirInfo.getBaseDirectory() != null
&& value.dirInfo.getCurrentDirectories().isEmpty()) {
if (dirCacheDuration > 0) {
populateBaseFiles(dirInfo, useFileIds, fileSystem);
dirCache.put(key, value);
} else {
LOG.info("Not populating cache for {}, as duration is set to 0", key);
}
}
} else {
LOG.info("Got {} from cache, cache size: {}", key, dirCache.size());
}
if (LOG.isDebugEnabled()) {
printDirCacheEntries();
}
return value.getDirInfo();
}
private static void populateBaseFiles(Directory dirInfo,
Ref<Boolean> useFileIds, Supplier<FileSystem> fileSystem) throws IOException {
if (dirInfo.getBaseDirectory() != null) {
// Cache base directory contents
List<HdfsFileStatusWithId> children = findBaseFiles(dirInfo.getBaseDirectory(), useFileIds, fileSystem);
((DirectoryImpl) dirInfo).setBaseFiles(children);
}
}
static class DirInfoValue {
private String txnString;
private AcidUtils.Directory dirInfo;
DirInfoValue(String txnString, AcidUtils.Directory dirInfo) {
this.txnString = txnString;
this.dirInfo = dirInfo;
}
String getTxnString() {
return txnString;
}
AcidUtils.Directory getDirInfo() {
return dirInfo;
}
}
}