package org.apache.hadoop.hive.upgrade.acid;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.CompactionResponse;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hive.common.util.HiveVersionInfo;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.impl.AcidStats;
import org.apache.orc.impl.OrcAcidUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import static org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.escapeSQLString;
* This utility is designed to help with upgrading to Hive 3.0. On-disk layout for transactional
* tables has changed in 3.0 and requires pre-processing before upgrade to ensure they are readable
* by Hive 3.0. Some transactional tables (identified by this utility) require Major compaction
* to be run on them before upgrading to 3.0. Once this compaction starts, no more
* update/delete/merge statements may be executed on these tables until upgrade is finished.
* Additionally, a new type of transactional tables was added in 3.0 - insert-only tables. These
* tables support ACID semantics and work with any Input/OutputFormat. Any Managed tables may
* be made insert-only transactional table. These tables don't support Update/Delete/Merge commands.
* This utility works in 2 modes: preUpgrade and postUpgrade.
* In preUpgrade mode it has to have 2.x Hive jars on the classpath. It will perform analysis on
* existing transactional tables, determine which require compaction and generate a set of SQL
* commands to launch all of these compactions.
* Note that depending on the number of tables/partitions and amount of data in them compactions
* may take a significant amount of time and resources. The script output by this utility includes
* some heuristics that may help estimate the time required. If no script is produced, no action
* is needed. For compactions to run an instance of standalone Hive Metastore must be running.
* Please make sure hive.compactor.worker.threads is sufficiently high - this specifies the limit
* of concurrent compactions that may be run. Each compaction job is a Map-Reduce job.
* hive.compactor.job.queue may be used to set a Yarn queue name where all compaction jobs will be
* submitted.
* In postUpgrade mode, Hive 3.0 jars/hive-site.xml should be on the classpath. This utility will
* find all the tables that may be made transactional (with full CRUD support) and generate
* Alter Table commands to do so. It will also find all tables that may not support full CRUD
* but can be made insert-only transactional tables and generate corresponding Alter Table commands.
* TODO: rename files
* "execute" option may be supplied in both modes to have the utility automatically execute the
* equivalent of the generated commands
* "location" option may be supplied followed by a path to set the location for the generated
* scripts.
public class UpgradeTool {
/** Logger shared by all methods of this tool. */
private static final Logger LOG = LoggerFactory.getLogger(UpgradeTool.class);
/**
 * Partition batch size constant.
 * NOTE(review): presumably the number of partitions fetched from the metastore per call (the
 * partition loops in this file carry a "Run in batches to prevent OOM" note) - confirm.
 */
private static final int PARTITION_BATCH_SIZE = 10000;
/** Command line options of the tool; filled by {@code init()} and handed to the parser in {@code main}. */
private final Options cmdLineOptions = new Options();
public static void main(String[] args) throws Exception {
UpgradeTool tool = new UpgradeTool();
CommandLineParser parser = new GnuParser();
CommandLine line ;
String outputDir = ".";
boolean preUpgrade = false, postUpgrade = false, execute = false, nonBlocking = false;
try {
line = parser.parse(tool.cmdLineOptions, args);
} catch (ParseException e) {
System.err.println("UpgradeTool: Parsing failed. Reason: " + e.getLocalizedMessage());
if (line.hasOption("help")) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("upgrade-acid", tool.cmdLineOptions);
if(line.hasOption("location")) {
outputDir = line.getOptionValue("location");
if(line.hasOption("execute")) {
execute = true;
if(line.hasOption("preUpgrade")) {
preUpgrade = true;
if(line.hasOption("postUpgrade")) {
postUpgrade = true;
}"Starting with preUpgrade=" + preUpgrade + ", postUpgrade=" + postUpgrade +
", execute=" + execute + ", location=" + outputDir);
if(preUpgrade && postUpgrade) {
throw new IllegalArgumentException("Cannot specify both preUpgrade and postUpgrade");
try {
String hiveVer = HiveVersionInfo.getShortVersion();
if(preUpgrade) {
if(!hiveVer.startsWith("2.")) {
throw new IllegalStateException("preUpgrade requires Hive 2.x. Actual: " + hiveVer);
if(postUpgrade && execute && !isTestMode) {
if(!hiveVer.startsWith("3.")) {
throw new IllegalStateException("postUpgrade w/execute requires Hive 3.x. Actual: " +
tool.prepareAcidUpgradeInternal(outputDir, preUpgrade, postUpgrade, execute);
catch(Exception ex) {
LOG.error("UpgradeTool failed", ex);
throw ex;
private static void printAndExit(UpgradeTool tool) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("upgrade-acid", tool.cmdLineOptions);
private void init() {
try {
cmdLineOptions.addOption(new Option("help", "print this message"));
cmdLineOptions.addOption(new Option("preUpgrade",
"Generates a script to execute on 2.x cluster. This requires 2.x binaries" +
" on the classpath and hive-site.xml."));
cmdLineOptions.addOption(new Option("postUpgrade",
"Generates a script to execute on 3.x cluster. This requires 3.x binaries" +
" on the classpath and hive-site.xml."));
Option exec = new Option("execute",
"Executes commands equivalent to generated scrips");
cmdLineOptions.addOption(new Option("location", true,
"Location to write scripts to. Default is CWD."));
catch(Exception ex) {
LOG.error("init()", ex);
throw ex;
* todo: this should accept a file of table names to exclude from non-acid to acid conversion
* todo: change script comments to a preamble instead of a footer
* how does rename script work? "hadoop fs -mv oldname newname" * and what about S3?
* How does this actually get executed?
* all other actions are done via embedded JDBC
/*
 * Core of the tool: visits every table of every database returned by the metastore client.
 * In preUpgrade mode (and only if acid is enabled in the configuration) it asks
 * getCompactionCommands() for the compactions a table needs; in postUpgrade mode it calls
 * processConversion() to decide whether the table becomes full acid or insert-only.
 * Afterwards the compaction/conversion/rename scripts are written to scriptLocation and, for
 * preUpgrade with execute, the method polls showCompactions() until the set of compaction ids
 * tracked in compactionMetaInfo is empty.
 *
 * NOTE(review): in this copy of the file most closing braces, the 'case' labels of the switch
 * below and probably a few statements are missing - restore them from version control before
 * changing any logic here.
 */
private void prepareAcidUpgradeInternal(String scriptLocation, boolean preUpgrade,
boolean postUpgrade, boolean execute) throws HiveException, TException, IOException {
//tests may inject a configuration through the static hiveConf field
HiveConf conf = hiveConf != null ? hiveConf : new HiveConf();
boolean isAcidEnabled = isAcidEnabled(conf);
//NOTE(review): hms is not closed anywhere in the visible code
HiveMetaStoreClient hms = new HiveMetaStoreClient(conf);//MetaException
LOG.debug("Looking for databases");
List<String> databases = hms.getAllDatabases();//TException
LOG.debug("Found " + databases.size() + " databases to process");
//statements that end up in the three generated scripts
List<String> compactions = new ArrayList<>();
List<String> convertToAcid = new ArrayList<>();
List<String> convertToMM = new ArrayList<>();
final CompactionMetaInfo compactionMetaInfo = new CompactionMetaInfo();
ValidTxnList txns = null;
//the Hive handle is only obtained when commands are to be executed
Hive db = null;
if(execute) {
db = Hive.get(conf);
for(String dbName : databases) {
List<String> tables = hms.getAllTables(dbName);
LOG.debug("found " + tables.size() + " tables in " + dbName);
for(String tableName : tables) {
Table t = hms.getTable(dbName, tableName);
LOG.debug("processing table " + Warehouse.getQualifiedName(t));
if(preUpgrade && isAcidEnabled) {
//if acid is off, there can't be any acid tables - nothing to compact
//the transaction snapshot is created once, on first use, and only in execute mode
if(execute && txns == null) {
This API changed from 2.x to 3.0. so this won't even compile with 3.0
but it doesn't need to since we only run this preUpgrade
TxnStore txnHandler = TxnUtils.getTxnStore(conf);
txns = TxnUtils.createValidCompactTxnList(txnHandler.getOpenTxnsInfo());
//NOTE(review): no visible statement adds compactionCommands to 'compactions' - confirm
List<String> compactionCommands =
getCompactionCommands(t, conf, hms, compactionMetaInfo, execute, db, txns);
if(postUpgrade && isAcidEnabled) {
//if acid is off post upgrade, you can't make any tables acid - will throw
processConversion(t, convertToAcid, convertToMM, hms, db, execute);
/*todo: handle renaming files somewhere*/
makeCompactionScript(compactions, scriptLocation, compactionMetaInfo);
makeConvertTableScript(convertToAcid, convertToMM, scriptLocation);
makeRenameFileScript(scriptLocation);//todo: is this pre or post upgrade?
//todo: can different tables be in different FileSystems?
if(preUpgrade && execute) {
//poll until every compaction id recorded in compactionMetaInfo has been removed
while(compactionMetaInfo.compactionIds.size() > 0) {
LOG.debug("Will wait for " + compactionMetaInfo.compactionIds.size() +
" compactions to complete");
ShowCompactResponse resp = db.showCompactions();
for(ShowCompactResponseElement e : resp.getCompacts()) {
final String state = e.getState();
boolean removed;
//NOTE(review): the 'case' labels and 'break' statements are missing below; judging by the
//log messages the branches are: succeeded, failed, still waiting, still working - confirm
switch (state) {
removed = compactionMetaInfo.compactionIds.remove(e.getId());
if(removed) {
LOG.debug("Required compaction succeeded: " + e.toString());
removed = compactionMetaInfo.compactionIds.remove(e.getId());
if(removed) {
LOG.warn("Required compaction failed: " + e.toString());
//may flood the log
//LOG.debug("Still waiting on: " + e.toString());
LOG.debug("Still working on: " + e.toString());
default://shouldn't be any others
LOG.error("Unexpected state for : " + e.toString());
//something is still outstanding: give the test callback a chance before polling again
//NOTE(review): the callback invocation and the wait itself are not visible here - confirm
if(compactionMetaInfo.compactionIds.size() > 0) {
try {
if (callback != null) {
} catch (InterruptedException ex) {
;//this only responds to ^C
* Actualy makes the table transactional
private static void alterTable(Table t, Hive db, boolean isMM)
throws HiveException, InvalidOperationException {
org.apache.hadoop.hive.ql.metadata.Table metaTable =
//clone to make sure new prop doesn't leak
new org.apache.hadoop.hive.ql.metadata.Table(t.deepCopy());
metaTable.getParameters().put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
if(isMM) {
.put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES, "insert_only");
db.alterTable(Warehouse.getQualifiedName(t), metaTable, false, null);
* todo: handle exclusion list
* Figures out which tables to make Acid, MM and (optionally, performs the operation)
/*
 * Decides for one table whether it can become a full acid table or only an insert-only (MM)
 * table, appends the matching ALTER TABLE statement to convertToAcid / convertToMM and, when
 * execute is set, applies the change through alterTable().
 * Unpartitioned tables are judged by the table's own storage descriptor; partitioned tables
 * have their partitions checked in batches.
 *
 * NOTE(review): this copy of the method is incomplete - closing braces, the condition on the
 * second 'if', the early returns and the tail of two string literals are missing. The notes
 * below mark each gap; restore from version control before editing.
 */
private static void processConversion(Table t, List<String> convertToAcid,
List<String> convertToMM, HiveMetaStoreClient hms, Hive db, boolean execute)
throws TException, HiveException {
//NOTE(review): presumably returns early - an acid table needs no conversion; confirm
if(isFullAcidTable(t)) {
//NOTE(review): condition is truncated in this copy
if(! {
String fullTableName = Warehouse.getQualifiedName(t);
if(t.getPartitionKeysSize() <= 0) {
//unpartitioned: the table level storage descriptor decides
if(canBeMadeAcid(fullTableName, t.getSd())) {
//NOTE(review): the end of this statement is missing in this copy
convertToAcid.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
if(execute) {
alterTable(t, db, false);
else {
convertToMM.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
"'transactional'='true', 'transactional_properties'='insert_only')");
if(execute) {
alterTable(t, db, true);
else {
each Partition may have different I/O Format so have to check them all before deciding to
make a full CRUD table.
Run in batches to prevent OOM
List<String> partNames = hms.listPartitionNames(t.getDbName(), t.getTableName(), (short)-1);
//NOTE(review): batchSize is not declared in the visible code - confirm where it comes from
int numWholeBatches = partNames.size()/batchSize;
for(int i = 0; i < numWholeBatches; i++) {
List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
partNames.subList(i * batchSize, (i + 1) * batchSize));
//alterTable(...) returns true once the table was made insert-only
//NOTE(review): presumably followed by a return - confirm
if(alterTable(fullTableName, partitionList, convertToMM, t, db, execute)) {
if(numWholeBatches * batchSize < partNames.size()) {
//last partial batch
List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
partNames.subList(numWholeBatches * batchSize, partNames.size()));
if(alterTable(fullTableName, partitionList, convertToMM, t, db, execute)) {
//if here checked all parts and they are Acid compatible - make it acid
//NOTE(review): the end of this statement is missing in this copy
convertToAcid.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
if(execute) {
alterTable(t, db, false);
* @return true if table was converted/command generated
private static boolean alterTable(String fullTableName, List<Partition> partitionList,
List<String> convertToMM, Table t, Hive db, boolean execute)
throws InvalidOperationException, HiveException {
for(Partition p : partitionList) {
if(!canBeMadeAcid(fullTableName, p.getSd())) {
convertToMM.add("ALTER TABLE " + Warehouse.getQualifiedName(t) + " SET TBLPROPERTIES (" +
"'transactional'='true', 'transactional_properties'='insert_only')");
if(execute) {
alterTable(t, db, true);
return true;
return false;
private static boolean canBeMadeAcid(String fullTableName, StorageDescriptor sd) {
return isAcidInputOutputFormat(fullTableName, sd) && sd.getSortColsSize() <= 0;
/*
 * Reports whether both the input and the output format named in sd are acid capable.
 * Returns false when either format is unset, when the check fails, or when a format class
 * cannot be loaded (that case is logged as an error).
 *
 * NOTE(review): the expressions that load the two classes and the receivers of the
 * isAssignableFrom() calls are missing in this copy, as are the closing braces - restore from
 * version control. The catch clause suggests the classes are resolved by name at runtime.
 */
private static boolean isAcidInputOutputFormat(String fullTableName, StorageDescriptor sd) {
try {
//a missing format name yields null, which fails the check below
Class inputFormatClass = sd.getInputFormat() == null ? null :
Class outputFormatClass = sd.getOutputFormat() == null ? null :
if (inputFormatClass != null && outputFormatClass != null &&
.isAssignableFrom(inputFormatClass) &&
.isAssignableFrom(outputFormatClass)) {
return true;
} catch (ClassNotFoundException e) {
//if a table is using some custom I/O format and it's not in the classpath, we won't mark
//the table for Acid, but today (Hive 3.1 and earlier) OrcInput/OutputFormat is the only
//Acid format
LOG.error("Could not determine if " + fullTableName +
" can be made Acid due to: " + e.getMessage(), e);
return false;
return false;
* Generates a set compaction commands to run on pre Hive 3 cluster
private static void makeCompactionScript(List<String> commands, String scriptLocation,
CompactionMetaInfo compactionMetaInfo) throws IOException {
if (commands.isEmpty()) {"No compaction is necessary");
String fileName = "compacts_" + System.currentTimeMillis() + ".sql";
LOG.debug("Writing compaction commands to " + fileName);
try(PrintWriter pw = createScript(commands, fileName, scriptLocation)) {
//add post script
pw.println("-- Generated total of " + commands.size() + " compaction commands");
if(compactionMetaInfo.numberOfBytes < Math.pow(2, 20)) {
//to see it working in UTs
pw.println("-- The total volume of data to be compacted is " +
String.format("%.6fMB", compactionMetaInfo.numberOfBytes/Math.pow(2, 20)));
else {
pw.println("-- The total volume of data to be compacted is " +
String.format("%.3fGB", compactionMetaInfo.numberOfBytes/Math.pow(2, 30)));
//todo: should be at the top of the file...
"-- Please note that compaction may be a heavyweight and time consuming process.\n" +
"-- Submitting all of these commands will enqueue them to a scheduling queue from\n" +
"-- which they will be picked up by compactor Workers. The max number of\n" +
"-- concurrent Workers is controlled by hive.compactor.worker.threads configured\n" +
"-- for the standalone metastore process. Compaction itself is a Map-Reduce job\n" +
"-- which is submitted to the YARN queue identified by hive.compactor.job.queue\n" +
"-- property if defined or 'default' if not defined. It's advisable to set the\n" +
"-- capacity of this queue appropriately");
private static void makeConvertTableScript(List<String> alterTableAcid, List<String> alterTableMm,
String scriptLocation) throws IOException {
if (alterTableAcid.isEmpty()) {"No acid conversion is necessary");
} else {
String fileName = "convertToAcid_" + System.currentTimeMillis() + ".sql";
LOG.debug("Writing CRUD conversion commands to " + fileName);
try(PrintWriter pw = createScript(alterTableAcid, fileName, scriptLocation)) {
//todo: fix this - it has to run in 3.0 since tables may be unbucketed
pw.println("-- These commands may be executed by Hive 1.x later");
if (alterTableMm.isEmpty()) {"No managed table conversion is necessary");
} else {
String fileName = "convertToMM_" + System.currentTimeMillis() + ".sql";
LOG.debug("Writing managed table conversion commands to " + fileName);
try(PrintWriter pw = createScript(alterTableMm, fileName, scriptLocation)) {
pw.println("-- These commands must be executed by Hive 3.0 or later");
private static PrintWriter createScript(List<String> commands, String fileName,
String scriptLocation) throws IOException {
FileWriter fw = new FileWriter(scriptLocation + "/" + fileName);
PrintWriter pw = new PrintWriter(fw);
for(String cmd : commands) {
pw.println(cmd + ";");
return pw;
private static void makeRenameFileScript(String scriptLocation) throws IOException {
List<String> commands = Collections.emptyList();
if (commands.isEmpty()) {"No file renaming is necessary");
} else {
String fileName = "normalizeFileNames_" + System.currentTimeMillis() + ".sh";
LOG.debug("Writing file renaming commands to " + fileName);
PrintWriter pw = createScript(commands, fileName, scriptLocation);
* @return any compaction commands to run for {@code Table t}
private static List<String> getCompactionCommands(Table t, HiveConf conf,
HiveMetaStoreClient hms, CompactionMetaInfo compactionMetaInfo, boolean execute, Hive db,
ValidTxnList txns) throws IOException, TException, HiveException {
if(!isFullAcidTable(t)) {
return Collections.emptyList();
if(t.getPartitionKeysSize() <= 0) {
//not partitioned
if(!needsCompaction(new Path(t.getSd().getLocation()), conf, compactionMetaInfo, txns)) {
return Collections.emptyList();
List<String> cmds = new ArrayList<>();
cmds.add(getCompactionCommand(t, null));
if(execute) {
scheduleCompaction(t, null, db, compactionMetaInfo);
return cmds;
List<String> partNames = hms.listPartitionNames(t.getDbName(), t.getTableName(), (short)-1);
int numWholeBatches = partNames.size()/batchSize;
List<String> compactionCommands = new ArrayList<>();
for(int i = 0; i < numWholeBatches; i++) {
List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
partNames.subList(i * batchSize, (i + 1) * batchSize));
getCompactionCommands(t, partitionList, db, execute, compactionCommands,
compactionMetaInfo, conf, txns);
if(numWholeBatches * batchSize < partNames.size()) {
//last partial batch
List<Partition> partitionList = hms.getPartitionsByNames(t.getDbName(), t.getTableName(),
partNames.subList(numWholeBatches * batchSize, partNames.size()));
getCompactionCommands(t, partitionList, db, execute, compactionCommands,
compactionMetaInfo, conf, txns);
return compactionCommands;
private static void getCompactionCommands(Table t, List<Partition> partitionList, Hive db,
boolean execute, List<String> compactionCommands, CompactionMetaInfo compactionMetaInfo,
HiveConf conf, ValidTxnList txns)
throws IOException, TException, HiveException {
for (Partition p : partitionList) {
if (needsCompaction(new Path(p.getSd().getLocation()), conf, compactionMetaInfo, txns)) {
compactionCommands.add(getCompactionCommand(t, p));
if (execute) {
scheduleCompaction(t, p, db, compactionMetaInfo);
private static void scheduleCompaction(Table t, Partition p, Hive db,
CompactionMetaInfo compactionMetaInfo) throws HiveException, MetaException {
String partName = p == null ? null :
Warehouse.makePartName(t.getPartitionKeys(), p.getValues());
CompactionResponse resp =
//this gives an easy way to get at compaction ID so we can only wait for those this
//utility started
db.compact2(t.getDbName(), t.getTableName(), partName, "major", null);
if(!resp.isAccepted()) { + (p == null ? "" : "/" + partName) +
" is already being compacted with id=" + resp.getId());
else {"Scheduled compaction for " + Warehouse.getQualifiedName(t) +
(p == null ? "" : "/" + partName) + " with id=" + resp.getId());
* @param location - path to a partition (or table if not partitioned) dir
/*
 * Variant of needsCompaction(Path, HiveConf, CompactionMetaInfo, ValidTxnList) that lists the
 * delta directories itself instead of going through AcidUtils.getAcidState(). It returns true
 * as soon as one bucket file reports update/delete events and in that case adds the size of
 * 'location' to compactionMetaInfo.numberOfBytes.
 *
 * NOTE(review): nothing in the visible code calls this method. In this copy the closing
 * braces/parentheses and the body of the "not a directory" branch are missing - restore from
 * version control before use.
 */
private static boolean needsCompaction2(Path location, HiveConf conf,
CompactionMetaInfo compactionMetaInfo) throws IOException {
FileSystem fs = location.getFileSystem(conf);
//only delta_* and delete_delta_* children of 'location' are of interest
FileStatus[] deltas = fs.listStatus(location, new PathFilter() {
public boolean accept(Path path) {
//checking for delete_delta is only so that this functionality can be exercised by code 3.0
//which cannot produce any deltas with mix of update/insert events
return path.getName().startsWith("delta_") || path.getName().startsWith("delete_delta_");
if(deltas == null || deltas.length == 0) {
//base_n cannot contain update/delete. Original files are all 'insert' and we need to compact
//only if there are update/delete events.
return false;
deltaLoop: for(FileStatus delta : deltas) {
//NOTE(review): the statement executed for a non-directory entry is not visible here
if(!delta.isDirectory()) {
//should never happen - just in case
FileStatus[] buckets = fs.listStatus(delta.getPath(), new PathFilter() {
public boolean accept(Path path) {
//since this is inside a delta dir created by Hive 2.x or earlier it can only contain
//bucket_x or bucket_x__flush_length
return path.getName().startsWith("bucket_");
for(FileStatus bucket : buckets) {
if(bucket.getPath().getName().endsWith("_flush_length")) {
//streaming ingest dir - cannot have update/delete events
continue deltaLoop;
//the ORC footer of the bucket file tells whether it holds update/delete events
if(needsCompaction(bucket, fs)) {
//found delete events - this 'location' needs compacting
compactionMetaInfo.numberOfBytes += getDataSize(location, conf);
//todo: this is not remotely accurate if you have many (relevant) original files
return true;
return false;
* @param location - path to a partition (or table if not partitioned) dir
private static boolean needsCompaction(Path location, HiveConf conf,
CompactionMetaInfo compactionMetaInfo, ValidTxnList txns) throws IOException {
FileSystem fs = location.getFileSystem(conf);
FileStatus[] deltas = fs.listStatus(location, new PathFilter() {
public boolean accept(Path path) {
//checking for delete_delta is only so that this functionality can be exercised by code 3.0
//which cannot produce any deltas with mix of update/insert events
return path.getName().startsWith("delta_") || path.getName().startsWith("delete_delta_");
if(deltas == null || deltas.length == 0) {
//base_n cannot contain update/delete. Original files are all 'insert' and we need to compact
//only if there are update/delete events.
return false;
/*getAcidState() is smart not to return any deltas in current if there is a base that covers
* them, i.e. if they were compacted but not yet cleaned. This means re-checking if
* compaction is needed should cheap(er)*/
AcidUtils.Directory dir = AcidUtils.getAcidState(location, conf, txns);
deltaLoop: for(AcidUtils.ParsedDelta delta : dir.getCurrentDirectories()) {
FileStatus[] buckets = fs.listStatus(delta.getPath(), new PathFilter() {
public boolean accept(Path path) {
//since this is inside a delta dir created by Hive 2.x or earlier it can only contain
//bucket_x or bucket_x__flush_length
return path.getName().startsWith("bucket_");
for(FileStatus bucket : buckets) {
if(bucket.getPath().getName().endsWith("_flush_length")) {
//streaming ingest dir - cannot have update/delete events
continue deltaLoop;
if(needsCompaction(bucket, fs)) {
//found delete events - this 'location' needs compacting
compactionMetaInfo.numberOfBytes += getDataSize(location, conf);
//if there are un-compacted original files, they will be included in compaction, so
//count at the size for 'cost' estimation later
for(HadoopShims.HdfsFileStatusWithId origFile : dir.getOriginalFiles()) {
FileStatus fileStatus = origFile.getFileStatus();
if(fileStatus != null) {
compactionMetaInfo.numberOfBytes += fileStatus.getLen();
return true;
return false;
* @param location - path to a partition (or table if not partitioned) dir
private static long getDataSize(Path location, HiveConf conf) throws IOException {
FileSystem fs = location.getFileSystem(conf);
ContentSummary cs = fs.getContentSummary(location);
return cs.getLength();
private static boolean needsCompaction(FileStatus bucket, FileSystem fs) throws IOException {
//create reader, look at footer
//no need to check side file since it can only be in a streaming ingest delta
Reader orcReader = OrcFile.createReader(bucket.getPath(),OrcFile.readerOptions(fs.getConf())
AcidStats as = OrcAcidUtils.parseAcidStats(orcReader);
if(as == null) {
//should never happen since we are reading bucket_x written by acid write
throw new IllegalStateException("AcidStats missing in " + bucket.getPath());
return as.deletes > 0 || as.updates > 0;
private static String getCompactionCommand(Table t, Partition p) {
StringBuilder sb = new StringBuilder("ALTER TABLE ").append(Warehouse.getQualifiedName(t));
if(t.getPartitionKeysSize() > 0) {
assert p != null : "must supply partition for partitioned table " +
sb.append(" PARTITION(");
for (int i = 0; i < t.getPartitionKeysSize(); i++) {
genPartValueString(t.getPartitionKeys().get(i).getType(), p.getValues().get(i))).
sb.setCharAt(sb.length() - 1, ')');//replace trailing ','
return sb.append(" COMPACT 'major'").toString();
* This is copy-pasted from {@link org.apache.hadoop.hive.ql.parse.ColumnStatsSemanticAnalyzer},
* which can't be refactored since this is linked against Hive 2.x
private static String genPartValueString(String partColType, String partVal) {
String returnVal = partVal;
if (partColType.equals(serdeConstants.STRING_TYPE_NAME) ||
partColType.contains(serdeConstants.VARCHAR_TYPE_NAME) ||
partColType.contains(serdeConstants.CHAR_TYPE_NAME)) {
returnVal = "'" + escapeSQLString(partVal) + "'";
} else if (partColType.equals(serdeConstants.TINYINT_TYPE_NAME)) {
returnVal = partVal + "Y";
} else if (partColType.equals(serdeConstants.SMALLINT_TYPE_NAME)) {
returnVal = partVal + "S";
} else if (partColType.equals(serdeConstants.INT_TYPE_NAME)) {
returnVal = partVal;
} else if (partColType.equals(serdeConstants.BIGINT_TYPE_NAME)) {
returnVal = partVal + "L";
} else if (partColType.contains(serdeConstants.DECIMAL_TYPE_NAME)) {
returnVal = partVal + "BD";
} else if (partColType.equals(serdeConstants.DATE_TYPE_NAME) ||
partColType.equals(serdeConstants.TIMESTAMP_TYPE_NAME)) {
returnVal = partColType + " '" + escapeSQLString(partVal) + "'";
} else {
//for other usually not used types, just quote the value
returnVal = "'" + escapeSQLString(partVal) + "'";
return returnVal;
private static boolean isFullAcidTable(Table t) {
if (t.getParametersSize() <= 0) {
//cannot be acid
return false;
String transacationalValue = t.getParameters()
if (transacationalValue != null && "true".equalsIgnoreCase(transacationalValue)) {
System.out.println("Found Acid table: " + Warehouse.getQualifiedName(t));
return true;
return false;
private static boolean isAcidEnabled(HiveConf hiveConf) {
String txnMgr = hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER);
boolean concurrency = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
String dbTxnMgr = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
return txnMgr.equals(dbTxnMgr) && concurrency;
private static class CompactionMetaInfo {
* total number of bytes to be compacted across all compaction commands
long numberOfBytes;
* IDs of compactions launched by this utility
Set<Long> compactionIds = new HashSet<>();
static abstract class Callback {
* This is a hack enable Unit testing. Derby can't handle multiple concurrent threads but
* somehow Compactor needs to run to test "execute" mode. This callback can be used
* to run Worker. For TESTING ONLY.
void onWaitForCompaction() throws MetaException {}
static Callback callback;
static int pollIntervalMs = 1000*30;
* Also to enable testing until I set up Maven profiles to be able to run with 3.0 jars
static boolean isTestMode = false;
* can set it from tests to test when config needs something other than default values
static HiveConf hiveConf = null;