IGNITE-14728 Changed IGNITE_PDS_WAL_REBALANCE_THRESHOLD from System property to Distributed property. Fixes #9248

Signed-off-by: Slava Koptilin <slava.koptilin@gmail.com>
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerPropertiesTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerPropertiesTest.java
index e535b24..b114aba 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerPropertiesTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerPropertiesTest.java
@@ -32,6 +32,8 @@
 
 import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_INVALID_ARGUMENTS;
 import static org.apache.ignite.internal.commandline.CommandHandler.EXIT_CODE_OK;
+import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DFLT_PDS_WAL_REBALANCE_THRESHOLD;
+import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.HISTORICAL_REBALANCE_THRESHOLD_DMS_KEY;
 import static org.apache.ignite.testframework.GridTestUtils.assertContains;
 
 /**
@@ -197,4 +199,49 @@
             )
         );
     }
+
+    /**
+     * Check the set command for property 'history.rebalance.threshold'.
+     */
+    @Test
+    public void testPropertyWalRebalanceThreshold() {
+        assertDistributedPropertyEquals(HISTORICAL_REBALANCE_THRESHOLD_DMS_KEY, DFLT_PDS_WAL_REBALANCE_THRESHOLD);
+
+        int newVal = DFLT_PDS_WAL_REBALANCE_THRESHOLD * 2;
+
+        assertEquals(
+                EXIT_CODE_OK,
+                execute(
+                        "--property", "set",
+                        "--name", HISTORICAL_REBALANCE_THRESHOLD_DMS_KEY,
+                        "--val", Integer.toString(newVal)
+                )
+        );
+
+        assertDistributedPropertyEquals(HISTORICAL_REBALANCE_THRESHOLD_DMS_KEY, newVal);
+    }
+
+    /**
+     * Validates that distributed property has specified value across all nodes.
+     *
+     * @param propName Distributed property name.
+     * @param expected Expected property value.
+     * @param <T> Property type.
+     */
+    private <T extends Serializable> void assertDistributedPropertyEquals(String propName, T expected) {
+        for (Ignite ign : G.allGrids()) {
+            IgniteEx ignEx = (IgniteEx) ign;
+
+            if (ign.configuration().isClientMode())
+                continue;
+
+            DistributedChangeableProperty<Serializable> prop =
+                    ignEx.context().distributedConfiguration().property(propName);
+
+            assertEquals(
+                "Validation has failed on the cluster node [name=" + ign.configuration().getIgniteInstanceName(),
+                prop.get(),
+                expected);
+        }
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index e149dbe..7007013 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -1113,7 +1113,9 @@
 
     /**
      * WAL rebalance threshold.
+     * @deprecated use Distributed MetaStorage property {@code historical.rebalance.threshold}.
      */
+    @Deprecated
     @SystemProperty(value = "WAL rebalance threshold", type = Integer.class,
         defaults = "" + DFLT_PDS_WAL_REBALANCE_THRESHOLD)
     public static final String IGNITE_PDS_WAL_REBALANCE_THRESHOLD = "IGNITE_PDS_WAL_REBALANCE_THRESHOLD";
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 4cd7cc8..da71924 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -137,6 +137,8 @@
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.processors.compress.CompressionProcessor;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher;
 import org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty;
 import org.apache.ignite.internal.processors.port.GridPortRecord;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
@@ -177,6 +179,8 @@
 import static org.apache.ignite.IgniteSystemProperties.getBoolean;
 import static org.apache.ignite.IgniteSystemProperties.getInteger;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
+import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.makeUpdateListener;
+import static org.apache.ignite.internal.cluster.DistributedConfigurationUtils.setDefaultValue;
 import static org.apache.ignite.internal.pagemem.PageIdUtils.partId;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CHECKPOINT_RECORD;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.MASTER_KEY_CHANGE_RECORD;
@@ -239,15 +243,24 @@
      */
     private static final double PAGE_LIST_CACHE_LIMIT_THRESHOLD = 0.1;
 
-    /** @see IgniteSystemProperties#IGNITE_PDS_WAL_REBALANCE_THRESHOLD */
+    /**
+     * @see IgniteSystemProperties#IGNITE_PDS_WAL_REBALANCE_THRESHOLD
+     * @see #HISTORICAL_REBALANCE_THRESHOLD_DMS_KEY
+     */
     public static final int DFLT_PDS_WAL_REBALANCE_THRESHOLD = 500;
 
     /** @see IgniteSystemProperties#IGNITE_DEFRAGMENTATION_REGION_SIZE_PERCENTAGE */
     public static final int DFLT_DEFRAGMENTATION_REGION_SIZE_PERCENTAGE = 60;
 
-    /** */
-    private final int walRebalanceThreshold =
-        getInteger(IGNITE_PDS_WAL_REBALANCE_THRESHOLD, DFLT_PDS_WAL_REBALANCE_THRESHOLD);
+    /**
+     * Threshold value to use history or full rebalance for local partition.
+     * Master value contained in {@link #historicalRebalanceThreshold}.
+     */
+    private final int walRebalanceThresholdLegacy =
+            getInteger(IGNITE_PDS_WAL_REBALANCE_THRESHOLD, DFLT_PDS_WAL_REBALANCE_THRESHOLD);
+
+    /** WAL rebalance threshold distributed configuration key */
+    public static final String HISTORICAL_REBALANCE_THRESHOLD_DMS_KEY = "historical.rebalance.threshold";
 
     /** Prefer historical rebalance flag. */
     private final boolean preferWalRebalance = getBoolean(IGNITE_PREFER_WAL_REBALANCE);
@@ -357,6 +370,10 @@
     /** Checkpoint frequency deviation. */
     private SimpleDistributedProperty<Integer> cpFreqDeviation;
 
+    /** WAL rebalance threshold. */
+    private final SimpleDistributedProperty<Integer> historicalRebalanceThreshold =
+        new SimpleDistributedProperty<>(HISTORICAL_REBALANCE_THRESHOLD_DMS_KEY, Integer::parseInt);
+
     /**
      * @param ctx Kernal context.
      */
@@ -524,6 +541,8 @@
 
         assert !kernalCtx.clientNode();
 
+        initWalRebalanceThreshold();
+
         if (!kernalCtx.clientNode()) {
             kernalCtx.internalSubscriptionProcessor().registerDatabaseListener(new MetastorageRecoveryLifecycle());
 
@@ -1671,7 +1690,7 @@
                     reservedForExchange = null;
 
                     grpPartsWithCnts.clear();
-                    
+
                     return grpPartsWithCnts;
                 }
 
@@ -1758,7 +1777,8 @@
                 continue;
 
             for (GridDhtLocalPartition locPart : grp.topology().currentLocalPartitions()) {
-                if (locPart.state() == OWNING && (preferWalRebalance() || locPart.fullSize() > walRebalanceThreshold))
+                if (locPart.state() == OWNING && (preferWalRebalance() ||
+                    locPart.fullSize() > historicalRebalanceThreshold.getOrDefault(walRebalanceThresholdLegacy)))
                     res.computeIfAbsent(grp.groupId(), k -> new HashSet<>()).add(locPart.id());
             }
         }
@@ -3655,4 +3675,27 @@
             return Collections.unmodifiableMap(partitionRecoveryStates);
         }
     }
+
+    /**
+     * Registers {@link #historicalRebalanceThreshold} property in distributed metastore.
+     */
+    private void initWalRebalanceThreshold() {
+        cctx.kernalContext().internalSubscriptionProcessor().registerDistributedConfigurationListener(
+            new DistributedConfigurationLifecycleListener() {
+                /** {@inheritDoc} */
+                @Override public void onReadyToRegister(DistributedPropertyDispatcher dispatcher) {
+                    String logMsgFmt = "Historical rebalance WAL threshold changed [property=%s, oldVal=%s, newVal=%s]";
+
+                    historicalRebalanceThreshold.addListener(makeUpdateListener(logMsgFmt, log));
+
+                    dispatcher.registerProperties(historicalRebalanceThreshold);
+                }
+
+                /** {@inheritDoc} */
+                @Override public void onReadyToWrite() {
+                    setDefaultValue(historicalRebalanceThreshold, walRebalanceThresholdLegacy, log);
+                }
+            }
+        );
+    }
 }