HBASE-25534 Honor TableDescriptor settings earlier in normalization (#2917)

Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java
index 6f939da..eae34c6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizer.java
@@ -20,7 +20,7 @@
 
 import java.util.List;
 import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -38,7 +38,7 @@
 interface RegionNormalizer extends Configurable {
   /**
    * Set the master service. Must be called before first call to
-   * {@link #computePlansForTable(TableName)}.
+   * {@link #computePlansForTable(TableDescriptor)}.
    * @param masterServices master services to use
    */
   void setMasterServices(MasterServices masterServices);
@@ -46,9 +46,9 @@
   /**
    * Computes a list of normalizer actions to perform on the target table. This is the primary
    * entry-point from the Master driving a normalization activity.
-   * @param table table to normalize
+   * @param tableDescriptor table descriptor for table which needs normalize
    * @return A list of the normalization actions to perform, or an empty list
    *   if there's nothing to do.
    */
-  List<NormalizationPlan> computePlansForTable(TableName table);
+  List<NormalizationPlan> computePlansForTable(TableDescriptor tableDescriptor);
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java
index 408317a..59d2f46 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/RegionNormalizerWorker.java
@@ -178,8 +178,9 @@
       return Collections.emptyList();
     }
 
+    final TableDescriptor tblDesc;
     try {
-      final TableDescriptor tblDesc = masterServices.getTableDescriptors().get(tableName);
+      tblDesc = masterServices.getTableDescriptors().get(tableName);
       if (tblDesc != null && !tblDesc.isNormalizationEnabled()) {
         LOG.debug("Skipping table {} because normalization is disabled in its table properties.",
           tableName);
@@ -190,7 +191,7 @@
       return Collections.emptyList();
     }
 
-    final List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tableName);
+    final List<NormalizationPlan> plans = regionNormalizer.computePlansForTable(tblDesc);
     if (CollectionUtils.isEmpty(plans)) {
       LOG.debug("No normalization required for table {}.", tableName);
       return Collections.emptyList();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
index 5245568..39568bb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/normalizer/SimpleRegionNormalizer.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.master.normalizer;
 
 import static org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils.isEmpty;
-import java.io.IOException;
 import java.time.Instant;
 import java.time.Period;
 import java.util.ArrayList;
@@ -27,6 +26,7 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.function.BooleanSupplier;
+import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.RegionMetrics;
@@ -184,23 +184,24 @@
   }
 
   @Override
-  public List<NormalizationPlan> computePlansForTable(final TableName table) {
-    if (table == null) {
+  public List<NormalizationPlan> computePlansForTable(final TableDescriptor tableDescriptor) {
+    if (tableDescriptor == null) {
       return Collections.emptyList();
     }
+    TableName table = tableDescriptor.getTableName();
     if (table.isSystemTable()) {
       LOG.debug("Normalization of system table {} isn't allowed", table);
       return Collections.emptyList();
     }
 
-    final boolean proceedWithSplitPlanning = proceedWithSplitPlanning();
-    final boolean proceedWithMergePlanning = proceedWithMergePlanning();
+    final boolean proceedWithSplitPlanning = proceedWithSplitPlanning(tableDescriptor);
+    final boolean proceedWithMergePlanning = proceedWithMergePlanning(tableDescriptor);
     if (!proceedWithMergePlanning && !proceedWithSplitPlanning) {
       LOG.debug("Both split and merge are disabled. Skipping normalization of table: {}", table);
       return Collections.emptyList();
     }
 
-    final NormalizeContext ctx = new NormalizeContext(table);
+    final NormalizeContext ctx = new NormalizeContext(tableDescriptor);
     if (isEmpty(ctx.getTableRegions())) {
       return Collections.emptyList();
     }
@@ -254,41 +255,38 @@
     return masterServices.isSplitOrMergeEnabled(masterSwitchType);
   }
 
-  private boolean proceedWithSplitPlanning() {
-    return isSplitEnabled() && isMasterSwitchEnabled(MasterSwitchType.SPLIT);
+  private boolean proceedWithSplitPlanning(TableDescriptor tableDescriptor) {
+    String value = tableDescriptor.getValue(SPLIT_ENABLED_KEY);
+    return  (value == null ? isSplitEnabled() : Boolean.parseBoolean(value)) &&
+      isMasterSwitchEnabled(MasterSwitchType.SPLIT);
   }
 
-  private boolean proceedWithMergePlanning() {
-    return isMergeEnabled() && isMasterSwitchEnabled(MasterSwitchType.MERGE);
+  private boolean proceedWithMergePlanning(TableDescriptor tableDescriptor) {
+    String value = tableDescriptor.getValue(MERGE_ENABLED_KEY);
+    return (value == null ? isMergeEnabled() : Boolean.parseBoolean(value)) &&
+      isMasterSwitchEnabled(MasterSwitchType.MERGE);
   }
 
   /**
    * @param tableRegions regions of table to normalize
+   * @param tableDescriptor the TableDescriptor
    * @return average region size depending on
    * @see org.apache.hadoop.hbase.client.TableDescriptor#getNormalizerTargetRegionCount()
    * Also make sure tableRegions contains regions of the same table
    */
-  private double getAverageRegionSizeMb(final List<RegionInfo> tableRegions) {
+  private double getAverageRegionSizeMb(final List<RegionInfo> tableRegions,
+    final TableDescriptor tableDescriptor) {
     if (isEmpty(tableRegions)) {
       throw new IllegalStateException(
         "Cannot calculate average size of a table without any regions.");
     }
-    TableName table = tableRegions.get(0).getTable();
-    int targetRegionCount = -1;
-    long targetRegionSize = -1;
+    TableName table = tableDescriptor.getTableName();
     double avgRegionSize;
-    try {
-      TableDescriptor tableDescriptor = masterServices.getTableDescriptors().get(table);
-      if (tableDescriptor != null) {
-        targetRegionCount = tableDescriptor.getNormalizerTargetRegionCount();
-        targetRegionSize = tableDescriptor.getNormalizerTargetRegionSize();
-        LOG.debug("Table {} configured with target region count {}, target region size {}", table,
-          targetRegionCount, targetRegionSize);
-      }
-    } catch (IOException e) {
-      LOG.warn("TableDescriptor for {} unavailable, table-level target region count and size"
-        + " configurations cannot be considered.", table, e);
-    }
+    int targetRegionCount = tableDescriptor.getNormalizerTargetRegionCount();
+    long targetRegionSize = tableDescriptor.getNormalizerTargetRegionSize();
+    LOG.debug("Table {} configured with target region count {}, target region size {}", table,
+      targetRegionCount, targetRegionSize);
+
     if (targetRegionSize > 0) {
       avgRegionSize = targetRegionSize;
     } else {
@@ -316,10 +314,10 @@
    */
   private boolean skipForMerge(
     final NormalizerConfiguration normalizerConfiguration,
-    final RegionStates regionStates,
+    final NormalizeContext ctx,
     final RegionInfo regionInfo
   ) {
-    final RegionState state = regionStates.getRegionState(regionInfo);
+    final RegionState state = ctx.getRegionStates().getRegionState(regionInfo);
     final String name = regionInfo.getEncodedName();
     return
       logTraceReason(
@@ -329,10 +327,10 @@
           () -> !Objects.equals(state.getState(), RegionState.State.OPEN),
           "skipping merge of region {} because it is not open.", name)
         || logTraceReason(
-          () -> !isOldEnoughForMerge(normalizerConfiguration, regionInfo),
+          () -> !isOldEnoughForMerge(normalizerConfiguration, ctx, regionInfo),
           "skipping merge of region {} because it is not old enough.", name)
         || logTraceReason(
-          () -> !isLargeEnoughForMerge(normalizerConfiguration, regionInfo),
+          () -> !isLargeEnoughForMerge(normalizerConfiguration, ctx, regionInfo),
           "skipping merge region {} because it is not large enough.", name);
   }
 
@@ -342,7 +340,7 @@
    */
   private List<NormalizationPlan> computeMergeNormalizationPlans(final NormalizeContext ctx) {
     final NormalizerConfiguration configuration = normalizerConfiguration;
-    if (ctx.getTableRegions().size() < configuration.getMinRegionCount()) {
+    if (ctx.getTableRegions().size() < configuration.getMinRegionCount(ctx)) {
       LOG.debug("Table {} has {} regions, required min number of regions for normalizer to run"
           + " is {}, not computing merge plans.", ctx.getTableName(),
         ctx.getTableRegions().size(), configuration.getMinRegionCount());
@@ -350,7 +348,7 @@
     }
 
     final long avgRegionSizeMb = (long) ctx.getAverageRegionSizeMb();
-    if (avgRegionSizeMb < configuration.getMergeMinRegionSizeMb()) {
+    if (avgRegionSizeMb < configuration.getMergeMinRegionSizeMb(ctx)) {
       return Collections.emptyList();
     }
     LOG.debug("Computing normalization plan for table {}. average region size: {}, number of"
@@ -373,7 +371,7 @@
       for (current = rangeStart; current < ctx.getTableRegions().size(); current++) {
         final RegionInfo regionInfo = ctx.getTableRegions().get(current);
         final long regionSizeMb = getRegionSizeMB(regionInfo);
-        if (skipForMerge(configuration, ctx.getRegionStates(), regionInfo)) {
+        if (skipForMerge(configuration, ctx, regionInfo)) {
           // this region cannot participate in a range. resume the outer loop.
           rangeStart = Math.max(current, rangeStart + 1);
           break;
@@ -451,12 +449,13 @@
    */
   private static boolean isOldEnoughForMerge(
     final NormalizerConfiguration normalizerConfiguration,
+    final NormalizeContext ctx,
     final RegionInfo regionInfo
   ) {
     final Instant currentTime = Instant.ofEpochMilli(EnvironmentEdgeManager.currentTime());
     final Instant regionCreateTime = Instant.ofEpochMilli(regionInfo.getRegionId());
     return currentTime.isAfter(
-      regionCreateTime.plus(normalizerConfiguration.getMergeMinRegionAge()));
+      regionCreateTime.plus(normalizerConfiguration.getMergeMinRegionAge(ctx)));
   }
 
   /**
@@ -468,9 +467,10 @@
    */
   private boolean isLargeEnoughForMerge(
     final NormalizerConfiguration normalizerConfiguration,
+    final NormalizeContext ctx,
     final RegionInfo regionInfo
   ) {
-    return getRegionSizeMB(regionInfo) >= normalizerConfiguration.getMergeMinRegionSizeMb();
+    return getRegionSizeMB(regionInfo) >= normalizerConfiguration.getMergeMinRegionSizeMb(ctx);
   }
 
   private static boolean logTraceReason(final BooleanSupplier predicate, final String fmtWhenTrue,
@@ -541,18 +541,44 @@
       return minRegionCount;
     }
 
+    public int getMinRegionCount(NormalizeContext context) {
+      int minRegionCount = context.getOrDefault(MIN_REGION_COUNT_KEY, Integer::parseInt, 0);
+      if (minRegionCount <= 0) {
+        minRegionCount = getMinRegionCount();
+      }
+      return minRegionCount;
+    }
+
     public Period getMergeMinRegionAge() {
       return mergeMinRegionAge;
     }
 
+    public Period getMergeMinRegionAge(NormalizeContext context) {
+      int mergeMinRegionAge = context.getOrDefault(MERGE_MIN_REGION_AGE_DAYS_KEY,
+        Integer::parseInt, -1);
+      if (mergeMinRegionAge < 0) {
+        return getMergeMinRegionAge();
+      }
+      return Period.ofDays(mergeMinRegionAge);
+    }
+
     public long getMergeMinRegionSizeMb() {
       return mergeMinRegionSizeMb;
     }
+
+    public long getMergeMinRegionSizeMb(NormalizeContext context) {
+      long mergeMinRegionSizeMb = context.getOrDefault(MERGE_MIN_REGION_SIZE_MB_KEY,
+        Long::parseLong, (long)-1);
+      if (mergeMinRegionSizeMb < 0) {
+        mergeMinRegionSizeMb = getMergeMinRegionSizeMb();
+      }
+      return mergeMinRegionSizeMb;
+    }
   }
 
   /**
    * Inner class caries the state necessary to perform a single invocation of
-   * {@link #computePlansForTable(TableName)}. Grabbing this data from the assignment manager
+   * {@link #computePlansForTable(TableDescriptor)}. Grabbing this data from the assignment manager
    * up-front allows any computed values to be realized just once.
    */
   private class NormalizeContext {
@@ -560,9 +586,11 @@
     private final RegionStates regionStates;
     private final List<RegionInfo> tableRegions;
     private final double averageRegionSizeMb;
+    private final TableDescriptor tableDescriptor;
 
-    public NormalizeContext(final TableName tableName) {
-      this.tableName = tableName;
+    public NormalizeContext(final TableDescriptor tableDescriptor) {
+      this.tableDescriptor = tableDescriptor;
+      tableName = tableDescriptor.getTableName();
       regionStates = SimpleRegionNormalizer.this.masterServices
         .getAssignmentManager()
         .getRegionStates();
@@ -574,7 +602,8 @@
       // In order to avoid that, sort the list by RegionInfo.COMPARATOR.
       // See HBASE-24376
       tableRegions.sort(RegionInfo.COMPARATOR);
-      averageRegionSizeMb = SimpleRegionNormalizer.this.getAverageRegionSizeMb(this.tableRegions);
+      averageRegionSizeMb = SimpleRegionNormalizer.this.getAverageRegionSizeMb(this.tableRegions,
+        this.tableDescriptor);
     }
 
     public TableName getTableName() {
@@ -592,5 +621,14 @@
     public double getAverageRegionSizeMb() {
       return averageRegionSizeMb;
     }
+
+    public <T> T getOrDefault(String key, Function<String, T> function, T defaultValue) {
+      String value = tableDescriptor.getValue(key);
+      if (value == null) {
+        return defaultValue;
+      } else {
+        return function.apply(value);
+      }
+    }
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java
index e3a29b8..ad743e1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestRegionNormalizerWorker.java
@@ -135,7 +135,7 @@
     when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
     when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong()))
       .thenReturn(1L);
-    when(regionNormalizer.computePlansForTable(tn))
+    when(regionNormalizer.computePlansForTable(tnDescriptor))
       .thenReturn(singletonList(new MergeNormalizationPlan.Builder()
         .addTarget(RegionInfoBuilder.newBuilder(tn).build(), 10)
         .addTarget(RegionInfoBuilder.newBuilder(tn).build(), 20)
@@ -160,7 +160,7 @@
     when(masterServices.getTableDescriptors().get(tn)).thenReturn(tnDescriptor);
     when(masterServices.splitRegion(any(), any(), anyLong(), anyLong()))
       .thenReturn(1L);
-    when(regionNormalizer.computePlansForTable(tn))
+    when(regionNormalizer.computePlansForTable(tnDescriptor))
       .thenReturn(singletonList(
         new SplitNormalizationPlan(RegionInfoBuilder.newBuilder(tn).build(), 10)));
 
@@ -192,7 +192,7 @@
       .thenReturn(1L);
     when(masterServices.mergeRegions(any(), anyBoolean(), anyLong(), anyLong()))
       .thenReturn(1L);
-    when(regionNormalizer.computePlansForTable(tn))
+    when(regionNormalizer.computePlansForTable(tnDescriptor))
       .thenReturn(Arrays.asList(
         new SplitNormalizationPlan(splitRegionInfo, 2),
         new MergeNormalizationPlan.Builder()
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java
index 70f5a87..2db6834 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/normalizer/TestSimpleRegionNormalizer.java
@@ -54,6 +54,8 @@
 import org.apache.hadoop.hbase.TableNameTestRule;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -80,6 +82,7 @@
   private Configuration conf;
   private SimpleRegionNormalizer normalizer;
   private MasterServices masterServices;
+  private TableDescriptor tableDescriptor;
 
   @Rule
   public TableNameTestRule name = new TableNameTestRule();
@@ -87,16 +90,18 @@
   @Before
   public void before() {
     conf = HBaseConfiguration.create();
+    tableDescriptor = TableDescriptorBuilder.newBuilder(name.getTableName()).build();
   }
 
   @Test
   public void testNoNormalizationForMetaTable() {
     TableName testTable = TableName.META_TABLE_NAME;
+    TableDescriptor testMetaTd = TableDescriptorBuilder.newBuilder(testTable).build();
     List<RegionInfo> RegionInfo = new ArrayList<>();
     Map<byte[], Integer> regionSizes = new HashMap<>();
 
     setupMocksForNormalizer(regionSizes, RegionInfo);
-    List<NormalizationPlan> plans = normalizer.computePlansForTable(testTable);
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(testMetaTd);
     assertThat(plans, empty());
   }
 
@@ -107,7 +112,7 @@
     final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 10, 15);
     setupMocksForNormalizer(regionSizes, regionInfos);
 
-    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableDescriptor);
     assertThat(plans, empty());
   }
 
@@ -119,7 +124,7 @@
       createRegionSizesMap(regionInfos, 10, 15, 8, 10);
     setupMocksForNormalizer(regionSizes, regionInfos);
 
-    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableDescriptor);
     assertThat(plans, empty());
   }
 
@@ -134,7 +139,7 @@
       .thenReturn(RegionState.createForTesting(null, state));
     assertThat(normalizer.getMinRegionCount(), greaterThanOrEqualTo(regionInfos.size()));
 
-    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableDescriptor);
     assertThat(format("Unexpected plans for RegionState %s", state), plans, empty());
   }
 
@@ -177,7 +182,7 @@
     setupMocksForNormalizer(regionSizes, regionInfos);
 
     assertThat(
-      normalizer.computePlansForTable(tableName),
+      normalizer.computePlansForTable(tableDescriptor),
       contains(new MergeNormalizationPlan.Builder()
         .addTarget(regionInfos.get(1), 5)
         .addTarget(regionInfos.get(2), 5)
@@ -194,7 +199,7 @@
     setupMocksForNormalizer(regionSizes, regionInfos);
 
     assertThat(
-      normalizer.computePlansForTable(tableName),
+      normalizer.computePlansForTable(tableDescriptor),
       contains(new MergeNormalizationPlan.Builder()
         .addTarget(regionInfos.get(4), 2700)
         .addTarget(regionInfos.get(5), 2700)
@@ -209,7 +214,7 @@
       createRegionSizesMap(regionInfos, 15, 5, 16, 15, 5);
     setupMocksForNormalizer(regionSizes, regionInfos);
 
-    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableDescriptor);
     assertThat(plans, empty());
   }
 
@@ -221,7 +226,7 @@
       createRegionSizesMap(regionInfos, 8, 6, 10, 30);
     setupMocksForNormalizer(regionSizes, regionInfos);
 
-    assertThat(normalizer.computePlansForTable(tableName), contains(
+    assertThat(normalizer.computePlansForTable(tableDescriptor), contains(
       new SplitNormalizationPlan(regionInfos.get(3), 30)));
   }
 
@@ -234,9 +239,8 @@
     setupMocksForNormalizer(regionSizes, regionInfos);
 
     // test when target region size is 20
-    when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionSize())
-        .thenReturn(20L);
-    assertThat(normalizer.computePlansForTable(tableName), contains(
+    when(tableDescriptor.getNormalizerTargetRegionSize()).thenReturn(20L);
+    assertThat(normalizer.computePlansForTable(tableDescriptor), contains(
       new SplitNormalizationPlan(regionInfos.get(2), 60),
       new SplitNormalizationPlan(regionInfos.get(3), 80),
       new SplitNormalizationPlan(regionInfos.get(4), 100),
@@ -244,10 +248,9 @@
     ));
 
     // test when target region size is 200
-    when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionSize())
-        .thenReturn(200L);
+    when(tableDescriptor.getNormalizerTargetRegionSize()).thenReturn(200L);
     assertThat(
-      normalizer.computePlansForTable(tableName),
+      normalizer.computePlansForTable(tableDescriptor),
       contains(
         new MergeNormalizationPlan.Builder()
           .addTarget(regionInfos.get(0), 20)
@@ -266,17 +269,15 @@
     setupMocksForNormalizer(regionSizes, regionInfos);
 
     // test when target region count is 8
-    when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionCount())
-        .thenReturn(8);
-    assertThat(normalizer.computePlansForTable(tableName), contains(
+    when(tableDescriptor.getNormalizerTargetRegionCount()).thenReturn(8);
+    assertThat(normalizer.computePlansForTable(tableDescriptor), contains(
       new SplitNormalizationPlan(regionInfos.get(2), 60),
       new SplitNormalizationPlan(regionInfos.get(3), 80)));
 
     // test when target region count is 3
-    when(masterServices.getTableDescriptors().get(any()).getNormalizerTargetRegionCount())
-        .thenReturn(3);
+    when(tableDescriptor.getNormalizerTargetRegionCount()).thenReturn(3);
     assertThat(
-      normalizer.computePlansForTable(tableName),
+      normalizer.computePlansForTable(tableDescriptor),
       contains(new MergeNormalizationPlan.Builder()
         .addTarget(regionInfos.get(0), 20)
         .addTarget(regionInfos.get(1), 40)
@@ -292,12 +293,37 @@
       createRegionSizesMap(regionInfos, 5, 5, 20, 5, 5);
     setupMocksForNormalizer(regionSizes, regionInfos);
     assertThat(
-      normalizer.computePlansForTable(tableName),
+      normalizer.computePlansForTable(tableDescriptor),
       contains(instanceOf(SplitNormalizationPlan.class)));
 
     conf.setBoolean(SPLIT_ENABLED_KEY, false);
     setupMocksForNormalizer(regionSizes, regionInfos);
-    assertThat(normalizer.computePlansForTable(tableName), empty());
+    assertThat(normalizer.computePlansForTable(tableDescriptor), empty());
+  }
+
+  @Test
+  public void testHonorsSplitEnabledInTD() {
+    conf.setBoolean(SPLIT_ENABLED_KEY, true);
+    final TableName tableName = name.getTableName();
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 5);
+    final Map<byte[], Integer> regionSizes =
+      createRegionSizesMap(regionInfos, 5, 5, 20, 5, 5);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+    assertThat(
+      normalizer.computePlansForTable(tableDescriptor),
+      contains(instanceOf(SplitNormalizationPlan.class)));
+
+    // When hbase.normalizer.split.enabled is true in configuration, but false in table descriptor
+    when(tableDescriptor.getValue(SPLIT_ENABLED_KEY)).thenReturn("false");
+    assertThat(normalizer.computePlansForTable(tableDescriptor), empty());
+
+    // When hbase.normalizer.split.enabled is false in configuration, but true in table descriptor
+    conf.setBoolean(SPLIT_ENABLED_KEY, false);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+    when(tableDescriptor.getValue(SPLIT_ENABLED_KEY)).thenReturn("true");
+    assertThat(
+      normalizer.computePlansForTable(tableDescriptor),
+      contains(instanceOf(SplitNormalizationPlan.class)));
   }
 
   @Test
@@ -309,12 +335,37 @@
       createRegionSizesMap(regionInfos, 20, 5, 5, 20, 20);
     setupMocksForNormalizer(regionSizes, regionInfos);
     assertThat(
-      normalizer.computePlansForTable(tableName),
+      normalizer.computePlansForTable(tableDescriptor),
       contains(instanceOf(MergeNormalizationPlan.class)));
 
     conf.setBoolean(MERGE_ENABLED_KEY, false);
     setupMocksForNormalizer(regionSizes, regionInfos);
-    assertThat(normalizer.computePlansForTable(tableName), empty());
+    assertThat(normalizer.computePlansForTable(tableDescriptor), empty());
+  }
+
+  @Test
+  public void testHonorsMergeEnabledInTD() {
+    conf.setBoolean(MERGE_ENABLED_KEY, true);
+    final TableName tableName = name.getTableName();
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 5);
+    final Map<byte[], Integer> regionSizes =
+      createRegionSizesMap(regionInfos, 20, 5, 5, 20, 20);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+    assertThat(
+      normalizer.computePlansForTable(tableDescriptor),
+      contains(instanceOf(MergeNormalizationPlan.class)));
+
+    // When hbase.normalizer.merge.enabled is true in configuration, but false in table descriptor
+    when(tableDescriptor.getValue(MERGE_ENABLED_KEY)).thenReturn("false");
+    assertThat(normalizer.computePlansForTable(tableDescriptor), empty());
+
+    // When hbase.normalizer.merge.enabled is false in configuration, but true in table descriptor
+    conf.setBoolean(MERGE_ENABLED_KEY, false);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+    when(tableDescriptor.getValue(MERGE_ENABLED_KEY)).thenReturn("true");
+    assertThat(
+      normalizer.computePlansForTable(tableDescriptor),
+      contains(instanceOf(MergeNormalizationPlan.class)));
   }
 
   @Test
@@ -328,7 +379,7 @@
     final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 1, 1, 10);
     setupMocksForNormalizer(regionSizes, regionInfos);
 
-    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableDescriptor);
     assertThat(plans, contains(
       new SplitNormalizationPlan(regionInfos.get(2), 10),
       new MergeNormalizationPlan.Builder()
@@ -339,7 +390,31 @@
     // have to call setupMocks again because we don't have dynamic config update on normalizer.
     conf.setInt(MIN_REGION_COUNT_KEY, 4);
     setupMocksForNormalizer(regionSizes, regionInfos);
-    assertThat(normalizer.computePlansForTable(tableName), contains(
+    assertThat(normalizer.computePlansForTable(tableDescriptor), contains(
+      new SplitNormalizationPlan(regionInfos.get(2), 10)));
+  }
+
+  @Test
+  public void testHonorsMinimumRegionCountInTD() {
+    conf.setInt(MIN_REGION_COUNT_KEY, 1);
+    final TableName tableName = name.getTableName();
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 3);
+    // create a table topology that results in both a merge plan and a split plan. Assert that the
+    // merge is only created when the when the number of table regions is above the region count
+    // threshold, and that the split plan is create in both cases.
+    final Map<byte[], Integer> regionSizes = createRegionSizesMap(regionInfos, 1, 1, 10);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableDescriptor);
+    assertThat(plans, contains(
+      new SplitNormalizationPlan(regionInfos.get(2), 10),
+      new MergeNormalizationPlan.Builder()
+        .addTarget(regionInfos.get(0), 1)
+        .addTarget(regionInfos.get(1), 1)
+        .build()));
+
+    when(tableDescriptor.getValue(MIN_REGION_COUNT_KEY)).thenReturn("4");
+    assertThat(normalizer.computePlansForTable(tableDescriptor), contains(
       new SplitNormalizationPlan(regionInfos.get(2), 10)));
   }
 
@@ -353,7 +428,7 @@
     setupMocksForNormalizer(regionSizes, regionInfos);
     assertEquals(Period.ofDays(7), normalizer.getMergeMinRegionAge());
     assertThat(
-      normalizer.computePlansForTable(tableName),
+      normalizer.computePlansForTable(tableDescriptor),
       everyItem(not(instanceOf(MergeNormalizationPlan.class))));
 
     // have to call setupMocks again because we don't have dynamic config update on normalizer.
@@ -361,12 +436,38 @@
     setupMocksForNormalizer(regionSizes, regionInfos);
     assertEquals(
       Period.ofDays(DEFAULT_MERGE_MIN_REGION_AGE_DAYS), normalizer.getMergeMinRegionAge());
-    final List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    final List<NormalizationPlan> plans = normalizer.computePlansForTable(tableDescriptor);
     assertThat(plans, not(empty()));
     assertThat(plans, everyItem(instanceOf(MergeNormalizationPlan.class)));
   }
 
   @Test
+  public void testHonorsMergeMinRegionAgeInTD() {
+    conf.setInt(MERGE_MIN_REGION_AGE_DAYS_KEY, 7);
+    final TableName tableName = name.getTableName();
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 4);
+    final Map<byte[], Integer> regionSizes =
+      createRegionSizesMap(regionInfos, 1, 1, 10, 10);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+    assertEquals(Period.ofDays(7), normalizer.getMergeMinRegionAge());
+    assertThat(
+      normalizer.computePlansForTable(tableDescriptor),
+      everyItem(not(instanceOf(MergeNormalizationPlan.class))));
+
+    conf.unset(MERGE_MIN_REGION_AGE_DAYS_KEY);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+    when(tableDescriptor.getValue(MERGE_MIN_REGION_AGE_DAYS_KEY)).thenReturn("-1");
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableDescriptor);
+    assertThat(plans, not(empty()));
+    assertThat(plans, everyItem(instanceOf(MergeNormalizationPlan.class)));
+
+    when(tableDescriptor.getValue(MERGE_MIN_REGION_AGE_DAYS_KEY)).thenReturn("5");
+    plans = normalizer.computePlansForTable(tableDescriptor);
+    assertThat(plans, empty());
+    assertThat(plans, everyItem(not(instanceOf(MergeNormalizationPlan.class))));
+  }
+
+  @Test
   public void testHonorsMergeMinRegionSize() {
     conf.setBoolean(SPLIT_ENABLED_KEY, false);
     final TableName tableName = name.getTableName();
@@ -378,7 +479,7 @@
     assertFalse(normalizer.isSplitEnabled());
     assertEquals(1, normalizer.getMergeMinRegionSizeMb());
     assertThat(
-      normalizer.computePlansForTable(tableName),
+      normalizer.computePlansForTable(tableDescriptor),
       contains(new MergeNormalizationPlan.Builder()
         .addTarget(regionInfos.get(0), 1)
         .addTarget(regionInfos.get(1), 2)
@@ -387,7 +488,29 @@
     conf.setInt(MERGE_MIN_REGION_SIZE_MB_KEY, 3);
     setupMocksForNormalizer(regionSizes, regionInfos);
     assertEquals(3, normalizer.getMergeMinRegionSizeMb());
-    assertThat(normalizer.computePlansForTable(tableName), empty());
+    assertThat(normalizer.computePlansForTable(tableDescriptor), empty());
+  }
+
+  @Test
+  public void testHonorsMergeMinRegionSizeInTD() {
+    conf.setBoolean(SPLIT_ENABLED_KEY, false);
+    final TableName tableName = name.getTableName();
+    final List<RegionInfo> regionInfos = createRegionInfos(tableName, 5);
+    final Map<byte[], Integer> regionSizes =
+      createRegionSizesMap(regionInfos, 1, 2, 0, 10, 10);
+    setupMocksForNormalizer(regionSizes, regionInfos);
+
+    assertFalse(normalizer.isSplitEnabled());
+    assertEquals(1, normalizer.getMergeMinRegionSizeMb());
+    assertThat(
+      normalizer.computePlansForTable(tableDescriptor),
+      contains(new MergeNormalizationPlan.Builder()
+        .addTarget(regionInfos.get(0), 1)
+        .addTarget(regionInfos.get(1), 2)
+        .build()));
+
+    when(tableDescriptor.getValue(MERGE_MIN_REGION_SIZE_MB_KEY)).thenReturn("3");
+    assertThat(normalizer.computePlansForTable(tableDescriptor), empty());
   }
 
   @Test
@@ -402,7 +525,7 @@
 
     assertFalse(normalizer.isSplitEnabled());
     assertEquals(0, normalizer.getMergeMinRegionSizeMb());
-    assertThat(normalizer.computePlansForTable(tableName), contains(
+    assertThat(normalizer.computePlansForTable(tableDescriptor), contains(
       new MergeNormalizationPlan.Builder()
         .addTarget(regionInfos.get(0), 0)
         .addTarget(regionInfos.get(1), 1)
@@ -429,7 +552,7 @@
 
     assertFalse(normalizer.isSplitEnabled());
     assertEquals(0, normalizer.getMergeMinRegionSizeMb());
-    assertThat(normalizer.computePlansForTable(tableName), contains(
+    assertThat(normalizer.computePlansForTable(tableDescriptor), contains(
       new MergeNormalizationPlan.Builder()
         .addTarget(regionInfos.get(0), 0)
         .addTarget(regionInfos.get(1), 1)
@@ -460,7 +583,7 @@
 
     assertFalse(normalizer.isSplitEnabled());
     assertEquals(0, normalizer.getMergeMinRegionSizeMb());
-    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableDescriptor);
     assertThat(plans, contains(
       new MergeNormalizationPlan.Builder()
         .addTarget(regionInfos.get(0), 0)
@@ -492,7 +615,7 @@
     assertTrue(normalizer.isMergeEnabled());
     assertTrue(normalizer.isSplitEnabled());
     assertEquals(0, normalizer.getMergeMinRegionSizeMb());
-    assertThat(normalizer.computePlansForTable(tableName), contains(
+    assertThat(normalizer.computePlansForTable(tableDescriptor), contains(
       new SplitNormalizationPlan(regionInfos.get(3), 30),
       new MergeNormalizationPlan.Builder()
         .addTarget(regionInfos.get(0), 3)
@@ -528,7 +651,7 @@
     setupMocksForNormalizer(regionSizes, regionInfos);
 
     // Compute the plan, no merge plan returned as they are not adjacent.
-    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableName);
+    List<NormalizationPlan> plans = normalizer.computePlansForTable(tableDescriptor);
     assertThat(plans, empty());
   }
 
@@ -536,6 +659,7 @@
   private void setupMocksForNormalizer(Map<byte[], Integer> regionSizes,
     List<RegionInfo> regionInfoList) {
     masterServices = Mockito.mock(MasterServices.class, RETURNS_DEEP_STUBS);
+    tableDescriptor = Mockito.mock(TableDescriptor.class, RETURNS_DEEP_STUBS);
 
     // for simplicity all regions are assumed to be on one server; doesn't matter to us
     ServerName sn = ServerName.valueOf("localhost", 0, 0L);
@@ -561,6 +685,7 @@
     }
 
     when(masterServices.isSplitOrMergeEnabled(any())).thenReturn(true);
+    when(tableDescriptor.getTableName()).thenReturn(name.getTableName());
 
     normalizer = new SimpleRegionNormalizer();
     normalizer.setConf(conf);