blob: 1fc7ca614df0284fd19978f5b4af86e5623ab8fd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertTrue;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.TestBulkLoadReplication;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Testcase for HBASE-23098
*/
// LargeTest because spins up four clusters.
@Category({ ReplicationTests.class, LargeTests.class })
public final class TestNamespaceReplicationWithBulkLoadedData extends TestBulkLoadReplication {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestNamespaceReplicationWithBulkLoadedData.class);
private static final Logger LOG =
LoggerFactory.getLogger(TestNamespaceReplicationWithBulkLoadedData.class);
private static final HBaseTestingUtility UTIL4 = new HBaseTestingUtility();
private static final String PEER4_CLUSTER_ID = "peer4";
private static final String PEER4_NS = "ns_peer1";
private static final String PEER4_NS_TABLE = "ns_peer2";
private static final Configuration CONF4 = UTIL4.getConfiguration();
private static final String NS1 = "ns1";
private static final String NS2 = "ns2";
private static final TableName NS1_TABLE = TableName.valueOf(NS1 + ":t1_syncup");
private static final TableName NS2_TABLE = TableName.valueOf(NS2 + ":t2_syncup");
@BeforeClass
public static void setUpBeforeClass() throws Exception {
setupBulkLoadConfigsForCluster(CONF4, PEER4_CLUSTER_ID);
setupConfig(UTIL4, "/4");
TestBulkLoadReplication.setUpBeforeClass();
startFourthCluster();
}
private static void startFourthCluster() throws Exception {
LOG.info("Setup Zk to same one from UTIL1 and UTIL2 and UTIL3");
UTIL4.setZkCluster(UTIL1.getZkCluster());
UTIL4.startMiniCluster(NUM_SLAVES1);
TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
Connection connection4 = ConnectionFactory.createConnection(CONF4);
try (Admin admin4 = connection4.getAdmin()) {
admin4.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
}
UTIL4.waitUntilAllRegionsAssigned(tableName);
}
@Before
@Override
public void setUpBase() throws Exception {
/** "super.setUpBase()" already sets peer1 from 1 <-> 2 <-> 3
* and this test add the fourth cluster.
* So we have following topology:
* 1
* / \
* 2 4
* /
* 3
*
* The 1 -> 4 has two peers,
* ns_peer1: ns1 -> ns1 (validate this peer hfile-refs)
* ns_peer1 configuration is NAMESPACES => ["ns1"]
*
* ns_peer2: ns2:t2_syncup -> ns2:t2_syncup, this peers is
* ns_peer2 configuration is NAMESPACES => ["ns2"],
* TABLE_CFS => { "ns2:t2_syncup" => []}
*
* The 1 -> 2 has one peer, this peer configuration is
* add_peer '2', CLUSTER_KEY => "server1.cie.com:2181:/hbase"
*
*/
super.setUpBase();
// Create tables
TableDescriptor table1 = TableDescriptorBuilder.newBuilder(NS1_TABLE)
.setColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(famName)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
TableDescriptor table2 = TableDescriptorBuilder.newBuilder(NS2_TABLE)
.setColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(famName)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
Admin admin1 = UTIL1.getAdmin();
admin1.createNamespace(NamespaceDescriptor.create(NS1).build());
admin1.createNamespace(NamespaceDescriptor.create(NS2).build());
admin1.createTable(table1);
admin1.createTable(table2);
Admin admin2 = UTIL2.getAdmin();
admin2.createNamespace(NamespaceDescriptor.create(NS1).build());
admin2.createNamespace(NamespaceDescriptor.create(NS2).build());
admin2.createTable(table1);
admin2.createTable(table2);
Admin admin3 = UTIL3.getAdmin();
admin3.createNamespace(NamespaceDescriptor.create(NS1).build());
admin3.createNamespace(NamespaceDescriptor.create(NS2).build());
admin3.createTable(table1);
admin3.createTable(table2);
Admin admin4 = UTIL4.getAdmin();
admin4.createNamespace(NamespaceDescriptor.create(NS1).build());
admin4.createNamespace(NamespaceDescriptor.create(NS2).build());
admin4.createTable(table1);
admin4.createTable(table2);
/**
* Set ns_peer1 1: ns1 -> 2: ns1
*
* add_peer 'ns_peer1', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
* NAMESPACES => ["ns1"]
*/
Set<String> namespaces = new HashSet<>();
namespaces.add(NS1);
ReplicationPeerConfig rpc4_ns =
ReplicationPeerConfig.newBuilder().setClusterKey(UTIL4.getClusterKey())
.setReplicateAllUserTables(false).setNamespaces(namespaces).build();
admin1.addReplicationPeer(PEER4_NS, rpc4_ns);
/**
* Set ns_peer2 1: ns2:t2_syncup -> 4: ns2:t2_syncup
*
* add_peer 'ns_peer2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
* NAMESPACES => ["ns2"], TABLE_CFS => { "ns2:t2_syncup" => [] }
*/
Map<TableName, List<String>> tableCFsMap = new HashMap<>();
tableCFsMap.put(NS2_TABLE, null);
ReplicationPeerConfig rpc4_ns_table =
ReplicationPeerConfig.newBuilder().setClusterKey(UTIL4.getClusterKey())
.setReplicateAllUserTables(false).setTableCFsMap(tableCFsMap).build();
admin1.addReplicationPeer(PEER4_NS_TABLE, rpc4_ns_table);
}
@After
@Override
public void tearDownBase() throws Exception {
super.tearDownBase();
TableDescriptor table1 = TableDescriptorBuilder.newBuilder(NS1_TABLE)
.setColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(famName)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
TableDescriptor table2 = TableDescriptorBuilder.newBuilder(NS2_TABLE)
.setColumnFamily(
ColumnFamilyDescriptorBuilder.newBuilder(famName)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
Admin admin1 = UTIL1.getAdmin();
admin1.disableTable(table1.getTableName());
admin1.deleteTable(table1.getTableName());
admin1.disableTable(table2.getTableName());
admin1.deleteTable(table2.getTableName());
admin1.deleteNamespace(NS1);
admin1.deleteNamespace(NS2);
Admin admin2 = UTIL2.getAdmin();
admin2.disableTable(table1.getTableName());
admin2.deleteTable(table1.getTableName());
admin2.disableTable(table2.getTableName());
admin2.deleteTable(table2.getTableName());
admin2.deleteNamespace(NS1);
admin2.deleteNamespace(NS2);
Admin admin3 = UTIL3.getAdmin();
admin3.disableTable(table1.getTableName());
admin3.deleteTable(table1.getTableName());
admin3.disableTable(table2.getTableName());
admin3.deleteTable(table2.getTableName());
admin3.deleteNamespace(NS1);
admin3.deleteNamespace(NS2);
Admin admin4 = UTIL4.getAdmin();
admin4.disableTable(table1.getTableName());
admin4.deleteTable(table1.getTableName());
admin4.disableTable(table2.getTableName());
admin4.deleteTable(table2.getTableName());
admin4.deleteNamespace(NS1);
admin4.deleteNamespace(NS2);
UTIL1.getAdmin().removeReplicationPeer(PEER4_NS);
UTIL1.getAdmin().removeReplicationPeer(PEER4_NS_TABLE);
}
@Test
@Override
public void testBulkLoadReplicationActiveActive() throws Exception {
Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName);
Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName);
Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName);
Table notPeerTable = UTIL4.getConnection().getTable(TestReplicationBase.tableName);
Table ns1Table = UTIL4.getConnection().getTable(NS1_TABLE);
Table ns2Table = UTIL4.getConnection().getTable(NS2_TABLE);
// case1: The ns1 tables will be replicate to cluster4
byte[] row = Bytes.toBytes("002_ns_peer");
byte[] value = Bytes.toBytes("v2");
bulkLoadOnCluster(ns1Table.getName(), row, value, UTIL1);
waitForReplication(ns1Table, 1, NB_RETRIES);
assertTableHasValue(ns1Table, row, value);
// case2: The ns2:t2_syncup will be replicate to cluster4
// If it's not fix HBASE-23098 the ns_peer1's hfile-refs(zk) will be backlog
row = Bytes.toBytes("003_ns_table_peer");
value = Bytes.toBytes("v2");
bulkLoadOnCluster(ns2Table.getName(), row, value, UTIL1);
waitForReplication(ns2Table, 1, NB_RETRIES);
assertTableHasValue(ns2Table, row, value);
// case3: The table test will be replicate to cluster1,cluster2,cluster3
// not replicate to cluster4, because we not set other peer for that tables.
row = Bytes.toBytes("001_nopeer");
value = Bytes.toBytes("v1");
assertBulkLoadConditions(tableName, row, value, UTIL1, peer1TestTable,
peer2TestTable, peer3TestTable);
assertTableNoValue(notPeerTable, row, value); // 1 -> 4, table is empty
// Verify hfile-refs for 1:ns_peer1, expect is empty
MiniZooKeeperCluster zkCluster = UTIL1.getZkCluster();
ZKWatcher watcher = new ZKWatcher(UTIL1.getConfiguration(), "TestZnodeHFiles-refs", null);
RecoverableZooKeeper zk = ZKUtil.connect(UTIL1.getConfiguration(), watcher);
ZKReplicationQueueStorage replicationQueueStorage =
new ZKReplicationQueueStorage(watcher, UTIL1.getConfiguration());
Set<String> hfiles = replicationQueueStorage.getAllHFileRefs();
assertTrue(hfiles.isEmpty());
}
}