blob: 74a24ac1eb623b59ef645143f7595fbdb4d5d8ee [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
@Category({ ReplicationTests.class, MediumTests.class })
public class TestZKReplicationQueueStorage {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestZKReplicationQueueStorage.class);
private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility();
private static ZKReplicationQueueStorage STORAGE;
@BeforeClass
public static void setUp() throws Exception {
UTIL.startMiniZKCluster();
STORAGE = new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
}
@AfterClass
public static void tearDown() throws IOException {
UTIL.shutdownMiniZKCluster();
}
@After
public void tearDownAfterTest() throws ReplicationException, KeeperException, IOException {
for (ServerName serverName : STORAGE.getListOfReplicators()) {
for (String queue : STORAGE.getAllQueues(serverName)) {
STORAGE.removeQueue(serverName, queue);
}
STORAGE.removeReplicatorIfQueueIsEmpty(serverName);
}
for (String peerId : STORAGE.getAllPeersFromHFileRefsQueue()) {
STORAGE.removePeerFromHFileRefs(peerId);
}
}
private ServerName getServerName(int i) {
return ServerName.valueOf("127.0.0.1", 8000 + i, 10000 + i);
}
@Test
public void testReplicator() throws ReplicationException {
assertTrue(STORAGE.getListOfReplicators().isEmpty());
String queueId = "1";
for (int i = 0; i < 10; i++) {
STORAGE.addWAL(getServerName(i), queueId, "file" + i);
}
List<ServerName> replicators = STORAGE.getListOfReplicators();
assertEquals(10, replicators.size());
for (int i = 0; i < 10; i++) {
assertThat(replicators, hasItems(getServerName(i)));
}
for (int i = 0; i < 5; i++) {
STORAGE.removeQueue(getServerName(i), queueId);
}
for (int i = 0; i < 10; i++) {
STORAGE.removeReplicatorIfQueueIsEmpty(getServerName(i));
}
replicators = STORAGE.getListOfReplicators();
assertEquals(5, replicators.size());
for (int i = 5; i < 10; i++) {
assertThat(replicators, hasItems(getServerName(i)));
}
}
private String getFileName(String base, int i) {
return String.format(base + "-%04d", i);
}
@Test
public void testAddRemoveLog() throws ReplicationException {
ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
assertTrue(STORAGE.getAllQueues(serverName1).isEmpty());
String queue1 = "1";
String queue2 = "2";
for (int i = 0; i < 10; i++) {
STORAGE.addWAL(serverName1, queue1, getFileName("file1", i));
STORAGE.addWAL(serverName1, queue2, getFileName("file2", i));
}
List<String> queueIds = STORAGE.getAllQueues(serverName1);
assertEquals(2, queueIds.size());
assertThat(queueIds, hasItems("1", "2"));
List<String> wals1 = STORAGE.getWALsInQueue(serverName1, queue1);
List<String> wals2 = STORAGE.getWALsInQueue(serverName1, queue2);
assertEquals(10, wals1.size());
assertEquals(10, wals2.size());
for (int i = 0; i < 10; i++) {
assertThat(wals1, hasItems(getFileName("file1", i)));
assertThat(wals2, hasItems(getFileName("file2", i)));
}
for (int i = 0; i < 10; i++) {
assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i)));
assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i)));
STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100,
Collections.emptyMap());
STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10,
Collections.emptyMap());
}
for (int i = 0; i < 10; i++) {
assertEquals((i + 1) * 100,
STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i)));
assertEquals((i + 1) * 100 + 10,
STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i)));
}
for (int i = 0; i < 10; i++) {
if (i % 2 == 0) {
STORAGE.removeWAL(serverName1, queue1, getFileName("file1", i));
} else {
STORAGE.removeWAL(serverName1, queue2, getFileName("file2", i));
}
}
queueIds = STORAGE.getAllQueues(serverName1);
assertEquals(2, queueIds.size());
assertThat(queueIds, hasItems("1", "2"));
ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001);
Pair<String, SortedSet<String>> peer1 = STORAGE.claimQueue(serverName1, "1", serverName2);
assertEquals("1-" + serverName1.getServerName(), peer1.getFirst());
assertEquals(5, peer1.getSecond().size());
int i = 1;
for (String wal : peer1.getSecond()) {
assertEquals(getFileName("file1", i), wal);
assertEquals((i + 1) * 100,
STORAGE.getWALPosition(serverName2, peer1.getFirst(), getFileName("file1", i)));
i += 2;
}
queueIds = STORAGE.getAllQueues(serverName1);
assertEquals(1, queueIds.size());
assertThat(queueIds, hasItems("2"));
wals2 = STORAGE.getWALsInQueue(serverName1, queue2);
assertEquals(5, wals2.size());
for (i = 0; i < 10; i += 2) {
assertThat(wals2, hasItems(getFileName("file2", i)));
}
queueIds = STORAGE.getAllQueues(serverName2);
assertEquals(1, queueIds.size());
assertThat(queueIds, hasItems(peer1.getFirst()));
wals1 = STORAGE.getWALsInQueue(serverName2, peer1.getFirst());
assertEquals(5, wals1.size());
for (i = 1; i < 10; i += 2) {
assertThat(wals1, hasItems(getFileName("file1", i)));
}
Set<String> allWals = STORAGE.getAllWALs();
assertEquals(10, allWals.size());
for (i = 0; i < 10; i++) {
assertThat(allWals, hasItems(i % 2 == 0 ? getFileName("file2", i) : getFileName("file1", i)));
}
}
// For HBASE-12865
@Test
public void testClaimQueueChangeCversion() throws ReplicationException, KeeperException {
ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
STORAGE.addWAL(serverName1, "1", "file");
int v0 = STORAGE.getQueuesZNodeCversion();
ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001);
STORAGE.claimQueue(serverName1, "1", serverName2);
int v1 = STORAGE.getQueuesZNodeCversion();
// cversion should increase by 1 since a child node is deleted
assertEquals(1, v1 - v0);
}
private ZKReplicationQueueStorage createWithUnstableVersion() throws IOException {
return new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()) {
private int called = 0;
private int getLastSeqIdOpIndex = 0;
@Override
protected int getQueuesZNodeCversion() throws KeeperException {
if (called < 4) {
called++;
}
return called;
}
@Override
protected Pair<Long, Integer> getLastSequenceIdWithVersion(String encodedRegionName,
String peerId) throws KeeperException {
Pair<Long, Integer> oldPair = super.getLastSequenceIdWithVersion(encodedRegionName, peerId);
if (getLastSeqIdOpIndex < 100) {
// Let the ZNode version increase.
String path = getSerialReplicationRegionPeerNode(encodedRegionName, peerId);
ZKUtil.createWithParents(zookeeper, path);
ZKUtil.setData(zookeeper, path, ZKUtil.positionToByteArray(100L));
}
getLastSeqIdOpIndex++;
return oldPair;
}
};
}
@Test
public void testGetAllWALsCversionChange() throws IOException, ReplicationException {
ZKReplicationQueueStorage storage = createWithUnstableVersion();
storage.addWAL(getServerName(0), "1", "file");
// This should return eventually when cversion stabilizes
Set<String> allWals = storage.getAllWALs();
assertEquals(1, allWals.size());
assertThat(allWals, hasItems("file"));
}
// For HBASE-14621
@Test
public void testGetAllHFileRefsCversionChange() throws IOException, ReplicationException {
ZKReplicationQueueStorage storage = createWithUnstableVersion();
storage.addPeerToHFileRefs("1");
Path p = new Path("/test");
storage.addHFileRefs("1", Arrays.asList(Pair.newPair(p, p)));
// This should return eventually when cversion stabilizes
Set<String> allHFileRefs = storage.getAllHFileRefs();
assertEquals(1, allHFileRefs.size());
assertThat(allHFileRefs, hasItems("test"));
}
// For HBASE-20138
@Test
public void testSetWALPositionBadVersion() throws IOException, ReplicationException {
ZKReplicationQueueStorage storage = createWithUnstableVersion();
ServerName serverName1 = ServerName.valueOf("128.0.0.1", 8000, 10000);
assertTrue(storage.getAllQueues(serverName1).isEmpty());
String queue1 = "1";
String fileName = getFileName("file1", 0);
String encodedRegionName = "31d9792f4435b99d9fb1016f6fbc8dc6";
storage.addWAL(serverName1, queue1, fileName);
List<String> wals1 = storage.getWALsInQueue(serverName1, queue1);
assertEquals(1, wals1.size());
assertEquals(0, storage.getWALPosition(serverName1, queue1, fileName));
// This should return eventually when data version stabilizes
storage.setWALPosition(serverName1, queue1, fileName, 100,
ImmutableMap.of(encodedRegionName, 120L));
assertEquals(100, storage.getWALPosition(serverName1, queue1, fileName));
assertEquals(120L, storage.getLastSequenceId(encodedRegionName, queue1));
}
@Test
public void testRegionsZNodeLayout() throws Exception {
String peerId = "1";
String encodedRegionName = "31d9792f4435b99d9fb1016f6fbc8dc7";
String expectedPath = "/hbase/replication/regions/31/d9/792f4435b99d9fb1016f6fbc8dc7-" + peerId;
String path = STORAGE.getSerialReplicationRegionPeerNode(encodedRegionName, peerId);
assertEquals(expectedPath, path);
}
@Test
public void testRemoveAllLastPushedSeqIdsForPeer() throws Exception {
String peerId = "1";
String peerIdToDelete = "2";
for (int i = 0; i < 100; i++) {
String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
STORAGE.setLastSequenceIds(peerId, ImmutableMap.of(encodedRegionName, (long) i));
STORAGE.setLastSequenceIds(peerIdToDelete, ImmutableMap.of(encodedRegionName, (long) i));
}
for (int i = 0; i < 100; i++) {
String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerId));
assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerIdToDelete));
}
STORAGE.removeLastSequenceIds(peerIdToDelete);
for (int i = 0; i < 100; i++) {
String encodedRegionName = MD5Hash.getMD5AsHex(Bytes.toBytes(i));
assertEquals(i, STORAGE.getLastSequenceId(encodedRegionName, peerId));
assertEquals(HConstants.NO_SEQNUM,
STORAGE.getLastSequenceId(encodedRegionName, peerIdToDelete));
}
}
}