HIVE-29270: Remove HMS compactor workers deprecated functionality (#6132)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java
index 588fae4..e8ec181 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java
@@ -117,10 +117,8 @@ private void addReplChangeManagerConfigs() throws Exception {
private void addCompactorConfigs() {
MetastoreConf.setBoolVar(conf, ConfVars.COMPACTOR_INITIATOR_ON, true);
MetastoreConf.setBoolVar(conf, ConfVars.COMPACTOR_CLEANER_ON, true);
- MetastoreConf.setVar(conf, ConfVars.HIVE_METASTORE_RUNWORKER_IN, "metastore");
MetastoreConf.setLongVar(conf, ConfVars.COMPACTOR_WORKER_THREADS, 1);
threadClasses.put(Initiator.class, false);
- threadClasses.put(Worker.class, false);
threadClasses.put(Cleaner.class, false);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 252defb..5dc40b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -64,7 +64,6 @@ public void run() {
try (ExecutorService compactionExecutor = CompactorUtil.createExecutorWithThreadFactory(
conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_REQUEST_QUEUE),
COMPACTOR_INTIATOR_THREAD_NAME_FORMAT)) {
- recoverFailedCompactions(false);
TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new NoMutex();
// Make sure we run through the loop once before checking to stop as this makes testing
@@ -81,6 +80,9 @@ public void run() {
startedAt = System.currentTimeMillis();
prevStart = handle.getLastUpdateTime();
+ // Check for timed out workers.
+ recoverFailedCompactions();
+
if (metricsEnabled) {
perfLogger.perfLogBegin(CLASS_NAME, MetricsConstants.COMPACTION_INITIATOR_CYCLE);
stopCycleUpdater();
@@ -159,8 +161,6 @@ public void run() {
//Use get instead of join, so we can receive InterruptedException and shutdown gracefully
CompletableFuture.allOf(compactionList.toArray(new CompletableFuture[0])).get();
- // Check for timed out remote workers.
- recoverFailedCompactions(true);
handle.releaseLocks(startedAt);
} catch (InterruptedException e) {
// do not ignore interruption requests
@@ -235,8 +235,7 @@ private TableOptimizer instantiateTableOptimizer(String className) {
}
}
- private void recoverFailedCompactions(boolean remoteOnly) throws MetaException {
- if (!remoteOnly) txnHandler.revokeFromLocalWorkers(ServerUtils.hostname());
+ private void recoverFailedCompactions() throws MetaException {
txnHandler.revokeTimedoutWorkers(HiveConf.getTimeVar(conf,
HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, TimeUnit.MILLISECONDS));
}
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index 1104082..ab39b27 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -687,38 +687,6 @@ private void countCompactionsInHistory(String dbName, String tableName, String p
}
@Test
- public void testRevokeFromLocalWorkers() throws Exception {
- CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
- txnHandler.compact(rqst);
- rqst = new CompactionRequest("foo", "baz", CompactionType.MINOR);
- txnHandler.compact(rqst);
- rqst = new CompactionRequest("foo", "bazzoo", CompactionType.MINOR);
- txnHandler.compact(rqst);
- assertNotNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred-193892", WORKER_VERSION)));
- assertNotNull(txnHandler.findNextToCompact(aFindNextCompactRequest("bob-193892", WORKER_VERSION)));
- assertNotNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred-193893", WORKER_VERSION)));
- txnHandler.revokeFromLocalWorkers("fred");
-
- ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- assertEquals(3, compacts.size());
- boolean sawWorkingBob = false;
- int initiatedCount = 0;
- for (ShowCompactResponseElement c : compacts) {
- if (c.getState().equals("working")) {
- assertEquals("bob-193892", c.getWorkerid());
- sawWorkingBob = true;
- } else if (c.getState().equals("initiated")) {
- initiatedCount++;
- } else {
- fail("Unexpected state");
- }
- }
- assertTrue(sawWorkingBob);
- assertEquals(2, initiatedCount);
- }
-
- @Test
public void testRevokeTimedOutWorkers() throws Exception {
CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
txnHandler.compact(rqst);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index 636550a..1b2d6e1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -73,7 +73,7 @@ public void nothing() throws Exception {
}
@Test
- public void recoverFailedLocalWorkers() throws Exception {
+ public void recoverFailedWorkers() throws Exception {
Table t = newTable("default", "rflw1", false);
CompactionRequest rqst = new CompactionRequest("default", "rflw1", CompactionType.MINOR);
txnHandler.compact(rqst);
@@ -85,40 +85,16 @@ public void recoverFailedLocalWorkers() throws Exception {
txnHandler.findNextToCompact(aFindNextCompactRequest(ServerUtils.hostname() + "-193892", WORKER_VERSION));
txnHandler.findNextToCompact(aFindNextCompactRequest("nosuchhost-193892", WORKER_VERSION));
- startInitiator();
-
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(2, compacts.size());
- boolean sawInitiated = false;
- for (ShowCompactResponseElement c : compacts) {
- if (c.getState().equals("working")) {
- Assert.assertEquals("nosuchhost-193892", c.getWorkerid());
- } else if (c.getState().equals("initiated")) {
- sawInitiated = true;
- } else {
- Assert.fail("Unexpected state");
- }
- }
- Assert.assertTrue(sawInitiated);
- }
-
- @Test
- public void recoverFailedRemoteWorkers() throws Exception {
- Table t = newTable("default", "rfrw1", false);
- CompactionRequest rqst = new CompactionRequest("default", "rfrw1", CompactionType.MINOR);
- txnHandler.compact(rqst);
-
- txnHandler.findNextToCompact(aFindNextCompactRequest("nosuchhost-193892", WORKER_VERSION));
+ rsp.getCompacts().forEach(ce ->
+ Assert.assertEquals("working", ce.getState()));
conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, 1L, TimeUnit.MILLISECONDS);
-
startInitiator();
- ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(1, compacts.size());
- Assert.assertEquals("initiated", compacts.get(0).getState());
+ rsp = txnHandler.showCompact(new ShowCompactRequest());
+ rsp.getCompacts().forEach(ce ->
+ Assert.assertEquals("initiated", ce.getState()));
}
@Test
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 048033c..c145283 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -558,11 +558,9 @@ private void initHAHealthChecker(HttpServer webServer, HiveConf hiveConf) throws
private void logCompactionParameters(HiveConf hiveConf) {
LOG.info("Compaction HS2 parameters:");
- String runWorkerIn = MetastoreConf.getVar(hiveConf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN);
- LOG.info("hive.metastore.runworker.in = {}", runWorkerIn);
int numWorkers = MetastoreConf.getIntVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS);
LOG.info("metastore.compactor.worker.threads = {}", numWorkers);
- if ("hs2".equals(runWorkerIn) && numWorkers < 1) {
+ if (numWorkers < 1) {
LOG.warn("Invalid number of Compactor Worker threads({}) on HS2", numWorkers);
}
}
@@ -1262,59 +1260,57 @@ private static void startHiveServer2() throws Throwable {
public Map<String, Integer> maybeStartCompactorThreads(HiveConf hiveConf) {
Map<String, Integer> startedWorkers = new HashMap<>();
- if (MetastoreConf.getVar(hiveConf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN).equals("hs2")) {
- Ref<Integer> numWorkers = new Ref<>(MetastoreConf.getIntVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS));
- Map<String, Integer> customPools = CompactorUtil.getPoolConf(hiveConf);
+ Ref<Integer> numWorkers = new Ref<>(MetastoreConf.getIntVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS));
+ Map<String, Integer> customPools = CompactorUtil.getPoolConf(hiveConf);
- StringBuilder sb = new StringBuilder(2048);
- sb.append("This HS2 instance will act as Compactor with the following worker pool configuration:\n");
- sb.append("Global pool size: ").append(numWorkers.value).append("\n");
+ StringBuilder sb = new StringBuilder(2048);
+ sb.append("This HS2 instance will act as Compactor with the following worker pool configuration:\n");
+ sb.append("Global pool size: ").append(numWorkers.value).append("\n");
- LOG.info("Initializing the compaction pools with using the global worker limit: {} ", numWorkers.value);
- customPools.forEach((poolName, poolWorkers) -> {
- if (poolWorkers == 0) {
- LOG.warn("Pool not initialized, configured with zero workers: {}", poolName);
- }
- else if (numWorkers.value == 0) {
- LOG.warn("Pool not initialized, no available workers remained: {}", poolName);
- }
- else {
- if (poolWorkers > numWorkers.value) {
- LOG.warn("Global worker pool exhausted, compaction pool ({}) will be configured with less workers than the " +
- "required number. ({} -> {})", poolName, poolWorkers, numWorkers.value);
- poolWorkers = numWorkers.value;
- }
-
- LOG.info("Initializing compaction pool ({}) with {} workers.", poolName, poolWorkers);
- IntStream.range(0, poolWorkers).forEach(i -> {
- Worker w = new Worker();
- w.setPoolName(poolName);
- CompactorThread.initializeAndStartThread(w, hiveConf);
- startedWorkers.compute(poolName, (k, v) -> (v == null) ? 1 : v + 1);
- sb.append(
- String.format("Worker - Name: %s, Pool: %s, Priority: %d", w.getName(), poolName, w.getPriority())
- );
- });
- numWorkers.value -= poolWorkers;
- }
- });
-
- if (numWorkers.value == 0) {
- LOG.warn("No default compaction pool configured, all non-labeled compaction requests will remain unprocessed!");
- if (customPools.size() > 0) {
- sb.append("Pool not initialized, no remaining free workers: default\n");
- }
- } else {
- LOG.info("Initializing default compaction pool with {} workers.", numWorkers.value);
- IntStream.range(0, numWorkers.value).forEach(i -> {
- Worker w = new Worker();
- CompactorThread.initializeAndStartThread(w, hiveConf);
- startedWorkers.compute(Constants.COMPACTION_DEFAULT_POOL, (k, v) -> (v == null) ? 1 : v + 1);
- sb.append("Worker - Name: ").append(w.getName()).append(", Pool: default, Priority: ").append(w.getPriority()).append("\n");
- });
+ LOG.info("Initializing the compaction pools with using the global worker limit: {} ", numWorkers.value);
+ customPools.forEach((poolName, poolWorkers) -> {
+ if (poolWorkers == 0) {
+ LOG.warn("Pool not initialized, configured with zero workers: {}", poolName);
}
- LOG.info(sb.toString());
+ else if (numWorkers.value == 0) {
+ LOG.warn("Pool not initialized, no available workers remained: {}", poolName);
+ }
+ else {
+ if (poolWorkers > numWorkers.value) {
+ LOG.warn("Global worker pool exhausted, compaction pool ({}) will be configured with less workers than the " +
+ "required number. ({} -> {})", poolName, poolWorkers, numWorkers.value);
+ poolWorkers = numWorkers.value;
+ }
+
+ LOG.info("Initializing compaction pool ({}) with {} workers.", poolName, poolWorkers);
+ IntStream.range(0, poolWorkers).forEach(i -> {
+ Worker w = new Worker();
+ w.setPoolName(poolName);
+ CompactorThread.initializeAndStartThread(w, hiveConf);
+ startedWorkers.compute(poolName, (k, v) -> (v == null) ? 1 : v + 1);
+ sb.append(
+ String.format("Worker - Name: %s, Pool: %s, Priority: %d", w.getName(), poolName, w.getPriority())
+ );
+ });
+ numWorkers.value -= poolWorkers;
+ }
+ });
+
+ if (numWorkers.value == 0) {
+ LOG.warn("No default compaction pool configured, all non-labeled compaction requests will remain unprocessed!");
+ if (customPools.size() > 0) {
+ sb.append("Pool not initialized, no remaining free workers: default\n");
+ }
+ } else {
+ LOG.info("Initializing default compaction pool with {} workers.", numWorkers.value);
+ IntStream.range(0, numWorkers.value).forEach(i -> {
+ Worker w = new Worker();
+ CompactorThread.initializeAndStartThread(w, hiveConf);
+ startedWorkers.compute(Constants.COMPACTION_DEFAULT_POOL, (k, v) -> (v == null) ? 1 : v + 1);
+ sb.append("Worker - Name: ").append(w.getName()).append(", Pool: default, Priority: ").append(w.getPriority()).append("\n");
+ });
}
+ LOG.info(sb.toString());
return startedWorkers;
}
diff --git a/service/src/test/org/apache/hive/service/server/TestHiveServer2.java b/service/src/test/org/apache/hive/service/server/TestHiveServer2.java
index 7393dd6..42dbdb8 100644
--- a/service/src/test/org/apache/hive/service/server/TestHiveServer2.java
+++ b/service/src/test/org/apache/hive/service/server/TestHiveServer2.java
@@ -32,7 +32,6 @@ public void testMaybeStartCompactorThreadsOneCustomPool() {
HiveServer2 hs2 = new HiveServer2();
HiveConf conf = new HiveConf();
- MetastoreConf.setVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN, "hs2");
MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS, 1);
conf.setInt("hive.compactor.worker.pool1.threads", 1);
@@ -46,7 +45,6 @@ public void testMaybeStartCompactorThreadsZeroTotalWorkers() {
HiveServer2 hs2 = new HiveServer2();
HiveConf conf = new HiveConf();
- MetastoreConf.setVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN, "hs2");
MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS, 0);
conf.setInt("hive.compactor.worker.pool1.threads", 5);
@@ -59,7 +57,6 @@ public void testMaybeStartCompactorThreadsZeroCustomWorkers() {
HiveServer2 hs2 = new HiveServer2();
HiveConf conf = new HiveConf();
- MetastoreConf.setVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN, "hs2");
MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS, 5);
Map<String, Integer> startedWorkers = hs2.maybeStartCompactorThreads(conf);
@@ -72,7 +69,6 @@ public void testMaybeStartCompactorThreadsMultipleCustomPools() {
HiveServer2 hs2 = new HiveServer2();
HiveConf conf = new HiveConf();
- MetastoreConf.setVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN, "hs2");
MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS, 12);
conf.setInt("hive.compactor.worker.pool1.threads", 3);
conf.setInt("hive.compactor.worker.pool2.threads", 4);
@@ -90,7 +86,6 @@ public void testMaybeStartCompactorThreadsMultipleCustomPoolsAndDefaultPool() {
HiveServer2 hs2 = new HiveServer2();
HiveConf conf = new HiveConf();
- MetastoreConf.setVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN, "hs2");
MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS, 15);
conf.setInt("hive.compactor.worker.pool1.threads", 3);
conf.setInt("hive.compactor.worker.pool2.threads", 4);
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index cff6209..63a0d10 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -1774,16 +1774,6 @@ public enum ConfVars {
"Batch size for partition and other object retrieval from the underlying DB in JDO.\n" +
"The JDO implementation such as DataNucleus may run into issues when the generated queries are\n" +
"too large. Use this parameter to break the query into multiple batches. -1 means no batching."),
- /**
- * @deprecated Deprecated due to HIVE-26443
- */
- @Deprecated
- HIVE_METASTORE_RUNWORKER_IN("hive.metastore.runworker.in",
- "hive.metastore.runworker.in", "hs2", new StringSetValidator("metastore", "hs2"),
- "Deprecated. HMS side compaction workers doesn't support pooling. With the concept of compaction " +
- "pools (HIVE-26443), running workers on HMS side is still supported but not suggested anymore. " +
- "This config value will be removed in the future.\n" +
- "Chooses where the compactor worker threads should run, Only possible values are \"metastore\" and \"hs2\""),
// Hive values we have copied and use as is
// These two are used to indicate that we are running tests
HIVE_IN_TEST("hive.in.test", "hive.in.test", false, "internal usage only, true in test mode"),
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index e036837..1d9fbf3 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -837,12 +837,9 @@ public void run() {
.setTType(LeaderElectionContext.TTYPE.HOUSEKEEPING) // housekeeping tasks
.addListener(new CMClearer(conf))
.addListener(new StatsUpdaterTask(conf))
- .addListener(new CompactorTasks(conf, false))
+ .addListener(new CompactorTasks(conf))
.addListener(new CompactorPMF())
.addListener(new HouseKeepingTasks(conf, true))
- .setTType(LeaderElectionContext.TTYPE.WORKER) // compactor worker
- .addListener(new CompactorTasks(conf, true),
- MetastoreConf.getVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN).equals("metastore"))
.build();
if (shutdownHookMgr != null) {
shutdownHookMgr.addShutdownHook(() -> context.close(), 0);
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/CompactorTasks.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/CompactorTasks.java
index fcc5140..956a5e5 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/CompactorTasks.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/CompactorTasks.java
@@ -36,16 +36,14 @@
public class CompactorTasks implements LeaderElection.LeadershipStateListener {
private final Configuration configuration;
- private final boolean runOnlyWorker;
// each MetaStoreThread runs as a thread
private Map<MetaStoreThread, AtomicBoolean> metastoreThreadsMap;
- public CompactorTasks(Configuration configuration, boolean runOnlyWorker) {
+ public CompactorTasks(Configuration configuration) {
// recreate a new configuration
this.configuration = new Configuration(requireNonNull(configuration,
"configuration is null"));
- this.runOnlyWorker = runOnlyWorker;
}
// Copied from HiveMetaStore
@@ -62,66 +60,41 @@ private MetaStoreThread instantiateThread(String classname) throws Exception {
public List<MetaStoreThread> getCompactorThreads() throws Exception {
List<MetaStoreThread> compactors = new ArrayList<>();
- if (!runOnlyWorker) {
- if (MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON)) {
- MetaStoreThread initiator = instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Initiator");
- compactors.add(initiator);
- }
- if (MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON)) {
- MetaStoreThread cleaner = instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Cleaner");
- compactors.add(cleaner);
- }
- } else {
- boolean runInMetastore = MetastoreConf.getVar(configuration,
- MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN).equals("metastore");
- if (runInMetastore) {
- HiveMetaStore.LOG.warn("Running compaction workers on HMS side is not suggested because compaction pools are not supported in HMS " +
- "(HIVE-26443). Consider removing the hive.metastore.runworker.in configuration setting, as it will be " +
- "comletely removed in future releases.");
- int numWorkers = MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS);
- for (int i = 0; i < numWorkers; i++) {
- MetaStoreThread worker = instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Worker");
- compactors.add(worker);
- }
- }
+ if (MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON)) {
+ MetaStoreThread initiator = instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Initiator");
+ compactors.add(initiator);
+ }
+ if (MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON)) {
+ MetaStoreThread cleaner = instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Cleaner");
+ compactors.add(cleaner);
}
return compactors;
}
private void logCompactionParameters() {
- if (!runOnlyWorker) {
- HiveMetaStore.LOG.info("Compaction HMS parameters:");
- HiveMetaStore.LOG.info("metastore.compactor.initiator.on = {}",
- MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON));
- HiveMetaStore.LOG.info("metastore.compactor.cleaner.on = {}",
- MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON));
- HiveMetaStore.LOG.info("metastore.compactor.worker.threads = {}",
- MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS));
- HiveMetaStore.LOG.info("hive.metastore.runworker.in = {}",
- MetastoreConf.getVar(configuration, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN));
- HiveMetaStore.LOG.info("metastore.compactor.history.retention.did.not.initiate = {}",
- MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_HISTORY_RETENTION_DID_NOT_INITIATE));
- HiveMetaStore.LOG.info("metastore.compactor.history.retention.failed = {}",
- MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED));
- HiveMetaStore.LOG.info("metastore.compactor.history.retention.succeeded = {}",
- MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_HISTORY_RETENTION_SUCCEEDED));
- HiveMetaStore.LOG.info("metastore.compactor.initiator.failed.compacts.threshold = {}",
- MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD));
- HiveMetaStore.LOG.info("metastore.compactor.enable.stats.compression",
- MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_MINOR_STATS_COMPRESSION));
+ HiveMetaStore.LOG.info("Compaction HMS parameters:");
+ HiveMetaStore.LOG.info("metastore.compactor.initiator.on = {}",
+ MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON));
+ HiveMetaStore.LOG.info("metastore.compactor.cleaner.on = {}",
+ MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON));
+ HiveMetaStore.LOG.info("metastore.compactor.worker.threads = {}",
+ MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS));
+ HiveMetaStore.LOG.info("metastore.compactor.history.retention.did.not.initiate = {}",
+ MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_HISTORY_RETENTION_DID_NOT_INITIATE));
+ HiveMetaStore.LOG.info("metastore.compactor.history.retention.failed = {}",
+ MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED));
+ HiveMetaStore.LOG.info("metastore.compactor.history.retention.succeeded = {}",
+ MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_HISTORY_RETENTION_SUCCEEDED));
+ HiveMetaStore.LOG.info("metastore.compactor.initiator.failed.compacts.threshold = {}",
+ MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD));
+ HiveMetaStore.LOG.info("metastore.compactor.enable.stats.compression",
+ MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_MINOR_STATS_COMPRESSION));
- if (!MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON)) {
- HiveMetaStore.LOG.warn("Compactor Initiator is turned Off. Automatic compaction will not be triggered.");
- }
- if (!MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON)) {
- HiveMetaStore.LOG.warn("Compactor Cleaner is turned Off. Automatic compaction cleaner will not be triggered.");
- }
-
- } else {
- int numThreads = MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS);
- if (numThreads < 1) {
- HiveMetaStore.LOG.warn("Invalid number of Compactor Worker threads({}) on HMS", numThreads);
- }
+ if (!MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON)) {
+ HiveMetaStore.LOG.warn("Compactor Initiator is turned Off. Automatic compaction will not be triggered.");
+ }
+ if (!MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON)) {
+ HiveMetaStore.LOG.warn("Compactor Cleaner is turned Off. Automatic compaction cleaner will not be triggered.");
}
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 91cfe31..393cb94 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -259,33 +259,10 @@ public void cleanEmptyAbortedAndCommittedTxns() throws MetaException {
}
/**
- * This will take all entries assigned to workers
- * on a host return them to INITIATED state. The initiator should use this at start up to
- * clean entries from any workers that were in the middle of compacting when the metastore
- * shutdown. It does not reset entries from worker threads on other hosts as those may still
- * be working.
- * @param hostname Name of this host. It is assumed this prefixes the thread's worker id,
- * so that like hostname% will match the worker id.
- */
- @Override
- @RetrySemantics.Idempotent
- public void revokeFromLocalWorkers(String hostname) throws MetaException {
- jdbcResource.execute(
- "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_WORKER_ID\" = NULL, \"CQ_START\" = NULL," +
- " \"CQ_STATE\" = :initiatedState WHERE \"CQ_STATE\" = :workingState AND \"CQ_WORKER_ID\" LIKE :hostname",
- new MapSqlParameterSource()
- .addValue("initiatedState", Character.toString(INITIATED_STATE), Types.CHAR)
- .addValue("workingState", Character.toString(WORKING_STATE), Types.CHAR)
- .addValue("hostname", hostname + "%"),
- null);
- }
-
- /**
* This call will return all compaction queue
* entries assigned to a worker but over the timeout back to the initiated state.
* This should be called by the initiator on start up and occasionally when running to clean up
- * after dead threads. At start up {@link #revokeFromLocalWorkers(String)} should be called
- * first.
+ * after dead threads.
* @param timeout number of milliseconds since start time that should elapse before a worker is
* declared dead.
*/
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 42a80b1..13f32f6 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -742,25 +742,10 @@ Set<CompactionInfo> findPotentialCompactions(int abortedThreshold, long abortedT
void cleanEmptyAbortedAndCommittedTxns() throws MetaException;
/**
- * This will take all entries assigned to workers
- * on a host return them to INITIATED state. The initiator should use this at start up to
- * clean entries from any workers that were in the middle of compacting when the metastore
- * shutdown. It does not reset entries from worker threads on other hosts as those may still
- * be working.
- * @param hostname Name of this host. It is assumed this prefixes the thread's worker id,
- * so that like hostname% will match the worker id.
- */
- @SqlRetry
- @Transactional(POOL_COMPACTOR)
- @RetrySemantics.Idempotent
- void revokeFromLocalWorkers(String hostname) throws MetaException;
-
- /**
* This call will return all compaction queue
* entries assigned to a worker but over the timeout back to the initiated state.
* This should be called by the initiator on start up and occasionally when running to clean up
- * after dead threads. At start up {@link #revokeFromLocalWorkers(String)} should be called
- * first.
+ * after dead threads.
* @param timeout number of milliseconds since start time that should elapse before a worker is
* declared dead.
*/