/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
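
/**
 * Tests per-table/per-column-family replication: parsing the table-CFs configuration,
 * converting it to and from its protobuf form, and verifying that edits from cluster 1
 * are replicated only to the peers whose table-CFs map covers the written table and
 * column family.
 */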
@Category({FlakeyTests.class, LargeTests.class})
public class TestPerTableCFReplication {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestPerTableCFReplication.class);
private static final Logger LOG = LoggerFactory.getLogger(TestPerTableCFReplication.class);
private static Configuration conf1;
private static Configuration conf2;
private static Configuration conf3;
private static HBaseTestingUtility utility1;
private static HBaseTestingUtility utility2;
private static HBaseTestingUtility utility3;
private static final long SLEEP_TIME = 500;
private static final int NB_RETRIES = 100;
private static final TableName tableName = TableName.valueOf("test");
private static final TableName tabAName = TableName.valueOf("TA");
private static final TableName tabBName = TableName.valueOf("TB");
private static final TableName tabCName = TableName.valueOf("TC");
private static final byte[] famName = Bytes.toBytes("f");
private static final byte[] f1Name = Bytes.toBytes("f1");
private static final byte[] f2Name = Bytes.toBytes("f2");
private static final byte[] f3Name = Bytes.toBytes("f3");
private static final byte[] row1 = Bytes.toBytes("row1");
private static final byte[] row2 = Bytes.toBytes("row2");
private static final byte[] noRepfamName = Bytes.toBytes("norep");
private static final byte[] val = Bytes.toBytes("myval");
private static TableDescriptor table;
private static TableDescriptor tabA;
private static TableDescriptor tabB;
private static TableDescriptor tabC;
@Rule
public TestName name = new TestName();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf1 = HBaseConfiguration.create();
conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
// smaller block size and capacity to trigger more operations
// and test them
conf1.setInt("hbase.regionserver.hlog.blocksize", 1024*20);
conf1.setInt("replication.source.size.capacity", 1024);
conf1.setLong("replication.source.sleepforretries", 100);
conf1.setInt("hbase.regionserver.maxlogs", 10);
conf1.setLong("hbase.master.logcleaner.ttl", 10);
conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
conf1.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
"org.apache.hadoop.hbase.replication.TestMasterReplication$CoprocessorCounter");
utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
new ZKWatcher(conf1, "cluster1", null, true);
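// Clusters 2 and 3 share cluster 1's mini ZooKeeper cluster; each cluster is isolated by
// its own znode parent (/1, /2 and /3).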
conf2 = new Configuration(conf1);
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
conf3 = new Configuration(conf1);
conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
utility2 = new HBaseTestingUtility(conf2);
utility2.setZkCluster(miniZK);
new ZKWatcher(conf2, "cluster2", null, true);
utility3 = new HBaseTestingUtility(conf3);
utility3.setZkCluster(miniZK);
new ZKWatcher(conf3, "cluster3", null, true);
table = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
tabA = TableDescriptorBuilder.newBuilder(tabAName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1Name)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f2Name)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f3Name)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.build();
tabB = TableDescriptorBuilder.newBuilder(tabBName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1Name)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f2Name)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f3Name)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.build();
tabC = TableDescriptorBuilder.newBuilder(tabCName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1Name)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f2Name)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f3Name)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.build();
utility1.startMiniCluster();
utility2.startMiniCluster();
utility3.startMiniCluster();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
utility3.shutdownMiniCluster();
utility2.shutdownMiniCluster();
utility1.shutdownMiniCluster();
}
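
/**
 * Verifies that {@code ReplicationPeerConfigUtil.parseTableCFsFromConfig} handles null/empty
 * input, single- and multi-table specs, redundant delimiters, and invalid entries.
 */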
@Test
public void testParseTableCFsFromConfig() {
Map<TableName, List<String>> tabCFsMap = null;
// 1. null or empty string, result should be null
tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(null);
assertNull(tabCFsMap);
tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig("");
assertNull(tabCFsMap);
tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(" ");
assertNull(tabCFsMap);
final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
final TableName tableName3 = TableName.valueOf(name.getMethodName() + "3");
// 2. single table: "tableName1" / "tableName2:cf1" / "tableName3:cf1,cf3"
tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName1.getNameAsString());
assertEquals(1, tabCFsMap.size()); // only one table
assertTrue(tabCFsMap.containsKey(tableName1)); // its table name is "tableName1"
assertFalse(tabCFsMap.containsKey(tableName2)); // not other table
assertNull(tabCFsMap.get(tableName1)); // null cf-list
tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName2 + ":cf1");
assertEquals(1, tabCFsMap.size()); // only one table
assertTrue(tabCFsMap.containsKey(tableName2)); // its table name is "tableName2"
assertFalse(tabCFsMap.containsKey(tableName1)); // not other table
assertEquals(1, tabCFsMap.get(tableName2).size()); // cf-list contains only 1 cf
assertEquals("cf1", tabCFsMap.get(tableName2).get(0));// the only cf is "cf1"
tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName3 + " : cf1 , cf3");
assertEquals(1, tabCFsMap.size()); // only one table
assertTrue(tabCFsMap.containsKey(tableName3)); // its table name is "tableName3"
assertFalse(tabCFsMap.containsKey(tableName1)); // not other table
assertEquals(2, tabCFsMap.get(tableName3).size()); // cf-list contains 2 cf
assertTrue(tabCFsMap.get(tableName3).contains("cf1"));// contains "cf1"
assertTrue(tabCFsMap.get(tableName3).contains("cf3"));// contains "cf3"
// 3. multiple tables: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3"
tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName1 + " ; " + tableName2
+ ":cf1 ; " + tableName3 + ":cf1,cf3");
// 3.1 contains 3 tables : "tableName1", "tableName2" and "tableName3"
assertEquals(3, tabCFsMap.size());
assertTrue(tabCFsMap.containsKey(tableName1));
assertTrue(tabCFsMap.containsKey(tableName2));
assertTrue(tabCFsMap.containsKey(tableName3));
// 3.2 table "tableName1" : null cf-list
assertNull(tabCFsMap.get(tableName1));
// 3.3 table "tableName2" : cf-list contains a single cf "cf1"
assertEquals(1, tabCFsMap.get(tableName2).size());
assertEquals("cf1", tabCFsMap.get(tableName2).get(0));
// 3.4 table "tableName3" : cf-list contains "cf1" and "cf3"
assertEquals(2, tabCFsMap.get(tableName3).size());
assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
assertTrue(tabCFsMap.get(tableName3).contains("cf3"));
// 4. extra or consecutive ";" (table delimiter) or "," (cf delimiter) are tolerated;
// reuse the multi-table example: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3"
tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(
tableName1 + " ; ; " + tableName2 + ":cf1 ; " + tableName3 + ":cf1,,cf3 ;");
// 4.1 contains 3 tables : "tableName1", "tableName2" and "tableName3"
assertEquals(3, tabCFsMap.size());
assertTrue(tabCFsMap.containsKey(tableName1));
assertTrue(tabCFsMap.containsKey(tableName2));
assertTrue(tabCFsMap.containsKey(tableName3));
// 4.2 table "tableName1" : null cf-list
assertNull(tabCFsMap.get(tableName1));
// 4.3 table "tableName2" : cf-list contains a single cf "cf1"
assertEquals(1, tabCFsMap.get(tableName2).size());
assertEquals("cf1", tabCFsMap.get(tableName2).get(0));
// 4.4 table "tableName3" : cf-list contains "cf1" and "cf3"
assertEquals(2, tabCFsMap.get(tableName3).size());
assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
assertTrue(tabCFsMap.get(tableName3).contains("cf3"));
// 5. invalid format "tableName1:tt:cf1 ; tableName2::cf1 ; tableName3:cf1,cf3"
// "tableName1:tt:cf1" and "tableName2::cf1" are invalid and will be ignored entirely
tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(
tableName1 + ":tt:cf1 ; " + tableName2 + "::cf1 ; " + tableName3 + ":cf1,cf3");
// 5.1 "tableName1" and "tableName2" are ignored; only "tableName3" remains
assertEquals(1, tabCFsMap.size()); // only one table
assertFalse(tabCFsMap.containsKey(tableName1));
assertFalse(tabCFsMap.containsKey(tableName2));
assertTrue(tabCFsMap.containsKey(tableName3));
// 5.2 table "tableName3" : cf-list contains "cf1" and "cf3"
assertEquals(2, tabCFsMap.get(tableName3).size());
assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
assertTrue(tabCFsMap.get(tableName3).contains("cf3"));
}
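
/**
 * Verifies the round trip between a table-CFs map and its protobuf representation via
 * {@code ReplicationPeerConfigUtil.convert} and {@code ReplicationPeerConfigUtil.convert2Map}.
 */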
@Test
public void testTableCFsHelperConverter() {
ReplicationProtos.TableCF[] tableCFs = null;
Map<TableName, List<String>> tabCFsMap = null;
// 1. a null map converts to null; an empty map converts to an empty array
assertNull(ReplicationPeerConfigUtil.convert(tabCFsMap));
tabCFsMap = new HashMap<>();
tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
assertEquals(0, tableCFs.length);
final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
final TableName tableName3 = TableName.valueOf(name.getMethodName() + "3");
// 2. single-table maps: tableName1 (all CFs) / tableName2:cf1 / tableName3:cf1,cf3
tabCFsMap.clear();
tabCFsMap.put(tableName1, null);
tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
assertEquals(1, tableCFs.length); // only one table
assertEquals(tableName1.toString(),
tableCFs[0].getTableName().getQualifier().toStringUtf8());
assertEquals(0, tableCFs[0].getFamiliesCount());
tabCFsMap.clear();
tabCFsMap.put(tableName2, new ArrayList<>());
tabCFsMap.get(tableName2).add("cf1");
tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
assertEquals(1, tableCFs.length); // only one table
assertEquals(tableName2.toString(),
tableCFs[0].getTableName().getQualifier().toStringUtf8());
assertEquals(1, tableCFs[0].getFamiliesCount());
assertEquals("cf1", tableCFs[0].getFamilies(0).toStringUtf8());
tabCFsMap.clear();
tabCFsMap.put(tableName3, new ArrayList<>());
tabCFsMap.get(tableName3).add("cf1");
tabCFsMap.get(tableName3).add("cf3");
tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
assertEquals(1, tableCFs.length);
assertEquals(tableName3.toString(),
tableCFs[0].getTableName().getQualifier().toStringUtf8());
assertEquals(2, tableCFs[0].getFamiliesCount());
assertEquals("cf1", tableCFs[0].getFamilies(0).toStringUtf8());
assertEquals("cf3", tableCFs[0].getFamilies(1).toStringUtf8());
tabCFsMap.clear();
tabCFsMap.put(tableName1, null);
tabCFsMap.put(tableName2, new ArrayList<>());
tabCFsMap.get(tableName2).add("cf1");
tabCFsMap.put(tableName3, new ArrayList<>());
tabCFsMap.get(tableName3).add("cf1");
tabCFsMap.get(tableName3).add("cf3");
tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
assertEquals(3, tableCFs.length);
assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName1.toString()));
assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString()));
assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString()));
assertEquals(0,
ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName1.toString()).getFamiliesCount());
assertEquals(1, ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString())
.getFamiliesCount());
assertEquals("cf1", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString())
.getFamilies(0).toStringUtf8());
assertEquals(2, ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString())
.getFamiliesCount());
assertEquals("cf1", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString())
.getFamilies(0).toStringUtf8());
assertEquals("cf3", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString())
.getFamilies(1).toStringUtf8());
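// 4. convert the TableCF array back into a map and verify the round trip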
tabCFsMap = ReplicationPeerConfigUtil.convert2Map(tableCFs);
assertEquals(3, tabCFsMap.size());
assertTrue(tabCFsMap.containsKey(tableName1));
assertTrue(tabCFsMap.containsKey(tableName2));
assertTrue(tabCFsMap.containsKey(tableName3));
// table "tableName1" : null cf-list
assertNull(tabCFsMap.get(tableName1));
// table "tableName2" : cf-list contains a single cf "cf1"
assertEquals(1, tabCFsMap.get(tableName2).size());
assertEquals("cf1", tabCFsMap.get(tableName2).get(0));
// table "tableName3" : cf-list contains "cf1" and "cf3"
assertEquals(2, tabCFsMap.get(tableName3).size());
assertTrue(tabCFsMap.get(tableName3).contains("cf1"));
assertTrue(tabCFsMap.get(tableName3).contains("cf3"));
}
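
/**
 * End-to-end check: writes to cluster 1 must appear only in the peer clusters whose
 * table-CFs configuration covers the written table and column family, both before and
 * after the peer configurations are updated.
 */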
@Test
public void testPerTableCFReplication() throws Exception {
LOG.info("testPerTableCFReplication");
try (Connection connection1 = ConnectionFactory.createConnection(conf1);
Connection connection2 = ConnectionFactory.createConnection(conf2);
Connection connection3 = ConnectionFactory.createConnection(conf3);
Admin admin1 = connection1.getAdmin();
Admin admin2 = connection2.getAdmin();
Admin admin3 = connection3.getAdmin();
Admin replicationAdmin = connection1.getAdmin()) {
admin1.createTable(tabA);
admin1.createTable(tabB);
admin1.createTable(tabC);
admin2.createTable(tabA);
admin2.createTable(tabB);
admin2.createTable(tabC);
admin3.createTable(tabA);
admin3.createTable(tabB);
admin3.createTable(tabC);
Table htab1A = connection1.getTable(tabAName);
Table htab2A = connection2.getTable(tabAName);
Table htab3A = connection3.getTable(tabAName);
Table htab1B = connection1.getTable(tabBName);
Table htab2B = connection2.getTable(tabBName);
Table htab3B = connection3.getTable(tabBName);
Table htab1C = connection1.getTable(tabCName);
Table htab2C = connection2.getTable(tabCName);
Table htab3C = connection3.getTable(tabCName);
// A. add cluster2/cluster3 as peers to cluster1
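// Peer "2" (cluster2): every CF of tableC plus CFs f1 and f3 of tableB.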
Map<TableName, List<String>> tableCFs = new HashMap<>();
tableCFs.put(tabCName, null);
tableCFs.put(tabBName, new ArrayList<>());
tableCFs.get(tabBName).add("f1");
tableCFs.get(tabBName).add("f3");
ReplicationPeerConfig rpc2 = ReplicationPeerConfig.newBuilder()
.setClusterKey(utility2.getClusterKey()).setReplicateAllUserTables(false)
.setTableCFsMap(tableCFs).build();
replicationAdmin.addReplicationPeer("2", rpc2);
tableCFs.clear();
tableCFs.put(tabAName, null);
tableCFs.put(tabBName, new ArrayList<>());
tableCFs.get(tabBName).add("f1");
tableCFs.get(tabBName).add("f2");
ReplicationPeerConfig rpc3 = ReplicationPeerConfig.newBuilder()
.setClusterKey(utility3.getClusterKey()).setReplicateAllUserTables(false)
.setTableCFsMap(tableCFs).build();
replicationAdmin.addReplicationPeer("3", rpc3);
// A1. tableA can only be replicated to cluster3
putAndWaitWithFamily(row1, f1Name, htab1A, htab3A);
ensureRowNotReplicated(row1, f1Name, htab2A);
deleteAndWaitWithFamily(row1, f1Name, htab1A, htab3A);
putAndWaitWithFamily(row1, f2Name, htab1A, htab3A);
ensureRowNotReplicated(row1, f2Name, htab2A);
deleteAndWaitWithFamily(row1, f2Name, htab1A, htab3A);
putAndWaitWithFamily(row1, f3Name, htab1A, htab3A);
ensureRowNotReplicated(row1, f3Name, htab2A);
deleteAndWaitWithFamily(row1, f3Name, htab1A, htab3A);
// A2. cf 'f1' of tableB can be replicated to both cluster2 and cluster3
putAndWaitWithFamily(row1, f1Name, htab1B, htab2B, htab3B);
deleteAndWaitWithFamily(row1, f1Name, htab1B, htab2B, htab3B);
// cf 'f2' of tableB can only be replicated to cluster3
putAndWaitWithFamily(row1, f2Name, htab1B, htab3B);
ensureRowNotReplicated(row1, f2Name, htab2B);
deleteAndWaitWithFamily(row1, f2Name, htab1B, htab3B);
// cf 'f3' of tableB can only be replicated to cluster2
putAndWaitWithFamily(row1, f3Name, htab1B, htab2B);
ensureRowNotReplicated(row1, f3Name, htab3B);
deleteAndWaitWithFamily(row1, f3Name, htab1B, htab2B);
// A3. tableC can only be replicated to cluster2
putAndWaitWithFamily(row1, f1Name, htab1C, htab2C);
ensureRowNotReplicated(row1, f1Name, htab3C);
deleteAndWaitWithFamily(row1, f1Name, htab1C, htab2C);
putAndWaitWithFamily(row1, f2Name, htab1C, htab2C);
ensureRowNotReplicated(row1, f2Name, htab3C);
deleteAndWaitWithFamily(row1, f2Name, htab1C, htab2C);
putAndWaitWithFamily(row1, f3Name, htab1C, htab2C);
ensureRowNotReplicated(row1, f3Name, htab3C);
deleteAndWaitWithFamily(row1, f3Name, htab1C, htab2C);
// B. change peers' replicable table-cf config
tableCFs.clear();
tableCFs.put(tabAName, new ArrayList<>());
tableCFs.get(tabAName).add("f1");
tableCFs.get(tabAName).add("f2");
tableCFs.put(tabCName, new ArrayList<>());
tableCFs.get(tabCName).add("f2");
tableCFs.get(tabCName).add("f3");
replicationAdmin.updateReplicationPeerConfig("2",
ReplicationPeerConfig.newBuilder(replicationAdmin.getReplicationPeerConfig("2"))
.setTableCFsMap(tableCFs).build());
tableCFs.clear();
tableCFs.put(tabBName, null);
tableCFs.put(tabCName, new ArrayList<>());
tableCFs.get(tabCName).add("f3");
replicationAdmin.updateReplicationPeerConfig("3",
ReplicationPeerConfig.newBuilder(replicationAdmin.getReplicationPeerConfig("3"))
.setTableCFsMap(tableCFs).build());
// B1. cf 'f1' of tableA can only be replicated to cluster2
putAndWaitWithFamily(row2, f1Name, htab1A, htab2A);
ensureRowNotReplicated(row2, f1Name, htab3A);
deleteAndWaitWithFamily(row2, f1Name, htab1A, htab2A);
// cf 'f2' of tableA can only be replicated to cluster2
putAndWaitWithFamily(row2, f2Name, htab1A, htab2A);
ensureRowNotReplicated(row2, f2Name, htab3A);
deleteAndWaitWithFamily(row2, f2Name, htab1A, htab2A);
// cf 'f3' of tableA isn't replicable to either cluster2 or cluster3
putAndWaitWithFamily(row2, f3Name, htab1A);
ensureRowNotReplicated(row2, f3Name, htab2A, htab3A);
deleteAndWaitWithFamily(row2, f3Name, htab1A);
// B2. tableB can only be replicated to cluster3
putAndWaitWithFamily(row2, f1Name, htab1B, htab3B);
ensureRowNotReplicated(row2, f1Name, htab2B);
deleteAndWaitWithFamily(row2, f1Name, htab1B, htab3B);
putAndWaitWithFamily(row2, f2Name, htab1B, htab3B);
ensureRowNotReplicated(row2, f2Name, htab2B);
deleteAndWaitWithFamily(row2, f2Name, htab1B, htab3B);
putAndWaitWithFamily(row2, f3Name, htab1B, htab3B);
ensureRowNotReplicated(row2, f3Name, htab2B);
deleteAndWaitWithFamily(row2, f3Name, htab1B, htab3B);
// B3. cf 'f1' of tableC is not replicable to either cluster
putAndWaitWithFamily(row2, f1Name, htab1C);
ensureRowNotReplicated(row2, f1Name, htab2C, htab3C);
deleteAndWaitWithFamily(row2, f1Name, htab1C);
// cf 'f2' of tableC can only be replicated to cluster2
putAndWaitWithFamily(row2, f2Name, htab1C, htab2C);
ensureRowNotReplicated(row2, f2Name, htab3C);
deleteAndWaitWithFamily(row2, f2Name, htab1C, htab2C);
// cf 'f3' of tableC can be replicated to both cluster2 and cluster3
putAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C);
deleteAndWaitWithFamily(row2, f3Name, htab1C, htab2C, htab3C);
}
}
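
/**
 * Asserts that the given row/family is absent from every supplied table, i.e. the edit was
 * not replicated to those clusters.
 */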
private void ensureRowNotReplicated(byte[] row, byte[] fam, Table... tables) throws IOException {
Get get = new Get(row);
get.addFamily(fam);
for (Table table : tables) {
Result res = table.get(get);
assertEquals(0, res.size());
}
}
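
/**
 * Deletes the given family of a row from the source table and waits until the delete has
 * been replicated to all target tables, failing after NB_RETRIES attempts.
 */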
private void deleteAndWaitWithFamily(byte[] row, byte[] fam,
Table source, Table... targets)
throws Exception {
Delete del = new Delete(row);
del.addFamily(fam);
source.delete(del);
Get get = new Get(row);
get.addFamily(fam);
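// Poll the targets until the row is gone everywhere, or fail once the retry budget is exhausted.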
for (int i = 0; i < NB_RETRIES; i++) {
if (i == NB_RETRIES - 1) {
fail("Waited too long for delete replication");
}
boolean removedFromAll = true;
for (Table target : targets) {
Result res = target.get(get);
if (res.size() >= 1) {
LOG.info("Row not deleted");
removedFromAll = false;
break;
}
}
if (removedFromAll) {
break;
} else {
Thread.sleep(SLEEP_TIME);
}
}
}
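
/**
 * Puts a value into the given family of a row on the source table and waits until it is
 * visible in all target tables, failing after NB_RETRIES attempts.
 */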
private void putAndWaitWithFamily(byte[] row, byte[] fam,
Table source, Table... targets)
throws Exception {
Put put = new Put(row);
put.addColumn(fam, row, val);
source.put(put);
Get get = new Get(row);
get.addFamily(fam);
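// Poll the targets until the value is visible everywhere, or fail once the retry budget is exhausted.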
for (int i = 0; i < NB_RETRIES; i++) {
if (i == NB_RETRIES - 1) {
fail("Waited too long for put replication");
}
boolean replicatedToAll = true;
for (Table target : targets) {
Result res = target.get(get);
if (res.isEmpty()) {
LOG.info("Row not available");
replicatedToAll = false;
break;
} else {
assertEquals(1, res.size());
assertArrayEquals(val, res.value());
}
}
if (replicatedToAll) {
break;
} else {
Thread.sleep(SLEEP_TIME);
}
}
}
}