blob: 66335c90c4f77b177519b545d3cdf6f3fa7574eb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.txn.compactor;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ServerUtils;
import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.FindNextCompactRequest;
import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.io.RecordUpdater;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.apache.thrift.TException;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Stack;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorTestUtilities.CompactorThreadType;
/**
* Super class for all of the compactor test modules.
*/
public abstract class CompactorTest {
static final private String CLASS_NAME = CompactorTest.class.getName();
static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
public static final String WORKER_VERSION = "4.0.0";
protected TxnStore txnHandler;
protected IMetaStoreClient ms;
protected HiveConf conf;
private final AtomicBoolean stop = new AtomicBoolean();
protected File tmpdir;
@Before
public void setup() throws Exception {
conf = new HiveConf();
MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, 2, TimeUnit.SECONDS);
TestTxnDbUtil.setConfValues(conf);
TestTxnDbUtil.cleanDb(conf);
TestTxnDbUtil.prepDb(conf);
ms = new HiveMetaStoreClient(conf);
txnHandler = TxnUtils.getTxnStore(conf);
tmpdir = new File(Files.createTempDirectory("compactor_test_table_").toString());
}
protected void compactorTestCleanup() throws IOException {
FileUtils.deleteDirectory(tmpdir);
}
protected void startInitiator() throws Exception {
startThread(CompactorThreadType.INITIATOR, true);
}
protected void startWorker() throws Exception {
startThread(CompactorThreadType.WORKER, true);
}
protected void startCleaner() throws Exception {
startThread(CompactorThreadType.CLEANER, true);
}
protected void runAcidMetricService() throws Exception {
TestTxnDbUtil.setConfValues(conf);
AcidMetricService t = new AcidMetricService();
t.setConf(conf);
t.run();
}
protected Table newTable(String dbName, String tableName, boolean partitioned) throws TException {
return newTable(dbName, tableName, partitioned, new HashMap<String, String>(), null, false);
}
protected Table newTable(String dbName, String tableName, boolean partitioned,
Map<String, String> parameters) throws TException {
return newTable(dbName, tableName, partitioned, parameters, null, false);
}
protected Table newTable(String dbName, String tableName, boolean partitioned,
Map<String, String> parameters, List<Order> sortCols,
boolean isTemporary)
throws TException {
Table table = new Table();
table.setTableType(TableType.MANAGED_TABLE.name());
table.setTableName(tableName);
table.setDbName(dbName);
table.setOwner("me");
table.setSd(newStorageDescriptor(getLocation(tableName, null), sortCols));
List<FieldSchema> partKeys = new ArrayList<FieldSchema>(1);
if (partitioned) {
partKeys.add(new FieldSchema("ds", "string", "no comment"));
table.setPartitionKeys(partKeys);
}
// Set the table as transactional for compaction to work
if (parameters == null) {
parameters = new HashMap<>();
}
parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
if (sortCols != null) {
// Sort columns are not allowed for full ACID table. So, change it to insert-only table
parameters.put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES,
TransactionalValidationListener.INSERTONLY_TRANSACTIONAL_PROPERTY);
}
table.setParameters(parameters);
if (isTemporary) table.setTemporary(true);
// drop the table first, in case some previous test created it
ms.dropTable(dbName, tableName);
ms.createTable(table);
return table;
}
protected Partition newPartition(Table t, String value) throws Exception {
return newPartition(t, value, null);
}
protected Partition newPartition(Table t, String value, List<Order> sortCols) throws Exception {
Partition part = new Partition();
part.addToValues(value);
part.setDbName(t.getDbName());
part.setTableName(t.getTableName());
part.setSd(newStorageDescriptor(getLocation(t.getTableName(), value), sortCols));
part.setParameters(new HashMap<String, String>());
ms.add_partition(part);
return part;
}
protected long openTxn() throws MetaException {
return openTxn(TxnType.DEFAULT);
}
protected long openTxn(TxnType txnType) throws MetaException {
OpenTxnRequest rqst = new OpenTxnRequest(1, System.getProperty("user.name"), ServerUtils.hostname());
rqst.setTxn_type(txnType);
if (txnType == TxnType.REPL_CREATED) {
rqst.setReplPolicy("default.*");
rqst.setReplSrcTxnIds(Arrays.asList(1L));
}
List<Long> txns = txnHandler.openTxns(rqst).getTxn_ids();
return txns.get(0);
}
protected long allocateWriteId(String dbName, String tblName, long txnid)
throws MetaException, TxnAbortedException, NoSuchTxnException {
AllocateTableWriteIdsRequest awiRqst
= new AllocateTableWriteIdsRequest(dbName, tblName);
awiRqst.setTxnIds(Collections.singletonList(txnid));
AllocateTableWriteIdsResponse awiResp = txnHandler.allocateTableWriteIds(awiRqst);
return awiResp.getTxnToWriteIds().get(0).getWriteId();
}
protected void addDeltaFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords)
throws Exception {
addFile(t, p, minTxn, maxTxn, numRecords, FileType.DELTA, 2, true);
}
protected void addLengthFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords)
throws Exception {
addFile(t, p, minTxn, maxTxn, numRecords, FileType.LENGTH_FILE, 2, true);
}
protected void addBaseFile(Table t, Partition p, long maxTxn, int numRecords) throws Exception {
addFile(t, p, 0, maxTxn, numRecords, FileType.BASE, 2, true);
}
protected void addBaseFile(Table t, Partition p, long maxTxn, int numRecords, long visibilityId) throws Exception {
addFile(t, p, 0, maxTxn, numRecords, FileType.BASE, 2, true, visibilityId);
}
protected void addLegacyFile(Table t, Partition p, int numRecords) throws Exception {
addFile(t, p, 0, 0, numRecords, FileType.LEGACY, 2, true);
}
protected void addDeltaFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords,
int numBuckets, boolean allBucketsPresent) throws Exception {
addFile(t, p, minTxn, maxTxn, numRecords, FileType.DELTA, numBuckets, allBucketsPresent);
}
protected void addBaseFile(Table t, Partition p, long maxTxn, int numRecords, int numBuckets,
boolean allBucketsPresent) throws Exception {
addFile(t, p, 0, maxTxn, numRecords, FileType.BASE, numBuckets, allBucketsPresent);
}
protected List<Path> getDirectories(HiveConf conf, Table t, Partition p) throws Exception {
String partValue = (p == null) ? null : p.getValues().get(0);
String location = getLocation(t.getTableName(), partValue);
Path dir = new Path(location);
FileSystem fs = FileSystem.get(conf);
FileStatus[] stats = fs.listStatus(dir);
List<Path> paths = new ArrayList<Path>(stats.length);
for (int i = 0; i < stats.length; i++) paths.add(stats[i].getPath());
return paths;
}
protected void burnThroughTransactions(String dbName, String tblName, int num)
throws MetaException, NoSuchTxnException, TxnAbortedException {
burnThroughTransactions(dbName, tblName, num, null, null);
}
protected void burnThroughTransactions(String dbName, String tblName, int num, Set<Long> open, Set<Long> aborted)
throws NoSuchTxnException, TxnAbortedException, MetaException {
burnThroughTransactions(dbName, tblName, num, open, aborted, null);
}
protected void burnThroughTransactions(String dbName, String tblName, int num, Set<Long> open, Set<Long> aborted, LockRequest lockReq)
throws MetaException, NoSuchTxnException, TxnAbortedException {
OpenTxnsResponse rsp = txnHandler.openTxns(new OpenTxnRequest(num, "me", "localhost"));
AllocateTableWriteIdsRequest awiRqst = new AllocateTableWriteIdsRequest(dbName, tblName);
awiRqst.setTxnIds(rsp.getTxn_ids());
AllocateTableWriteIdsResponse awiResp = txnHandler.allocateTableWriteIds(awiRqst);
int i = 0;
for (long tid : rsp.getTxn_ids()) {
assert(awiResp.getTxnToWriteIds().get(i++).getTxnId() == tid);
if(lockReq != null) {
lockReq.setTxnid(tid);
txnHandler.lock(lockReq);
}
if (aborted != null && aborted.contains(tid)) {
txnHandler.abortTxn(new AbortTxnRequest(tid));
} else if (open == null || (open != null && !open.contains(tid))) {
txnHandler.commitTxn(new CommitTxnRequest(tid));
}
}
}
protected void stopThread() {
stop.set(true);
}
private StorageDescriptor newStorageDescriptor(String location, List<Order> sortCols) {
StorageDescriptor sd = new StorageDescriptor();
List<FieldSchema> cols = new ArrayList<FieldSchema>(2);
cols.add(new FieldSchema("a", "varchar(25)", "still no comment"));
cols.add(new FieldSchema("b", "int", "comment"));
sd.setCols(cols);
sd.setLocation(location);
sd.setInputFormat(MockInputFormat.class.getName());
sd.setOutputFormat(MockOutputFormat.class.getName());
sd.setNumBuckets(1);
SerDeInfo serde = new SerDeInfo();
serde.setSerializationLib(LazySimpleSerDe.class.getName());
sd.setSerdeInfo(serde);
List<String> bucketCols = new ArrayList<String>(1);
bucketCols.add("a");
sd.setBucketCols(bucketCols);
if (sortCols != null) {
sd.setSortCols(sortCols);
}
return sd;
}
// I can't do this with @Before because I want to be able to control when the thread starts
private void startThread(CompactorThreadType type, boolean stopAfterOne) throws Exception {
TestTxnDbUtil.setConfValues(conf);
CompactorThread t;
switch (type) {
case INITIATOR: t = new Initiator(); break;
case WORKER: t = new Worker(); break;
case CLEANER: t = new Cleaner(); break;
default: throw new RuntimeException("Huh? Unknown thread type.");
}
t.setThreadId((int) t.getId());
t.setConf(conf);
stop.set(stopAfterOne);
t.init(stop);
if (stopAfterOne) t.run();
else t.start();
}
private String getLocation(String tableName, String partValue) {
String location = tmpdir.getAbsolutePath() +
System.getProperty("file.separator") + tableName;
if (partValue != null) {
location += System.getProperty("file.separator") + "ds=" + partValue;
}
return location;
}
private enum FileType {BASE, DELTA, LEGACY, LENGTH_FILE}
private void addFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords, FileType type, int numBuckets,
boolean allBucketsPresent) throws Exception {
addFile(t, p, minTxn, maxTxn, numRecords, type, numBuckets, allBucketsPresent, 0);
}
private void addFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords, FileType type, int numBuckets,
boolean allBucketsPresent, long visibilityId) throws Exception {
String partValue = (p == null) ? null : p.getValues().get(0);
Path location = new Path(getLocation(t.getTableName(), partValue));
String filename = null;
switch (type) {
case BASE: filename = AcidUtils.BASE_PREFIX + maxTxn + (visibilityId > 0 ? AcidUtils.VISIBILITY_PREFIX + visibilityId : ""); break;
case LENGTH_FILE: // Fall through to delta
case DELTA: filename = makeDeltaDirName(minTxn, maxTxn); break;
case LEGACY: break; // handled below
}
FileSystem fs = FileSystem.get(conf);
for (int bucket = 0; bucket < numBuckets; bucket++) {
if (bucket == 0 && !allBucketsPresent) continue; // skip one
Path partFile = null;
if (type == FileType.LEGACY) {
partFile = new Path(location, String.format(AcidUtils.LEGACY_FILE_BUCKET_DIGITS, bucket) + "_0");
} else {
Path dir = new Path(location, filename);
fs.mkdirs(dir);
partFile = AcidUtils.createBucketFile(dir, bucket);
if (type == FileType.LENGTH_FILE) {
partFile = new Path(partFile.toString() + AcidUtils.DELTA_SIDE_FILE_SUFFIX);
}
}
FSDataOutputStream out = fs.create(partFile);
if (type == FileType.LENGTH_FILE) {
out.writeInt(numRecords);//hmm - length files should store length in bytes...
} else {
for (int i = 0; i < numRecords; i++) {
RecordIdentifier ri = new RecordIdentifier(maxTxn - 1, bucket, i);
ri.write(out);
out.writeBytes("mary had a little lamb its fleece was white as snow\n");
}
}
out.close();
}
}
static class MockInputFormat implements AcidInputFormat<WritableComparable,Text> {
@Override
public AcidInputFormat.RowReader<Text> getReader(InputSplit split,
Options options) throws
IOException {
return null;
}
@Override
public RawReader<Text> getRawReader(Configuration conf, boolean collapseEvents, int bucket,
ValidWriteIdList validWriteIdList,
Path baseDirectory, Path[] deltaDirectory, Map<String, Integer> deltaToAttemptId) throws IOException {
List<Path> filesToRead = new ArrayList<Path>();
if (baseDirectory != null) {
if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) {
Path p = AcidUtils.createBucketFile(baseDirectory, bucket);
FileSystem fs = p.getFileSystem(conf);
if (fs.exists(p)) filesToRead.add(p);
} else {
filesToRead.add(new Path(baseDirectory, "000000_0"));
}
}
for (int i = 0; i < deltaDirectory.length; i++) {
Path p = AcidUtils.createBucketFile(deltaDirectory[i], bucket);
FileSystem fs = p.getFileSystem(conf);
if (fs.exists(p)) filesToRead.add(p);
}
return new MockRawReader(conf, filesToRead);
}
@Override
public InputSplit[] getSplits(JobConf entries, int i) throws IOException {
return new InputSplit[0];
}
@Override
public RecordReader<WritableComparable, Text> getRecordReader(InputSplit inputSplit, JobConf entries,
Reporter reporter) throws IOException {
return null;
}
@Override
public boolean validateInput(FileSystem fs, HiveConf conf, List<FileStatus> files) throws
IOException {
return false;
}
}
static class MockRawReader implements AcidInputFormat.RawReader<Text> {
private final Stack<Path> filesToRead;
private final Configuration conf;
private FSDataInputStream is = null;
private final FileSystem fs;
private boolean lastWasDelete = true;
MockRawReader(Configuration conf, List<Path> files) throws IOException {
filesToRead = new Stack<Path>();
for (Path file : files) filesToRead.push(file);
this.conf = conf;
fs = FileSystem.get(conf);
}
@Override
public ObjectInspector getObjectInspector() {
return null;
}
/**
* This is bogus especially with split update acid tables. This causes compaction to create
* delete_delta_x_y where none existed before. Makes the data layout such as would never be
* created by 'real' code path.
*/
@Override
public boolean isDelete(Text value) {
// Alternate between returning deleted and not. This is easier than actually
// tracking operations. We test that this is getting properly called by checking that only
// half the records show up in base files after major compactions.
lastWasDelete = !lastWasDelete;
return lastWasDelete;
}
@Override
public boolean next(RecordIdentifier identifier, Text text) throws IOException {
if (is == null) {
// Open the next file
if (filesToRead.empty()) return false;
Path p = filesToRead.pop();
LOG.debug("Reading records from " + p.toString());
is = fs.open(p);
}
String line = null;
try {
identifier.readFields(is);
line = is.readLine();
} catch (EOFException e) {
}
if (line == null) {
// Set our current entry to null (since it's done) and try again.
is = null;
return next(identifier, text);
}
text.set(line);
return true;
}
@Override
public RecordIdentifier createKey() {
return new RecordIdentifier();
}
@Override
public Text createValue() {
return new Text();
}
@Override
public long getPos() throws IOException {
return 0;
}
@Override
public void close() throws IOException {
}
@Override
public float getProgress() throws IOException {
return 0;
}
}
// This class isn't used and I suspect does totally the wrong thing. It's only here so that I
// can provide some output format to the tables and partitions I create. I actually write to
// those tables directory.
static class MockOutputFormat implements AcidOutputFormat<WritableComparable, Text> {
@Override
public RecordUpdater getRecordUpdater(Path path, Options options) throws
IOException {
return null;
}
@Override
public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getRawRecordWriter(Path path, Options options) throws IOException {
return new MockRecordWriter(path, options);
}
@Override
public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
Class<? extends Writable> valueClass,
boolean isCompressed, Properties tableProperties,
Progressable progress) throws IOException {
return null;
}
@Override
public RecordWriter<WritableComparable, Text> getRecordWriter(FileSystem fileSystem, JobConf entries,
String s,
Progressable progressable) throws
IOException {
return null;
}
@Override
public void checkOutputSpecs(FileSystem fileSystem, JobConf entries) throws IOException {
}
}
// This class isn't used and I suspect does totally the wrong thing. It's only here so that I
// can provide some output format to the tables and partitions I create. I actually write to
// those tables directory.
static class MockRecordWriter implements org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter {
private final FSDataOutputStream os;
MockRecordWriter(Path basedir, AcidOutputFormat.Options options) throws IOException {
FileSystem fs = FileSystem.get(options.getConfiguration());
Path p = AcidUtils.createFilename(basedir, options);
os = fs.create(p);
}
@Override
public void write(Writable w) throws IOException {
Text t = (Text)w;
os.writeBytes(t.toString());
os.writeBytes("\n");
}
@Override
public void close(boolean abort) throws IOException {
os.close();
}
}
/**
* in Hive 1.3.0 delta file names changed to delta_xxxx_yyyy_zzzz; prior to that
* the name was delta_xxxx_yyyy. We want to run compaction tests such that both formats
* are used since new (1.3) code has to be able to read old files.
*/
abstract boolean useHive130DeltaDirName();
String makeDeltaDirName(long minTxnId, long maxTxnId) {
if(minTxnId != maxTxnId) {
//covers both streaming api and post compaction style.
return makeDeltaDirNameCompacted(minTxnId, maxTxnId);
}
return useHive130DeltaDirName() ?
AcidUtils.deltaSubdir(minTxnId, maxTxnId, 0) : AcidUtils.deltaSubdir(minTxnId, maxTxnId);
}
/**
* delta dir name after compaction
*/
String makeDeltaDirNameCompacted(long minTxnId, long maxTxnId) {
return AcidUtils.deltaSubdir(minTxnId, maxTxnId);
}
String makeDeleteDeltaDirNameCompacted(long minTxnId, long maxTxnId) {
return AcidUtils.deleteDeltaSubdir(minTxnId, maxTxnId);
}
protected long compactInTxn(CompactionRequest rqst) throws Exception {
txnHandler.compact(rqst);
CompactionInfo ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION));
ci.runAs = System.getProperty("user.name");
long compactorTxnId = openTxn(TxnType.COMPACTION);
// Need to create a valid writeIdList to set the highestWriteId in ci
ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), compactorTxnId);
GetValidWriteIdsRequest writeIdsRequest = new GetValidWriteIdsRequest();
writeIdsRequest.setValidTxnList(validTxnList.writeToString());
writeIdsRequest
.setFullTableNames(Collections.singletonList(TxnUtils.getFullTableName(rqst.getDbname(), rqst.getTablename())));
// with this ValidWriteIdList is capped at whatever HWM validTxnList has
ValidCompactorWriteIdList tblValidWriteIds = TxnUtils
.createValidCompactWriteIdList(txnHandler.getValidWriteIds(writeIdsRequest).getTblValidWriteIds().get(0));
ci.highestWriteId = tblValidWriteIds.getHighWatermark();
txnHandler.updateCompactorState(ci, compactorTxnId);
txnHandler.markCompacted(ci);
txnHandler.commitTxn(new CommitTxnRequest(compactorTxnId));
Thread.sleep(MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
return compactorTxnId;
}
}