HBASE-23345 Table need to replication unless all of cfs are excluded (#881)
Signed-off-by: stack <stack@apache.org>
Signed-off-by: Guanghao Zhang <zghao@apache.org>
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index 86c829b..aba703c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -392,22 +392,31 @@
* @return true if the table need replicate to the peer cluster
*/
public boolean needToReplicate(TableName table) {
+ String namespace = table.getNamespaceAsString();
if (replicateAllUserTables) {
- if (excludeNamespaces != null && excludeNamespaces.contains(table.getNamespaceAsString())) {
+ // replicate all user tables, but filter by exclude namespaces and table-cfs config
+ if (excludeNamespaces != null && excludeNamespaces.contains(namespace)) {
return false;
}
- if (excludeTableCFsMap != null && excludeTableCFsMap.containsKey(table)) {
- return false;
+ // trap here, must check existence first since HashMap allows null value.
+ if (excludeTableCFsMap == null || !excludeTableCFsMap.containsKey(table)) {
+ return true;
}
- return true;
+ Collection<String> cfs = excludeTableCFsMap.get(table);
+ // if cfs is null or empty then we can make sure that we do not need to replicate this table,
+ // otherwise, we may still need to replicate the table but filter out some families.
+ return cfs != null && !cfs.isEmpty();
} else {
- if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) {
+ // Not replicate all user tables, so filter by namespaces and table-cfs config
+ if (namespaces == null && tableCFsMap == null) {
+ return false;
+ }
+ // First filter by namespaces config
+ // If table's namespace in peer config, all the tables data are applicable for replication
+ if (namespaces != null && namespaces.contains(namespace)) {
return true;
}
- if (tableCFsMap != null && tableCFsMap.containsKey(table)) {
- return true;
- }
- return false;
+ return tableCFsMap != null && tableCFsMap.containsKey(table);
}
}
}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java
index 881ef45..d67a3f8 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java
@@ -17,10 +17,17 @@
*/
package org.apache.hadoop.hbase.replication;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.BuilderStyleTest;
+import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -32,6 +39,9 @@
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationPeerConfig.class);
+ private static TableName TABLE_A = TableName.valueOf("replication", "testA");
+ private static TableName TABLE_B = TableName.valueOf("replication", "testB");
+
@Test
public void testClassMethodsAreBuilderStyle() {
/* ReplicationPeerConfig should have a builder style setup where setXXX/addXXX methods
@@ -48,4 +58,196 @@
BuilderStyleTest.assertClassesAreBuilderStyle(ReplicationPeerConfig.class);
}
+
+ @Test
+ public void testNeedToReplicateWithReplicatingAll() {
+ ReplicationPeerConfig peerConfig;
+ ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl builder =
+ new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl();
+ Map<TableName, List<String>> tableCfs = new HashMap<>();
+ Set<String> namespaces = new HashSet<>();
+
+ // 1. replication_all flag is true, no namespaces and table-cfs config
+ builder.setReplicateAllUserTables(true);
+ peerConfig = builder.build();
+ Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+
+ // 2. replicate_all flag is true, and config in excludedTableCfs
+ builder.setExcludeNamespaces(null);
+ // empty map
+ tableCfs = new HashMap<>();
+ builder.setReplicateAllUserTables(true);
+ builder.setExcludeTableCFsMap(tableCfs);
+ peerConfig = builder.build();
+ Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+
+ // table testB
+ tableCfs = new HashMap<>();
+ tableCfs.put(TABLE_B, null);
+ builder.setReplicateAllUserTables(true);
+ builder.setExcludeTableCFsMap(tableCfs);
+ peerConfig = builder.build();
+ Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+
+ // table testA
+ tableCfs = new HashMap<>();
+ tableCfs.put(TABLE_A, null);
+ builder.setReplicateAllUserTables(true);
+ builder.setExcludeTableCFsMap(tableCfs);
+ peerConfig = builder.build();
+ Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+
+ // 3. replicate_all flag is true, and config in excludeNamespaces
+ builder.setExcludeTableCFsMap(null);
+ // empty set
+ namespaces = new HashSet<>();
+ builder.setReplicateAllUserTables(true);
+ builder.setExcludeNamespaces(namespaces);
+ peerConfig = builder.build();
+ Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+
+ // namespace default
+ namespaces = new HashSet<>();
+ namespaces.add("default");
+ builder.setReplicateAllUserTables(true);
+ builder.setExcludeNamespaces(namespaces);
+ peerConfig = builder.build();
+ Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+
+ // namespace replication
+ namespaces = new HashSet<>();
+ namespaces.add("replication");
+ builder.setReplicateAllUserTables(true);
+ builder.setExcludeNamespaces(namespaces);
+ peerConfig = builder.build();
+ Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+
+ // 4. replicate_all flag is true, and config excludeNamespaces and excludedTableCfs both
+ // Namespaces config doesn't conflict with table-cfs config
+ namespaces = new HashSet<>();
+ tableCfs = new HashMap<>();
+ namespaces.add("replication");
+ tableCfs.put(TABLE_A, null);
+ builder.setReplicateAllUserTables(true);
+ builder.setExcludeTableCFsMap(tableCfs);
+ builder.setExcludeNamespaces(namespaces);
+ peerConfig = builder.build();
+ Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+
+ // Namespaces config conflicts with table-cfs config
+ namespaces = new HashSet<>();
+ tableCfs = new HashMap<>();
+ namespaces.add("default");
+ tableCfs.put(TABLE_A, null);
+ builder.setReplicateAllUserTables(true);
+ builder.setExcludeTableCFsMap(tableCfs);
+ builder.setExcludeNamespaces(namespaces);
+ peerConfig = builder.build();
+ Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+
+ namespaces = new HashSet<>();
+ tableCfs = new HashMap<>();
+ namespaces.add("replication");
+ tableCfs.put(TABLE_B, null);
+ builder.setReplicateAllUserTables(true);
+ builder.setExcludeTableCFsMap(tableCfs);
+ builder.setExcludeNamespaces(namespaces);
+ peerConfig = builder.build();
+ Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+
+ }
+
+ @Test
+ public void testNeedToReplicateWithoutReplicatingAll() {
+ ReplicationPeerConfig peerConfig;
+ ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl builder =
+ new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl();
+ Map<TableName, List<String>> tableCfs = new HashMap<>();
+ Set<String> namespaces = new HashSet<>();
+
+ // 1. replication_all flag is false, no namespaces and table-cfs config
+ builder.setReplicateAllUserTables(false);
+ peerConfig = builder.build();
+ Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+
+ // 2. replicate_all flag is false, and only config table-cfs in peer
+ // empty map
+ builder.setReplicateAllUserTables(false);
+ builder.setTableCFsMap(tableCfs);
+ peerConfig = builder.build();
+ Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+
+ // table testB
+ tableCfs = new HashMap<>();
+ tableCfs.put(TABLE_B, null);
+ builder.setReplicateAllUserTables(false);
+ builder.setTableCFsMap(tableCfs);
+ peerConfig = builder.build();
+ Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+
+ // table testA
+ tableCfs = new HashMap<>();
+ tableCfs.put(TABLE_A, null);
+ builder.setReplicateAllUserTables(false);
+ builder.setTableCFsMap(tableCfs);
+ peerConfig = builder.build();
+ Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+
+ // 3. replication_all flag is false, and only config namespace in peer
+ builder.setTableCFsMap(null);
+ // empty set
+ builder.setReplicateAllUserTables(false);
+ builder.setNamespaces(namespaces);
+ peerConfig = builder.build();
+ Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+
+ // namespace default
+ namespaces = new HashSet<>();
+ namespaces.add("default");
+ builder.setReplicateAllUserTables(false);
+ builder.setNamespaces(namespaces);
+ peerConfig = builder.build();
+ Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+
+ // namespace replication
+ namespaces = new HashSet<>();
+ namespaces.add("replication");
+ builder.setReplicateAllUserTables(false);
+ builder.setNamespaces(namespaces);
+ peerConfig = builder.build();
+ Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+
+ // 4. replicate_all flag is false, and config namespaces and table-cfs both
+ // Namespaces config doesn't conflict with table-cfs config
+ namespaces = new HashSet<>();
+ tableCfs = new HashMap<>();
+ namespaces.add("replication");
+ tableCfs.put(TABLE_A, null);
+ builder.setReplicateAllUserTables(false);
+ builder.setTableCFsMap(tableCfs);
+ builder.setNamespaces(namespaces);
+ peerConfig = builder.build();
+ Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+
+ // Namespaces config conflicts with table-cfs config
+ namespaces = new HashSet<>();
+ tableCfs = new HashMap<>();
+ namespaces.add("default");
+ tableCfs.put(TABLE_A, null);
+ builder.setReplicateAllUserTables(false);
+ builder.setTableCFsMap(tableCfs);
+ builder.setNamespaces(namespaces);
+ peerConfig = builder.build();
+ Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+
+ namespaces = new HashSet<>();
+ tableCfs = new HashMap<>();
+ namespaces.add("replication");
+ tableCfs.put(TABLE_B, null);
+ builder.setReplicateAllUserTables(false);
+ builder.setTableCFsMap(tableCfs);
+ builder.setNamespaces(namespaces);
+ peerConfig = builder.build();
+ Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+ }
}
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index 0d40e99..a786206 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -154,44 +154,6 @@
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
}
- /**
- * Returns whether we should replicate the given table.
- */
- public static boolean contains(ReplicationPeerConfig peerConfig, TableName tableName) {
- String namespace = tableName.getNamespaceAsString();
- if (peerConfig.replicateAllUserTables()) {
- // replicate all user tables, but filter by exclude namespaces and table-cfs config
- Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
- if (excludeNamespaces != null && excludeNamespaces.contains(namespace)) {
- return false;
- }
- Map<TableName, List<String>> excludedTableCFs = peerConfig.getExcludeTableCFsMap();
- // trap here, must check existence first since HashMap allows null value.
- if (excludedTableCFs == null || !excludedTableCFs.containsKey(tableName)) {
- return true;
- }
- List<String> cfs = excludedTableCFs.get(tableName);
- // if cfs is null or empty then we can make sure that we do not need to replicate this table,
- // otherwise, we may still need to replicate the table but filter out some families.
- return cfs != null && !cfs.isEmpty();
- } else {
- // Not replicate all user tables, so filter by namespaces and table-cfs config
- Set<String> namespaces = peerConfig.getNamespaces();
- Map<TableName, List<String>> tableCFs = peerConfig.getTableCFsMap();
-
- if (namespaces == null && tableCFs == null) {
- return false;
- }
-
- // First filter by namespaces config
- // If table's namespace in peer config, all the tables data are applicable for replication
- if (namespaces != null && namespaces.contains(namespace)) {
- return true;
- }
- return tableCFs != null && tableCFs.containsKey(tableName);
- }
- }
-
public static FileSystem getRemoteWALFileSystem(Configuration conf, String remoteWALDir)
throws IOException {
return new Path(remoteWALDir).getFileSystem(conf);
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationUtil.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationUtil.java
deleted file mode 100644
index f8543fe..0000000
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationUtil.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.Assert;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ ReplicationTests.class, SmallTests.class })
-public class TestReplicationUtil {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestReplicationUtil.class);
-
- private static TableName TABLE_A = TableName.valueOf("replication", "testA");
- private static TableName TABLE_B = TableName.valueOf("replication", "testB");
-
- @Test
- public void testContainsWithReplicatingAll() {
- ReplicationPeerConfig peerConfig;
- ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl builder =
- new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl();
- Map<TableName, List<String>> tableCfs = new HashMap<>();
- Set<String> namespaces = new HashSet<>();
-
- // 1. replication_all flag is true, no namespaces and table-cfs config
- builder.setReplicateAllUserTables(true);
- peerConfig = builder.build();
- Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // 2. replicate_all flag is true, and config in excludedTableCfs
- builder.setExcludeNamespaces(null);
- // empty map
- tableCfs = new HashMap<>();
- builder.setReplicateAllUserTables(true);
- builder.setExcludeTableCFsMap(tableCfs);
- peerConfig = builder.build();
- Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // table testB
- tableCfs = new HashMap<>();
- tableCfs.put(TABLE_B, null);
- builder.setReplicateAllUserTables(true);
- builder.setExcludeTableCFsMap(tableCfs);
- peerConfig = builder.build();
- Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // table testA
- tableCfs = new HashMap<>();
- tableCfs.put(TABLE_A, null);
- builder.setReplicateAllUserTables(true);
- builder.setExcludeTableCFsMap(tableCfs);
- peerConfig = builder.build();
- Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // 3. replicate_all flag is true, and config in excludeNamespaces
- builder.setExcludeTableCFsMap(null);
- // empty set
- namespaces = new HashSet<>();
- builder.setReplicateAllUserTables(true);
- builder.setExcludeNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // namespace default
- namespaces = new HashSet<>();
- namespaces.add("default");
- builder.setReplicateAllUserTables(true);
- builder.setExcludeNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // namespace replication
- namespaces = new HashSet<>();
- namespaces.add("replication");
- builder.setReplicateAllUserTables(true);
- builder.setExcludeNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // 4. replicate_all flag is true, and config excludeNamespaces and excludedTableCfs both
- // Namespaces config doesn't conflict with table-cfs config
- namespaces = new HashSet<>();
- tableCfs = new HashMap<>();
- namespaces.add("replication");
- tableCfs.put(TABLE_A, null);
- builder.setReplicateAllUserTables(true);
- builder.setExcludeTableCFsMap(tableCfs);
- builder.setExcludeNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // Namespaces config conflicts with table-cfs config
- namespaces = new HashSet<>();
- tableCfs = new HashMap<>();
- namespaces.add("default");
- tableCfs.put(TABLE_A, null);
- builder.setReplicateAllUserTables(true);
- builder.setExcludeTableCFsMap(tableCfs);
- builder.setExcludeNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- namespaces = new HashSet<>();
- tableCfs = new HashMap<>();
- namespaces.add("replication");
- tableCfs.put(TABLE_B, null);
- builder.setReplicateAllUserTables(true);
- builder.setExcludeTableCFsMap(tableCfs);
- builder.setExcludeNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- }
-
- @Test
- public void testContainsWithoutReplicatingAll() {
- ReplicationPeerConfig peerConfig;
- ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl builder =
- new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl();
- Map<TableName, List<String>> tableCfs = new HashMap<>();
- Set<String> namespaces = new HashSet<>();
-
- // 1. replication_all flag is false, no namespaces and table-cfs config
- builder.setReplicateAllUserTables(false);
- peerConfig = builder.build();
- Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // 2. replicate_all flag is false, and only config table-cfs in peer
- // empty map
- builder.setReplicateAllUserTables(false);
- builder.setTableCFsMap(tableCfs);
- peerConfig = builder.build();
- Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // table testB
- tableCfs = new HashMap<>();
- tableCfs.put(TABLE_B, null);
- builder.setReplicateAllUserTables(false);
- builder.setTableCFsMap(tableCfs);
- peerConfig = builder.build();
- Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // table testA
- tableCfs = new HashMap<>();
- tableCfs.put(TABLE_A, null);
- builder.setReplicateAllUserTables(false);
- builder.setTableCFsMap(tableCfs);
- peerConfig = builder.build();
- Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // 3. replication_all flag is false, and only config namespace in peer
- builder.setTableCFsMap(null);
- // empty set
- builder.setReplicateAllUserTables(false);
- builder.setNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // namespace default
- namespaces = new HashSet<>();
- namespaces.add("default");
- builder.setReplicateAllUserTables(false);
- builder.setNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // namespace replication
- namespaces = new HashSet<>();
- namespaces.add("replication");
- builder.setReplicateAllUserTables(false);
- builder.setNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // 4. replicate_all flag is false, and config namespaces and table-cfs both
- // Namespaces config doesn't conflict with table-cfs config
- namespaces = new HashSet<>();
- tableCfs = new HashMap<>();
- namespaces.add("replication");
- tableCfs.put(TABLE_A, null);
- builder.setReplicateAllUserTables(false);
- builder.setTableCFsMap(tableCfs);
- builder.setNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // Namespaces config conflicts with table-cfs config
- namespaces = new HashSet<>();
- tableCfs = new HashMap<>();
- namespaces.add("default");
- tableCfs.put(TABLE_A, null);
- builder.setReplicateAllUserTables(false);
- builder.setTableCFsMap(tableCfs);
- builder.setNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- namespaces = new HashSet<>();
- tableCfs = new HashMap<>();
- namespaces.add("replication");
- tableCfs.put(TABLE_B, null);
- builder.setReplicateAllUserTables(false);
- builder.setTableCFsMap(tableCfs);
- builder.setNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
- }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
index 755e0a3..cdc0db2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/AbstractPeerProcedure.java
@@ -33,7 +33,6 @@
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -118,7 +117,7 @@
continue;
}
TableName tn = td.getTableName();
- if (!ReplicationUtils.contains(peerConfig, tn)) {
+ if (!peerConfig.needToReplicate(tn)) {
continue;
}
setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
index c4df613..007a214 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
@@ -30,7 +30,6 @@
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -144,11 +143,11 @@
continue;
}
TableName tn = td.getTableName();
- if (!ReplicationUtils.contains(peerConfig, tn)) {
+ if (!peerConfig.needToReplicate(tn)) {
continue;
}
if (oldPeerConfig != null && oldPeerConfig.isSerial() &&
- ReplicationUtils.contains(oldPeerConfig, tn)) {
+ oldPeerConfig.needToReplicate(tn)) {
continue;
}
if (needReopen(tsm, tn)) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 922bfed..d497c22 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -503,7 +503,7 @@
public List<String> getSerialPeerIdsBelongsTo(TableName tableName) {
return peers.values().stream().filter(p -> p.getPeerConfig().isSerial())
- .filter(p -> ReplicationUtils.contains(p.getPeerConfig(), tableName)).map(p -> p.getPeerId())
+ .filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId())
.collect(Collectors.toList());
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
index 41e740f..188921a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
@@ -128,15 +128,15 @@
continue;
}
TableName tn = td.getTableName();
- if (ReplicationUtils.contains(oldPeerConfig, tn)) {
- if (!ReplicationUtils.contains(peerConfig, tn)) {
+ if (oldPeerConfig.needToReplicate(tn)) {
+ if (!peerConfig.needToReplicate(tn)) {
// removed from peer config
for (String encodedRegionName : MetaTableAccessor
.getTableEncodedRegionNamesForSerialReplication(conn, tn)) {
addToList(encodedRegionNames, encodedRegionName, queueStorage);
}
}
- } else if (ReplicationUtils.contains(peerConfig, tn)) {
+ } else if (peerConfig.needToReplicate(tn)) {
// newly added to peer config
setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
index 3a3200a..58705f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
@@ -52,7 +52,7 @@
@Override
public Entry filter(Entry entry) {
- if (ReplicationUtils.contains(this.peer.getPeerConfig(), entry.getKey().getTableName())) {
+ if (this.peer.getPeerConfig().needToReplicate(entry.getKey().getTableName())) {
return entry;
} else {
return null;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index b7d1c15..bdad31b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -116,7 +116,6 @@
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
-import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
@@ -3916,8 +3915,8 @@
List<ReplicationPeerDescription> peerDescriptions = admin.listReplicationPeers();
if (peerDescriptions != null && peerDescriptions.size() > 0) {
List<String> peers = peerDescriptions.stream()
- .filter(peerConfig -> ReplicationUtils.contains(peerConfig.getPeerConfig(),
- cleanReplicationBarrierTable))
+ .filter(peerConfig -> peerConfig.getPeerConfig()
+ .needToReplicate(cleanReplicationBarrierTable))
.map(peerConfig -> peerConfig.getPeerId()).collect(Collectors.toList());
try {
List<String> batch = new ArrayList<>();
@@ -3986,4 +3985,4 @@
}
}
}
-}
\ No newline at end of file
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
index 7faaefb..10c6f72 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
@@ -206,13 +206,11 @@
@Test
public void testNamespaceTableCfWALEntryFilter() {
ReplicationPeer peer = mock(ReplicationPeer.class);
- ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
+ ReplicationPeerConfigBuilder peerConfigBuilder = ReplicationPeerConfig.newBuilder();
// 1. replicate_all flag is false, no namespaces and table-cfs config
- when(peerConfig.replicateAllUserTables()).thenReturn(false);
- when(peerConfig.getNamespaces()).thenReturn(null);
- when(peerConfig.getTableCFsMap()).thenReturn(null);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(null).setTableCFsMap(null);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
Entry userEntry = createEntry(null, a, b, c);
ChainWALEntryFilter filter =
new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
@@ -222,9 +220,8 @@
// empty map
userEntry = createEntry(null, a, b, c);
Map<TableName, List<String>> tableCfs = new HashMap<>();
- when(peerConfig.replicateAllUserTables()).thenReturn(false);
- when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
@@ -232,9 +229,8 @@
userEntry = createEntry(null, a, b, c);
tableCfs = new HashMap<>();
tableCfs.put(TableName.valueOf("bar"), null);
- when(peerConfig.replicateAllUserTables()).thenReturn(false);
- when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
@@ -242,9 +238,8 @@
userEntry = createEntry(null, a, b, c);
tableCfs = new HashMap<>();
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
- when(peerConfig.replicateAllUserTables()).thenReturn(false);
- when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a), filter.filter(userEntry));
@@ -252,9 +247,8 @@
userEntry = createEntry(null, a, b, c, d);
tableCfs = new HashMap<>();
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
- when(peerConfig.replicateAllUserTables()).thenReturn(false);
- when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a,c), filter.filter(userEntry));
@@ -262,19 +256,17 @@
when(peer.getTableCFs()).thenReturn(null);
// empty set
Set<String> namespaces = new HashSet<>();
- when(peerConfig.replicateAllUserTables()).thenReturn(false);
- when(peerConfig.getNamespaces()).thenReturn(namespaces);
- when(peerConfig.getTableCFsMap()).thenReturn(null);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
+ .setTableCFsMap(null);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
// namespace default
namespaces.add("default");
- when(peerConfig.replicateAllUserTables()).thenReturn(false);
- when(peerConfig.getNamespaces()).thenReturn(namespaces);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
@@ -282,9 +274,8 @@
// namespace ns1
namespaces = new HashSet<>();
namespaces.add("ns1");
- when(peerConfig.replicateAllUserTables()).thenReturn(false);
- when(peerConfig.getNamespaces()).thenReturn(namespaces);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
@@ -295,10 +286,9 @@
tableCfs = new HashMap<>();
namespaces.add("ns1");
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
- when(peerConfig.replicateAllUserTables()).thenReturn(false);
- when(peerConfig.getNamespaces()).thenReturn(namespaces);
- when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
+ .setTableCFsMap(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, c), filter.filter(userEntry));
@@ -307,10 +297,9 @@
tableCfs = new HashMap<>();
namespaces.add("default");
tableCfs.put(TableName.valueOf("ns1:foo"), Lists.newArrayList("a", "c"));
- when(peerConfig.replicateAllUserTables()).thenReturn(false);
- when(peerConfig.getNamespaces()).thenReturn(namespaces);
- when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
+ .setTableCFsMap(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
@@ -319,10 +308,9 @@
tableCfs = new HashMap<>();
namespaces.add("ns1");
tableCfs.put(TableName.valueOf("bar"), null);
- when(peerConfig.replicateAllUserTables()).thenReturn(false);
- when(peerConfig.getNamespaces()).thenReturn(namespaces);
- when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
+ .setTableCFsMap(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
@@ -331,14 +319,14 @@
@Test
public void testNamespaceTableCfWALEntryFilter2() {
ReplicationPeer peer = mock(ReplicationPeer.class);
- ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
+ ReplicationPeerConfigBuilder peerConfigBuilder = ReplicationPeerConfig.newBuilder();
// 1. replicate_all flag is true
// and no exclude namespaces and no exclude table-cfs config
- when(peerConfig.replicateAllUserTables()).thenReturn(true);
- when(peerConfig.getExcludeNamespaces()).thenReturn(null);
- when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setReplicateAllUserTables(true)
+ .setExcludeNamespaces(null)
+ .setExcludeTableCFsMap(null);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
Entry userEntry = createEntry(null, a, b, c);
ChainWALEntryFilter filter =
new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
@@ -347,18 +335,16 @@
// 2. replicate_all flag is true, and only config exclude namespaces
// empty set
Set<String> namespaces = new HashSet<String>();
- when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
- when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(null);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
// exclude namespace default
namespaces.add("default");
- when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
- when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(null);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
@@ -366,9 +352,8 @@
// exclude namespace ns1
namespaces = new HashSet<String>();
namespaces.add("ns1");
- when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
- when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(null);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
@@ -376,9 +361,8 @@
// 3. replicate_all flag is true, and only config exclude table-cfs
// empty table-cfs map
Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
- when(peerConfig.getExcludeNamespaces()).thenReturn(null);
- when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setExcludeNamespaces(null).setExcludeTableCFsMap(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
@@ -386,9 +370,8 @@
// exclude table bar
tableCfs = new HashMap<TableName, List<String>>();
tableCfs.put(TableName.valueOf("bar"), null);
- when(peerConfig.getExcludeNamespaces()).thenReturn(null);
- when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setExcludeNamespaces(null).setExcludeTableCFsMap(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
@@ -396,9 +379,8 @@
// exclude table foo:a
tableCfs = new HashMap<TableName, List<String>>();
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
- when(peerConfig.getExcludeNamespaces()).thenReturn(null);
- when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setExcludeNamespaces(null).setExcludeTableCFsMap(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, b, c), filter.filter(userEntry));
@@ -409,9 +391,8 @@
tableCfs = new HashMap<TableName, List<String>>();
namespaces.add("ns1");
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
- when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
- when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, b), filter.filter(userEntry));
@@ -421,9 +402,8 @@
tableCfs = new HashMap<TableName, List<String>>();
namespaces.add("default");
tableCfs.put(TableName.valueOf("ns1:bar"), new ArrayList<String>());
- when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
- when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));