HIVE-29270: Remove deprecated HMS compactor worker functionality (#6132)
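
With compaction pools (HIVE-26443) in place, Worker threads run only in HS2, so the deprecated
hive.metastore.runworker.in setting and the HMS-side worker startup path are removed, along with
TxnStore#revokeFromLocalWorkers; the Initiator now recovers timed-out workers via
recoverFailedCompactions() at the start of each cycle.

For reference, a minimal sketch of how workers are started after this change, mirroring the
updated TestHiveServer2 cases (the conf values are illustrative):

    HiveServer2 hs2 = new HiveServer2();
    HiveConf conf = new HiveConf();
    // Global worker limit; hive.metastore.runworker.in is no longer needed.
    MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS, 12);
    // Optional named pool carved out of the global limit.
    conf.setInt("hive.compactor.worker.pool1.threads", 3);
    // Remaining workers (9 here) go to the default pool.
    Map<String, Integer> startedWorkers = hs2.maybeStartCompactorThreads(conf);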

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java
index 588fae4..e8ec181 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/MetastoreHousekeepingLeaderTestBase.java
@@ -117,10 +117,8 @@ private void addReplChangeManagerConfigs() throws Exception {
   private void addCompactorConfigs() {
     MetastoreConf.setBoolVar(conf, ConfVars.COMPACTOR_INITIATOR_ON, true);
     MetastoreConf.setBoolVar(conf, ConfVars.COMPACTOR_CLEANER_ON, true);
-    MetastoreConf.setVar(conf, ConfVars.HIVE_METASTORE_RUNWORKER_IN, "metastore");
     MetastoreConf.setLongVar(conf, ConfVars.COMPACTOR_WORKER_THREADS, 1);
     threadClasses.put(Initiator.class, false);
-    threadClasses.put(Worker.class, false);
     threadClasses.put(Cleaner.class, false);
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 252defb..5dc40b2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -64,7 +64,6 @@ public void run() {
     try (ExecutorService compactionExecutor = CompactorUtil.createExecutorWithThreadFactory(
         conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_REQUEST_QUEUE),
         COMPACTOR_INTIATOR_THREAD_NAME_FORMAT)) {
-      recoverFailedCompactions(false);
       TxnStore.MutexAPI mutex = shouldUseMutex ? txnHandler.getMutexAPI() : new NoMutex();
 
       // Make sure we run through the loop once before checking to stop as this makes testing
@@ -81,6 +80,9 @@ public void run() {
           startedAt = System.currentTimeMillis();
           prevStart = handle.getLastUpdateTime();
 
+          // Check for timed out workers.
+          recoverFailedCompactions();
+
           if (metricsEnabled) {
             perfLogger.perfLogBegin(CLASS_NAME, MetricsConstants.COMPACTION_INITIATOR_CYCLE);
             stopCycleUpdater();
@@ -159,8 +161,6 @@ public void run() {
           //Use get instead of join, so we can receive InterruptedException and shutdown gracefully
           CompletableFuture.allOf(compactionList.toArray(new CompletableFuture[0])).get();
 
-          // Check for timed out remote workers.
-          recoverFailedCompactions(true);
           handle.releaseLocks(startedAt);
         } catch (InterruptedException e) {
           // do not ignore interruption requests
@@ -235,8 +235,7 @@ private TableOptimizer instantiateTableOptimizer(String className) {
     }
   }
 
-  private void recoverFailedCompactions(boolean remoteOnly) throws MetaException {
-    if (!remoteOnly) txnHandler.revokeFromLocalWorkers(ServerUtils.hostname());
+  private void recoverFailedCompactions() throws MetaException {
     txnHandler.revokeTimedoutWorkers(HiveConf.getTimeVar(conf,
         HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, TimeUnit.MILLISECONDS));
   }
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index 1104082..ab39b27 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -687,38 +687,6 @@ private void countCompactionsInHistory(String dbName, String tableName, String p
   }
 
   @Test
-  public void testRevokeFromLocalWorkers() throws Exception {
-    CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
-    txnHandler.compact(rqst);
-    rqst = new CompactionRequest("foo", "baz", CompactionType.MINOR);
-    txnHandler.compact(rqst);
-    rqst = new CompactionRequest("foo", "bazzoo", CompactionType.MINOR);
-    txnHandler.compact(rqst);
-    assertNotNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred-193892", WORKER_VERSION)));
-    assertNotNull(txnHandler.findNextToCompact(aFindNextCompactRequest("bob-193892", WORKER_VERSION)));
-    assertNotNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred-193893", WORKER_VERSION)));
-    txnHandler.revokeFromLocalWorkers("fred");
-
-    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    assertEquals(3, compacts.size());
-    boolean sawWorkingBob = false;
-    int initiatedCount = 0;
-    for (ShowCompactResponseElement c : compacts) {
-      if (c.getState().equals("working")) {
-        assertEquals("bob-193892", c.getWorkerid());
-        sawWorkingBob = true;
-      } else if (c.getState().equals("initiated")) {
-        initiatedCount++;
-      } else {
-        fail("Unexpected state");
-      }
-    }
-    assertTrue(sawWorkingBob);
-    assertEquals(2, initiatedCount);
-  }
-
-  @Test
   public void testRevokeTimedOutWorkers() throws Exception {
     CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
     txnHandler.compact(rqst);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index 636550a..1b2d6e1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -73,7 +73,7 @@ public void nothing() throws Exception {
   }
 
   @Test
-  public void recoverFailedLocalWorkers() throws Exception {
+  public void recoverFailedWorkers() throws Exception {
     Table t = newTable("default", "rflw1", false);
     CompactionRequest rqst = new CompactionRequest("default", "rflw1", CompactionType.MINOR);
     txnHandler.compact(rqst);
@@ -85,40 +85,16 @@ public void recoverFailedLocalWorkers() throws Exception {
     txnHandler.findNextToCompact(aFindNextCompactRequest(ServerUtils.hostname() + "-193892", WORKER_VERSION));
     txnHandler.findNextToCompact(aFindNextCompactRequest("nosuchhost-193892", WORKER_VERSION));
 
-    startInitiator();
-
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(2, compacts.size());
-    boolean sawInitiated = false;
-    for (ShowCompactResponseElement c : compacts) {
-      if (c.getState().equals("working")) {
-        Assert.assertEquals("nosuchhost-193892", c.getWorkerid());
-      } else if (c.getState().equals("initiated")) {
-        sawInitiated = true;
-      } else {
-        Assert.fail("Unexpected state");
-      }
-    }
-    Assert.assertTrue(sawInitiated);
-  }
-
-  @Test
-  public void recoverFailedRemoteWorkers() throws Exception {
-    Table t = newTable("default", "rfrw1", false);
-    CompactionRequest rqst = new CompactionRequest("default", "rfrw1", CompactionType.MINOR);
-    txnHandler.compact(rqst);
-
-    txnHandler.findNextToCompact(aFindNextCompactRequest("nosuchhost-193892", WORKER_VERSION));
+    rsp.getCompacts().forEach(ce ->
+            Assert.assertEquals("working", ce.getState()));
 
     conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, 1L, TimeUnit.MILLISECONDS);
-
     startInitiator();
 
-    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals(1, compacts.size());
-    Assert.assertEquals("initiated", compacts.get(0).getState());
+    rsp = txnHandler.showCompact(new ShowCompactRequest());
+    rsp.getCompacts().forEach(ce ->
+            Assert.assertEquals("initiated", ce.getState()));
   }
 
   @Test
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index 048033c..c145283 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -558,11 +558,9 @@ private void initHAHealthChecker(HttpServer webServer, HiveConf hiveConf) throws
 
   private void logCompactionParameters(HiveConf hiveConf) {
     LOG.info("Compaction HS2 parameters:");
-    String runWorkerIn = MetastoreConf.getVar(hiveConf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN);
-    LOG.info("hive.metastore.runworker.in = {}", runWorkerIn);
     int numWorkers = MetastoreConf.getIntVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS);
     LOG.info("metastore.compactor.worker.threads = {}", numWorkers);
-    if ("hs2".equals(runWorkerIn) && numWorkers < 1) {
+    if (numWorkers < 1) {
       LOG.warn("Invalid number of Compactor Worker threads({}) on HS2", numWorkers);
     }
   }
@@ -1262,59 +1260,57 @@ private static void startHiveServer2() throws Throwable {
 
   public Map<String, Integer> maybeStartCompactorThreads(HiveConf hiveConf) {
     Map<String, Integer> startedWorkers = new HashMap<>();
-    if (MetastoreConf.getVar(hiveConf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN).equals("hs2")) {
-      Ref<Integer> numWorkers = new Ref<>(MetastoreConf.getIntVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS));
-      Map<String, Integer> customPools = CompactorUtil.getPoolConf(hiveConf);
+    Ref<Integer> numWorkers = new Ref<>(MetastoreConf.getIntVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS));
+    Map<String, Integer> customPools = CompactorUtil.getPoolConf(hiveConf);
 
-      StringBuilder sb = new StringBuilder(2048);
-      sb.append("This HS2 instance will act as Compactor with the following worker pool configuration:\n");
-      sb.append("Global pool size: ").append(numWorkers.value).append("\n");
+    StringBuilder sb = new StringBuilder(2048);
+    sb.append("This HS2 instance will act as Compactor with the following worker pool configuration:\n");
+    sb.append("Global pool size: ").append(numWorkers.value).append("\n");
 
-      LOG.info("Initializing the compaction pools with using the global worker limit: {} ", numWorkers.value);
-      customPools.forEach((poolName, poolWorkers) -> {
-        if (poolWorkers == 0) {
-          LOG.warn("Pool not initialized, configured with zero workers: {}", poolName);
-        }
-        else if (numWorkers.value == 0) {
-          LOG.warn("Pool not initialized, no available workers remained: {}", poolName);
-        }
-        else {
-          if (poolWorkers > numWorkers.value) {
-            LOG.warn("Global worker pool exhausted, compaction pool ({}) will be configured with less workers than the " +
-                "required number. ({} -> {})", poolName, poolWorkers, numWorkers.value);
-            poolWorkers = numWorkers.value;
-          }
-
-          LOG.info("Initializing compaction pool ({}) with {} workers.", poolName, poolWorkers);
-          IntStream.range(0, poolWorkers).forEach(i -> {
-            Worker w = new Worker();
-            w.setPoolName(poolName);
-            CompactorThread.initializeAndStartThread(w, hiveConf);
-            startedWorkers.compute(poolName, (k, v) -> (v == null) ? 1 : v + 1);
-            sb.append(
-                String.format("Worker - Name: %s, Pool: %s, Priority: %d", w.getName(), poolName, w.getPriority())
-            );
-          });
-          numWorkers.value -= poolWorkers;
-        }
-      });
-
-      if (numWorkers.value == 0) {
-        LOG.warn("No default compaction pool configured, all non-labeled compaction requests will remain unprocessed!");
-        if (customPools.size() > 0) {
-          sb.append("Pool not initialized, no remaining free workers: default\n");
-        }
-      } else {
-        LOG.info("Initializing default compaction pool with {} workers.", numWorkers.value);
-        IntStream.range(0, numWorkers.value).forEach(i -> {
-          Worker w = new Worker();
-          CompactorThread.initializeAndStartThread(w, hiveConf);
-          startedWorkers.compute(Constants.COMPACTION_DEFAULT_POOL, (k, v) -> (v == null) ? 1 : v + 1);
-          sb.append("Worker - Name: ").append(w.getName()).append(", Pool: default, Priority: ").append(w.getPriority()).append("\n");
-        });
+    LOG.info("Initializing the compaction pools with using the global worker limit: {} ", numWorkers.value);
+    customPools.forEach((poolName, poolWorkers) -> {
+      if (poolWorkers == 0) {
+        LOG.warn("Pool not initialized, configured with zero workers: {}", poolName);
       }
-      LOG.info(sb.toString());
+      else if (numWorkers.value == 0) {
+        LOG.warn("Pool not initialized, no available workers remained: {}", poolName);
+      }
+      else {
+        if (poolWorkers > numWorkers.value) {
+          LOG.warn("Global worker pool exhausted, compaction pool ({}) will be configured with less workers than the " +
+              "required number. ({} -> {})", poolName, poolWorkers, numWorkers.value);
+          poolWorkers = numWorkers.value;
+        }
+
+        LOG.info("Initializing compaction pool ({}) with {} workers.", poolName, poolWorkers);
+        IntStream.range(0, poolWorkers).forEach(i -> {
+          Worker w = new Worker();
+          w.setPoolName(poolName);
+          CompactorThread.initializeAndStartThread(w, hiveConf);
+          startedWorkers.compute(poolName, (k, v) -> (v == null) ? 1 : v + 1);
+          sb.append(
+            String.format("Worker - Name: %s, Pool: %s, Priority: %d", w.getName(), poolName, w.getPriority())
+          );
+        });
+        numWorkers.value -= poolWorkers;
+      }
+    });
+
+    if (numWorkers.value == 0) {
+      LOG.warn("No default compaction pool configured, all non-labeled compaction requests will remain unprocessed!");
+      if (customPools.size() > 0) {
+        sb.append("Pool not initialized, no remaining free workers: default\n");
+      }
+    } else {
+      LOG.info("Initializing default compaction pool with {} workers.", numWorkers.value);
+      IntStream.range(0, numWorkers.value).forEach(i -> {
+        Worker w = new Worker();
+        CompactorThread.initializeAndStartThread(w, hiveConf);
+        startedWorkers.compute(Constants.COMPACTION_DEFAULT_POOL, (k, v) -> (v == null) ? 1 : v + 1);
+        sb.append("Worker - Name: ").append(w.getName()).append(", Pool: default, Priority: ").append(w.getPriority()).append("\n");
+      });
     }
+    LOG.info(sb.toString());
     return startedWorkers;
   }
 
diff --git a/service/src/test/org/apache/hive/service/server/TestHiveServer2.java b/service/src/test/org/apache/hive/service/server/TestHiveServer2.java
index 7393dd6..42dbdb8 100644
--- a/service/src/test/org/apache/hive/service/server/TestHiveServer2.java
+++ b/service/src/test/org/apache/hive/service/server/TestHiveServer2.java
@@ -32,7 +32,6 @@ public void testMaybeStartCompactorThreadsOneCustomPool() {
     HiveServer2 hs2 = new HiveServer2();
 
     HiveConf conf = new HiveConf();
-    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN, "hs2");
     MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS, 1);
     conf.setInt("hive.compactor.worker.pool1.threads", 1);
 
@@ -46,7 +45,6 @@ public void testMaybeStartCompactorThreadsZeroTotalWorkers() {
     HiveServer2 hs2 = new HiveServer2();
 
     HiveConf conf = new HiveConf();
-    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN, "hs2");
     MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS, 0);
     conf.setInt("hive.compactor.worker.pool1.threads", 5);
 
@@ -59,7 +57,6 @@ public void testMaybeStartCompactorThreadsZeroCustomWorkers() {
     HiveServer2 hs2 = new HiveServer2();
 
     HiveConf conf = new HiveConf();
-    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN, "hs2");
     MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS, 5);
 
     Map<String, Integer> startedWorkers = hs2.maybeStartCompactorThreads(conf);
@@ -72,7 +69,6 @@ public void testMaybeStartCompactorThreadsMultipleCustomPools() {
     HiveServer2 hs2 = new HiveServer2();
 
     HiveConf conf = new HiveConf();
-    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN, "hs2");
     MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS, 12);
     conf.setInt("hive.compactor.worker.pool1.threads", 3);
     conf.setInt("hive.compactor.worker.pool2.threads", 4);
@@ -90,7 +86,6 @@ public void testMaybeStartCompactorThreadsMultipleCustomPoolsAndDefaultPool() {
     HiveServer2 hs2 = new HiveServer2();
 
     HiveConf conf = new HiveConf();
-    MetastoreConf.setVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN, "hs2");
     MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS, 15);
     conf.setInt("hive.compactor.worker.pool1.threads", 3);
     conf.setInt("hive.compactor.worker.pool2.threads", 4);
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index cff6209..63a0d10 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -1774,16 +1774,6 @@ public enum ConfVars {
         "Batch size for partition and other object retrieval from the underlying DB in JDO.\n" +
         "The JDO implementation such as DataNucleus may run into issues when the generated queries are\n" +
         "too large. Use this parameter to break the query into multiple batches. -1 means no batching."),
-    /**
-     * @deprecated Deprecated due to HIVE-26443
-     */
-    @Deprecated
-    HIVE_METASTORE_RUNWORKER_IN("hive.metastore.runworker.in",
-        "hive.metastore.runworker.in", "hs2", new StringSetValidator("metastore", "hs2"),
-        "Deprecated. HMS side compaction workers doesn't support pooling. With the concept of compaction " +
-            "pools (HIVE-26443), running workers on HMS side is still supported but not suggested anymore. " +
-            "This config value will be removed in the future.\n" +
-            "Chooses where the compactor worker threads should run, Only possible values are \"metastore\" and \"hs2\""),
     // Hive values we have copied and use as is
     // These two are used to indicate that we are running tests
     HIVE_IN_TEST("hive.in.test", "hive.in.test", false, "internal usage only, true in test mode"),
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index e036837..1d9fbf3 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -837,12 +837,9 @@ public void run() {
              .setTType(LeaderElectionContext.TTYPE.HOUSEKEEPING) // housekeeping tasks
              .addListener(new CMClearer(conf))
              .addListener(new StatsUpdaterTask(conf))
-             .addListener(new CompactorTasks(conf, false))
+             .addListener(new CompactorTasks(conf))
              .addListener(new CompactorPMF())
              .addListener(new HouseKeepingTasks(conf, true))
-             .setTType(LeaderElectionContext.TTYPE.WORKER) // compactor worker
-             .addListener(new CompactorTasks(conf, true),
-                 MetastoreConf.getVar(conf, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN).equals("metastore"))
              .build();
           if (shutdownHookMgr != null) {
             shutdownHookMgr.addShutdownHook(() -> context.close(), 0);
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/CompactorTasks.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/CompactorTasks.java
index fcc5140..956a5e5 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/CompactorTasks.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/leader/CompactorTasks.java
@@ -36,16 +36,14 @@
 public class CompactorTasks implements LeaderElection.LeadershipStateListener {
 
   private final Configuration configuration;
-  private final boolean runOnlyWorker;
 
   // each MetaStoreThread runs as a thread
   private Map<MetaStoreThread, AtomicBoolean> metastoreThreadsMap;
 
-  public CompactorTasks(Configuration configuration, boolean runOnlyWorker) {
+  public CompactorTasks(Configuration configuration) {
     // recreate a new configuration
     this.configuration = new Configuration(requireNonNull(configuration,
         "configuration is null"));
-    this.runOnlyWorker = runOnlyWorker;
   }
 
   // Copied from HiveMetaStore
@@ -62,66 +60,41 @@ private MetaStoreThread instantiateThread(String classname) throws Exception {
 
   public List<MetaStoreThread> getCompactorThreads() throws Exception {
     List<MetaStoreThread> compactors = new ArrayList<>();
-    if (!runOnlyWorker) {
-      if (MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON)) {
-        MetaStoreThread initiator = instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Initiator");
-        compactors.add(initiator);
-      }
-      if (MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON)) {
-        MetaStoreThread cleaner = instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Cleaner");
-        compactors.add(cleaner);
-      }
-    } else {
-      boolean runInMetastore = MetastoreConf.getVar(configuration,
-          MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN).equals("metastore");
-      if (runInMetastore) {
-        HiveMetaStore.LOG.warn("Running compaction workers on HMS side is not suggested because compaction pools are not supported in HMS " +
-            "(HIVE-26443). Consider removing the hive.metastore.runworker.in configuration setting, as it will be " +
-            "comletely removed in future releases.");
-        int numWorkers = MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS);
-        for (int i = 0; i < numWorkers; i++) {
-          MetaStoreThread worker = instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Worker");
-          compactors.add(worker);
-        }
-      }
+    if (MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON)) {
+      MetaStoreThread initiator = instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Initiator");
+      compactors.add(initiator);
+    }
+    if (MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON)) {
+      MetaStoreThread cleaner = instantiateThread("org.apache.hadoop.hive.ql.txn.compactor.Cleaner");
+      compactors.add(cleaner);
     }
     return compactors;
   }
 
   private void logCompactionParameters() {
-    if (!runOnlyWorker) {
-      HiveMetaStore.LOG.info("Compaction HMS parameters:");
-      HiveMetaStore.LOG.info("metastore.compactor.initiator.on = {}",
-          MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON));
-      HiveMetaStore.LOG.info("metastore.compactor.cleaner.on = {}",
-          MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON));
-      HiveMetaStore.LOG.info("metastore.compactor.worker.threads = {}",
-          MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS));
-      HiveMetaStore.LOG.info("hive.metastore.runworker.in = {}",
-          MetastoreConf.getVar(configuration, MetastoreConf.ConfVars.HIVE_METASTORE_RUNWORKER_IN));
-      HiveMetaStore.LOG.info("metastore.compactor.history.retention.did.not.initiate = {}",
-          MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_HISTORY_RETENTION_DID_NOT_INITIATE));
-      HiveMetaStore.LOG.info("metastore.compactor.history.retention.failed = {}",
-          MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED));
-      HiveMetaStore.LOG.info("metastore.compactor.history.retention.succeeded = {}",
-          MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_HISTORY_RETENTION_SUCCEEDED));
-      HiveMetaStore.LOG.info("metastore.compactor.initiator.failed.compacts.threshold = {}",
-          MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD));
-      HiveMetaStore.LOG.info("metastore.compactor.enable.stats.compression",
-          MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_MINOR_STATS_COMPRESSION));
+    HiveMetaStore.LOG.info("Compaction HMS parameters:");
+    HiveMetaStore.LOG.info("metastore.compactor.initiator.on = {}",
+        MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON));
+    HiveMetaStore.LOG.info("metastore.compactor.cleaner.on = {}",
+        MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON));
+    HiveMetaStore.LOG.info("metastore.compactor.worker.threads = {}",
+        MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS));
+    HiveMetaStore.LOG.info("metastore.compactor.history.retention.did.not.initiate = {}",
+        MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_HISTORY_RETENTION_DID_NOT_INITIATE));
+    HiveMetaStore.LOG.info("metastore.compactor.history.retention.failed = {}",
+        MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED));
+    HiveMetaStore.LOG.info("metastore.compactor.history.retention.succeeded = {}",
+        MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_HISTORY_RETENTION_SUCCEEDED));
+    HiveMetaStore.LOG.info("metastore.compactor.initiator.failed.compacts.threshold = {}",
+        MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD));
+    HiveMetaStore.LOG.info("metastore.compactor.enable.stats.compression",
+        MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_MINOR_STATS_COMPRESSION));
 
-      if (!MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON)) {
-        HiveMetaStore.LOG.warn("Compactor Initiator is turned Off. Automatic compaction will not be triggered.");
-      }
-      if (!MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON)) {
-        HiveMetaStore.LOG.warn("Compactor Cleaner is turned Off. Automatic compaction cleaner will not be triggered.");
-      }
-
-    } else {
-      int numThreads = MetastoreConf.getIntVar(configuration, MetastoreConf.ConfVars.COMPACTOR_WORKER_THREADS);
-      if (numThreads < 1) {
-        HiveMetaStore.LOG.warn("Invalid number of Compactor Worker threads({}) on HMS", numThreads);
-      }
+    if (!MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON)) {
+      HiveMetaStore.LOG.warn("Compactor Initiator is turned Off. Automatic compaction will not be triggered.");
+    }
+    if (!MetastoreConf.getBoolVar(configuration, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON)) {
+      HiveMetaStore.LOG.warn("Compactor Cleaner is turned Off. Automatic compaction cleaner will not be triggered.");
     }
   }
 
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 91cfe31..393cb94 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -259,33 +259,10 @@ public void cleanEmptyAbortedAndCommittedTxns() throws MetaException {
   }
 
   /**
-   * This will take all entries assigned to workers
-   * on a host return them to INITIATED state.  The initiator should use this at start up to
-   * clean entries from any workers that were in the middle of compacting when the metastore
-   * shutdown.  It does not reset entries from worker threads on other hosts as those may still
-   * be working.
-   * @param hostname Name of this host.  It is assumed this prefixes the thread's worker id,
-   *                 so that like hostname% will match the worker id.
-   */
-  @Override
-  @RetrySemantics.Idempotent
-  public void revokeFromLocalWorkers(String hostname) throws MetaException {
-    jdbcResource.execute(
-        "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_WORKER_ID\" = NULL, \"CQ_START\" = NULL," +
-            " \"CQ_STATE\" = :initiatedState WHERE \"CQ_STATE\" = :workingState AND \"CQ_WORKER_ID\" LIKE :hostname",
-        new MapSqlParameterSource()
-            .addValue("initiatedState", Character.toString(INITIATED_STATE), Types.CHAR)
-            .addValue("workingState", Character.toString(WORKING_STATE), Types.CHAR)
-            .addValue("hostname", hostname + "%"),
-        null);
-  }
-
-  /**
    * This call will return all compaction queue
    * entries assigned to a worker but over the timeout back to the initiated state.
    * This should be called by the initiator on start up and occasionally when running to clean up
-   * after dead threads.  At start up {@link #revokeFromLocalWorkers(String)} should be called
-   * first.
+   * after dead threads.
    * @param timeout number of milliseconds since start time that should elapse before a worker is
    *                declared dead.
    */
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 42a80b1..13f32f6 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -742,25 +742,10 @@ Set<CompactionInfo> findPotentialCompactions(int abortedThreshold, long abortedT
   void cleanEmptyAbortedAndCommittedTxns() throws MetaException;
 
   /**
-   * This will take all entries assigned to workers
-   * on a host return them to INITIATED state.  The initiator should use this at start up to
-   * clean entries from any workers that were in the middle of compacting when the metastore
-   * shutdown.  It does not reset entries from worker threads on other hosts as those may still
-   * be working.
-   * @param hostname Name of this host.  It is assumed this prefixes the thread's worker id,
-   *                 so that like hostname% will match the worker id.
-   */
-  @SqlRetry
-  @Transactional(POOL_COMPACTOR)
-  @RetrySemantics.Idempotent
-  void revokeFromLocalWorkers(String hostname) throws MetaException;
-
-  /**
    * This call will return all compaction queue
    * entries assigned to a worker but over the timeout back to the initiated state.
    * This should be called by the initiator on start up and occasionally when running to clean up
-   * after dead threads.  At start up {@link #revokeFromLocalWorkers(String)} should be called
-   * first.
+   * after dead threads.
    * @param timeout number of milliseconds since start time that should elapse before a worker is
    *                declared dead.
    */