// blob: 07e626b3c84c648cfc142722ce137de912bc2f6c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
 * Tests for serial replication. The tests write rows while the peer is disabled, perform a region
 * move, split or merge, enable the peer, and then verify the order of the entries found in the WAL
 * file at {@code logPath} (provided by the base class). Further tests verify that the last pushed
 * sequence id kept in the replication queue storage is cleared when the peer is removed or when
 * its serial flag is turned off.
 */
@Category({ ReplicationTests.class, MediumTests.class })
public class TestSerialReplication extends SerialReplicationTestBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestSerialReplication.class);

  @Before
  public void setUp() throws IOException, StreamLacksCapabilityException {
    setupWALWriter();
    // Add the peer in the disabled state so that, once it is enabled later, all sources start
    // pushing together.
    addPeer(false);
  }

  /**
   * Writes one row per integer in {@code [startInclusive, endExclusive)}. Both the row key and the
   * value of the single {@code CF:CQ} cell are the encoded integer.
   * @param tableName the table to write to
   * @param startInclusive first integer to write
   * @param endExclusive one past the last integer to write
   */
  private void putIntRows(TableName tableName, int startInclusive, int endExclusive)
    throws Exception {
    try (Table table = UTIL.getConnection().getTable(tableName)) {
      for (int i = startInclusive; i < endExclusive; i++) {
        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
      }
    }
  }

  /**
   * Checks that {@code entry} belongs to one of the tracked regions and that its sequence id is
   * not smaller than the last one seen for that region, then records it as the new last sequence
   * id for the region.
   * @param entry the WAL entry read back from the log
   * @param lastSeqIds encoded region name to the last sequence id seen so far; updated in place
   * @param region region used only for the failure message
   * @param regions regions used only for the failure message
   * @return the encoded name of the region the entry belongs to
   */
  private String checkSequenceIdAndGetRegion(Entry entry, Map<String, Long> lastSeqIds,
    RegionInfo region, List<RegionInfo> regions) {
    String encodedName = Bytes.toString(entry.getKey().getEncodedRegionName());
    long currentSeqId = entry.getKey().getSequenceId();
    Long lastSeqId = lastSeqIds.get(encodedName);
    assertNotNull(
      "Unexpected entry " + entry + ", expected regions " + region + ", or " + regions, lastSeqId);
    assertTrue("Sequence id go backwards from " + lastSeqId + " to " + currentSeqId + " for " +
      encodedName, currentSeqId >= lastSeqId.longValue());
    // Remember the newest sequence id; without this every entry would only be compared with the
    // initial -1 and the ordering assertion above could never fail.
    lastSeqIds.put(encodedName, currentSeqId);
    return encodedName;
  }

  /**
   * Writes 100 rows, moves the region to another region server, writes 100 more rows, and then
   * checks the order of the 200 entries after the peer is enabled.
   */
  @Test
  public void testRegionMove() throws Exception {
    TableName tableName = createTable();
    putIntRows(tableName, 0, 100);
    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
    HRegionServer rs = UTIL.getOtherRegionServer(UTIL.getRSForFirstRegionInTable(tableName));
    moveRegion(region, rs);
    putIntRows(tableName, 100, 200);
    enablePeerAndWaitUntilReplicationDone(200);
    checkOrder(200);
  }

  /**
   * Writes 100 rows, splits the region, writes 100 more rows, and then verifies that the first 100
   * entries in the log belong to the parent region, that the remaining 100 belong to the daughter
   * regions, and that sequence ids never go backwards within a region.
   */
  @Test
  public void testRegionSplit() throws Exception {
    TableName tableName = createTable();
    putIntRows(tableName, 0, 100);
    UTIL.flush(tableName);
    RegionInfo region = UTIL.getAdmin().getRegions(tableName).get(0);
    UTIL.getAdmin().splitRegionAsync(region.getEncodedNameAsBytes(), Bytes.toBytes(50)).get(30,
      TimeUnit.SECONDS);
    UTIL.waitUntilNoRegionsInTransition(30000);
    List<RegionInfo> regions = UTIL.getAdmin().getRegions(tableName);
    assertEquals(2, regions.size());
    putIntRows(tableName, 0, 100);
    enablePeerAndWaitUntilReplicationDone(200);
    // Track the parent as well as both daughters; -1 means no entry seen yet.
    Map<String, Long> regionsToSeqId = new HashMap<>();
    regionsToSeqId.put(region.getEncodedName(), -1L);
    regions.stream().map(RegionInfo::getEncodedName).forEach(n -> regionsToSeqId.put(n, -1L));
    try (WAL.Reader reader =
      WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
      int count = 0;
      for (Entry entry = reader.next(); entry != null; entry = reader.next()) {
        String encodedName = checkSequenceIdAndGetRegion(entry, regionsToSeqId, region, regions);
        if (count < 100) {
          // The first 100 entries were written before the split, so they must be the parent's.
          assertEquals(encodedName + " is pushed before parent " + region.getEncodedName(),
            region.getEncodedName(), encodedName);
        } else {
          assertNotEquals(region.getEncodedName(), encodedName);
        }
        count++;
      }
      assertEquals(200, count);
    }
  }

  /**
   * Creates a table pre-split into two regions, writes 100 rows, merges the regions, writes 100
   * more rows, and then verifies that the first 100 entries in the log belong to the parent
   * regions, that the remaining 100 belong to the merged region, and that sequence ids never go
   * backwards within a region.
   */
  @Test
  public void testRegionMerge() throws Exception {
    byte[] splitKey = Bytes.toBytes(50);
    TableName tableName = TableName.valueOf(name.getMethodName());
    UTIL.getAdmin().createTable(
      TableDescriptorBuilder.newBuilder(tableName)
        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF)
          .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
        .build(),
      new byte[][] { splitKey });
    UTIL.waitTableAvailable(tableName);
    putIntRows(tableName, 0, 100);
    List<RegionInfo> regions = UTIL.getAdmin().getRegions(tableName);
    UTIL.getAdmin()
      .mergeRegionsAsync(
        regions.stream().map(RegionInfo::getEncodedNameAsBytes).toArray(byte[][]::new), false)
      .get(30, TimeUnit.SECONDS);
    UTIL.waitUntilNoRegionsInTransition(30000);
    List<RegionInfo> regionsAfterMerge = UTIL.getAdmin().getRegions(tableName);
    assertEquals(1, regionsAfterMerge.size());
    putIntRows(tableName, 0, 100);
    enablePeerAndWaitUntilReplicationDone(200);
    // Track the merged region as well as both parents; -1 means no entry seen yet.
    Map<String, Long> regionsToSeqId = new HashMap<>();
    RegionInfo region = regionsAfterMerge.get(0);
    regionsToSeqId.put(region.getEncodedName(), -1L);
    regions.stream().map(RegionInfo::getEncodedName).forEach(n -> regionsToSeqId.put(n, -1L));
    try (WAL.Reader reader =
      WALFactory.createReader(UTIL.getTestFileSystem(), logPath, UTIL.getConfiguration())) {
      int count = 0;
      for (Entry entry = reader.next(); entry != null; entry = reader.next()) {
        String encodedName = checkSequenceIdAndGetRegion(entry, regionsToSeqId, region, regions);
        if (count < 100) {
          // The first 100 entries were written before the merge, so none may be the merged
          // region's.
          assertNotEquals(
            encodedName + " is pushed before parents " +
              regions.stream().map(RegionInfo::getEncodedName).collect(Collectors.joining(" and ")),
            region.getEncodedName(), encodedName);
        } else {
          assertEquals(region.getEncodedName(), encodedName);
        }
        count++;
      }
      assertEquals(200, count);
    }
  }

  /**
   * Removes the peer without having replicated anything and checks that the last pushed sequence
   * id of the region is {@link HConstants#NO_SEQNUM} both before and after the removal.
   */
  @Test
  public void testRemovePeerNothingReplicated() throws Exception {
    TableName tableName = createTable();
    String encodedRegionName =
      UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName();
    ReplicationQueueStorage queueStorage =
      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
    assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
    UTIL.getAdmin().removeReplicationPeer(PEER_ID);
    assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
  }

  /**
   * Replicates 100 rows, checks that a positive last pushed sequence id has been recorded, removes
   * the peer, and checks that the recorded sequence id is gone.
   */
  @Test
  public void testRemovePeer() throws Exception {
    TableName tableName = createTable();
    putIntRows(tableName, 0, 100);
    enablePeerAndWaitUntilReplicationDone(100);
    checkOrder(100);
    String encodedRegionName =
      UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName();
    ReplicationQueueStorage queueStorage =
      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
    assertTrue(queueStorage.getLastSequenceId(encodedRegionName, PEER_ID) > 0);
    UTIL.getAdmin().removeReplicationPeer(PEER_ID);
    // confirm that we delete the last pushed sequence id
    assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
  }

  /**
   * Replicates 100 rows, checks that a positive last pushed sequence id has been recorded, turns
   * off the serial flag of the peer, and checks that the recorded sequence id is gone.
   */
  @Test
  public void testRemoveSerialFlag() throws Exception {
    TableName tableName = createTable();
    putIntRows(tableName, 0, 100);
    enablePeerAndWaitUntilReplicationDone(100);
    checkOrder(100);
    String encodedRegionName =
      UTIL.getMiniHBaseCluster().getRegions(tableName).get(0).getRegionInfo().getEncodedName();
    ReplicationQueueStorage queueStorage =
      UTIL.getMiniHBaseCluster().getMaster().getReplicationPeerManager().getQueueStorage();
    assertTrue(queueStorage.getLastSequenceId(encodedRegionName, PEER_ID) > 0);
    ReplicationPeerConfig peerConfig = UTIL.getAdmin().getReplicationPeerConfig(PEER_ID);
    UTIL.getAdmin().updateReplicationPeerConfig(PEER_ID,
      ReplicationPeerConfig.newBuilder(peerConfig).setSerial(false).build());
    // confirm that we delete the last pushed sequence id
    assertEquals(HConstants.NO_SEQNUM, queueStorage.getLastSequenceId(encodedRegionName, PEER_ID));
  }
}