blob: 2d72618f091b3fecfc7a6eef5f8eec1c9cf8b931 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category
({ ReplicationTests.class, LargeTests.class }) public class TestReplicationEmptyWALRecovery
extends TestReplicationBase {
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build();
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
@ClassRule public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class);
@Before
public void setUp() throws IOException, InterruptedException {
cleanUp();
scopes.put(famName, HConstants.REPLICATION_SCOPE_GLOBAL);
replicateCount.set(0);
replicatedEntries.clear();
}
/**
* Waits until there is only one log(the current writing one) in the replication queue
*
* @param numRs number of region servers
*/
private void waitForLogAdvance(int numRs) {
Waiter.waitFor(CONF1, 100000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
for (int i = 0; i < numRs; i++) {
HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
RegionInfo regionInfo =
UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
WAL wal = hrs.getWAL(regionInfo);
Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName();
Replication replicationService =
(Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
.getSources()) {
ReplicationSource source = (ReplicationSource) rsi;
// We are making sure that there is only one log queue and that is for the
// current WAL of region server
String logPrefix = source.getQueues().keySet().stream().findFirst().get();
if (!currentFile.equals(source.getCurrentPath())
|| source.getQueues().keySet().size() != 1
|| source.getQueues().get(logPrefix).size() != 1) {
return false;
}
}
}
return true;
}
});
}
private void verifyNumberOfLogsInQueue(int numQueues, int numRs) {
Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() {
for (int i = 0; i < numRs; i++) {
Replication replicationService =
(Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService();
for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
.getSources()) {
ReplicationSource source = (ReplicationSource) rsi;
String logPrefix = source.getQueues().keySet().stream().findFirst().get();
if (source.getQueues().get(logPrefix).size() != numQueues) {
return false;
}
}
}
return true;
}
});
}
@Test
public void testEmptyWALRecovery() throws Exception {
final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
// for each RS, create an empty wal with same walGroupId
final List<Path> emptyWalPaths = new ArrayList<>();
long ts = System.currentTimeMillis();
for (int i = 0; i < numRs; i++) {
RegionInfo regionInfo =
UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
UTIL1.getTestFileSystem().create(emptyWalPath).close();
emptyWalPaths.add(emptyWalPath);
}
injectEmptyWAL(numRs, emptyWalPaths);
// ReplicationSource should advance past the empty wal, or else the test will fail
waitForLogAdvance(numRs);
verifyNumberOfLogsInQueue(1, numRs);
// we're now writing to the new wal
// if everything works, the source should've stopped reading from the empty wal, and start
// replicating from the new wal
runSimplePutDeleteTest();
rollWalsAndWaitForDeque(numRs);
}
/**
* Test empty WAL along with non empty WALs in the same batch. This test is to make sure
* when we see the empty and handle the EOF exception, we are able to existing the previous
* batch of entries without loosing it. This test also tests the number of batches shipped
*
* @throws Exception throws any exception
*/
@Test
public void testReplicationOfEmptyWALFollowingNonEmptyWAL() throws Exception {
// Disable the replication peer to accumulate the non empty WAL followed by empty WAL
hbaseAdmin.disableReplicationPeer(PEER_ID2);
int numOfEntriesToReplicate = 20;
final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
// for each RS, create an empty wal with same walGroupId
final List<Path> emptyWalPaths = new ArrayList<>();
long ts = System.currentTimeMillis();
for (int i = 0; i < numRs; i++) {
RegionInfo regionInfo =
UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
appendEntriesToWal(numOfEntriesToReplicate, wal);
wal.rollWriter();
String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
Path emptyWalPath = new Path(UTIL1.getDefaultRootDirPath(), walGroupId + "." + ts);
UTIL1.getTestFileSystem().create(emptyWalPath).close();
emptyWalPaths.add(emptyWalPath);
}
injectEmptyWAL(numRs, emptyWalPaths);
// There should be three WALs in queue
// 1. empty WAL
// 2. non empty WAL
// 3. live WAL
//verifyNumberOfLogsInQueue(3, numRs);
hbaseAdmin.enableReplicationPeer(PEER_ID2);
// ReplicationSource should advance past the empty wal, or else the test will fail
waitForLogAdvance(numRs);
// Now we should expect numOfEntriesToReplicate entries
// replicated from each region server. This makes sure we didn't loose data
// from any previous batch when we encounter EOF exception for empty file.
Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs,
replicatedEntries.size());
// We expect just one batch of replication which will
// be from when we handle the EOF exception.
Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.intValue());
verifyNumberOfLogsInQueue(1, numRs);
// we're now writing to the new wal
// if everything works, the source should've stopped reading from the empty wal, and start
// replicating from the new wal
runSimplePutDeleteTest();
rollWalsAndWaitForDeque(numRs);
}
/**
* Test empty WAL along with non empty WALs in the same batch. This test is to make sure
* when we see the empty WAL and handle the EOF exception, we are able to proceed
* with next batch and replicate it properly without missing data.
*
* @throws Exception throws any exception
*/
@Test
public void testReplicationOfEmptyWALFollowedByNonEmptyWAL() throws Exception {
// Disable the replication peer to accumulate the non empty WAL followed by empty WAL
hbaseAdmin.disableReplicationPeer(PEER_ID2);
int numOfEntriesToReplicate = 20;
final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
// for each RS, create an empty wal with same walGroupId
final List<Path> emptyWalPaths = new ArrayList<>();
long ts = System.currentTimeMillis();
WAL wal = null;
for (int i = 0; i < numRs; i++) {
RegionInfo regionInfo =
UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
appendEntriesToWal(numOfEntriesToReplicate, wal);
String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
UTIL1.getTestFileSystem().create(emptyWalPath).close();
emptyWalPaths.add(emptyWalPath);
}
injectEmptyWAL(numRs, emptyWalPaths);
// roll the WAL now
for (int i = 0; i < numRs; i++) {
wal.rollWriter();
}
hbaseAdmin.enableReplicationPeer(PEER_ID2);
// ReplicationSource should advance past the empty wal, or else the test will fail
waitForLogAdvance(numRs);
// Now we should expect numOfEntriesToReplicate entries
// replicated from each region server. This makes sure we didn't loose data
// from any previous batch when we encounter EOF exception for empty file.
Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs,
replicatedEntries.size());
// We expect just one batch of replication to be shipped which will
// for non empty WAL
Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.get());
verifyNumberOfLogsInQueue(1, numRs);
// we're now writing to the new wal
// if everything works, the source should've stopped reading from the empty wal, and start
// replicating from the new wal
runSimplePutDeleteTest();
rollWalsAndWaitForDeque(numRs);
}
/**
* This test make sure we replicate all the enties from the non empty WALs which
* are surrounding the empty WALs
*
* @throws Exception throws exception
*/
@Test
public void testReplicationOfEmptyWALSurroundedNonEmptyWAL() throws Exception {
// Disable the replication peer to accumulate the non empty WAL followed by empty WAL
hbaseAdmin.disableReplicationPeer(PEER_ID2);
int numOfEntriesToReplicate = 20;
final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
// for each RS, create an empty wal with same walGroupId
final List<Path> emptyWalPaths = new ArrayList<>();
long ts = System.currentTimeMillis();
WAL wal = null;
for (int i = 0; i < numRs; i++) {
RegionInfo regionInfo =
UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
appendEntriesToWal(numOfEntriesToReplicate, wal);
wal.rollWriter();
String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
UTIL1.getTestFileSystem().create(emptyWalPath).close();
emptyWalPaths.add(emptyWalPath);
}
injectEmptyWAL(numRs, emptyWalPaths);
// roll the WAL again with some entries
for (int i = 0; i < numRs; i++) {
appendEntriesToWal(numOfEntriesToReplicate, wal);
wal.rollWriter();
}
hbaseAdmin.enableReplicationPeer(PEER_ID2);
// ReplicationSource should advance past the empty wal, or else the test will fail
waitForLogAdvance(numRs);
// Now we should expect numOfEntriesToReplicate entries
// replicated from each region server. This makes sure we didn't loose data
// from any previous batch when we encounter EOF exception for empty file.
Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs * 2,
replicatedEntries.size());
// We expect two batch of replication to be shipped which will
// for non empty WAL
Assert.assertEquals("Replicated batches are not correct", 2, replicateCount.get());
verifyNumberOfLogsInQueue(1, numRs);
// we're now writing to the new wal
// if everything works, the source should've stopped reading from the empty wal, and start
// replicating from the new wal
runSimplePutDeleteTest();
rollWalsAndWaitForDeque(numRs);
}
// inject our empty wal into the replication queue, and then roll the original wal, which
// enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to
// determine if the file being replicated currently is still opened for write, so just inject a
// new wal to the replication queue does not mean the previous file is closed.
private void injectEmptyWAL(int numRs, List<Path> emptyWalPaths) throws IOException {
for (int i = 0; i < numRs; i++) {
HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
Replication replicationService = (Replication) hrs.getReplicationSourceService();
replicationService.getReplicationManager().preLogRoll(emptyWalPaths.get(i));
replicationService.getReplicationManager().postLogRoll(emptyWalPaths.get(i));
RegionInfo regionInfo =
UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
WAL wal = hrs.getWAL(regionInfo);
wal.rollWriter(true);
}
}
protected WALKeyImpl getWalKeyImpl() {
return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 0, mvcc, scopes);
}
// Roll the WAL and wait for it to get deque from the log queue
private void rollWalsAndWaitForDeque(int numRs) throws IOException {
RegionInfo regionInfo =
UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo();
for (int i = 0; i < numRs; i++) {
WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
wal.rollWriter();
}
waitForLogAdvance(numRs);
}
private void appendEntriesToWal(int numEntries, WAL wal) throws IOException {
long txId = -1;
for (int i = 0; i < numEntries; i++) {
byte[] b = Bytes.toBytes(Integer.toString(i));
KeyValue kv = new KeyValue(b, famName, b);
WALEdit edit = new WALEdit();
edit.add(kv);
txId = wal.appendData(info, getWalKeyImpl(), edit);
}
wal.sync(txId);
}
}