| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.replication; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.NavigableMap; |
| import java.util.TreeMap; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.HBaseClassTestRule; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.KeyValue; |
| import org.apache.hadoop.hbase.Waiter; |
| import org.apache.hadoop.hbase.client.RegionInfo; |
| import org.apache.hadoop.hbase.client.RegionInfoBuilder; |
| import org.apache.hadoop.hbase.regionserver.HRegionServer; |
| import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; |
| import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; |
| import org.apache.hadoop.hbase.replication.regionserver.Replication; |
| import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource; |
| import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface; |
| import org.apache.hadoop.hbase.testclassification.LargeTests; |
| import org.apache.hadoop.hbase.testclassification.ReplicationTests; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; |
| import org.apache.hadoop.hbase.wal.WAL; |
| import org.apache.hadoop.hbase.wal.WALEdit; |
| import org.apache.hadoop.hbase.wal.WALKeyImpl; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.ClassRule; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| |
| @Category |
| ({ ReplicationTests.class, LargeTests.class }) public class TestReplicationEmptyWALRecovery |
| extends TestReplicationBase { |
| MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); |
| static final RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); |
| NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); |
| |
| @ClassRule public static final HBaseClassTestRule CLASS_RULE = |
| HBaseClassTestRule.forClass(TestReplicationEmptyWALRecovery.class); |
| |
| @Before |
| public void setUp() throws IOException, InterruptedException { |
| cleanUp(); |
| scopes.put(famName, HConstants.REPLICATION_SCOPE_GLOBAL); |
| replicateCount.set(0); |
| replicatedEntries.clear(); |
| } |
| |
| /** |
| * Waits until there is only one log(the current writing one) in the replication queue |
| * |
| * @param numRs number of region servers |
| */ |
| private void waitForLogAdvance(int numRs) { |
| Waiter.waitFor(CONF1, 100000, new Waiter.Predicate<Exception>() { |
| @Override |
| public boolean evaluate() throws Exception { |
| for (int i = 0; i < numRs; i++) { |
| HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i); |
| RegionInfo regionInfo = |
| UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); |
| WAL wal = hrs.getWAL(regionInfo); |
| Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName(); |
| Replication replicationService = |
| (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService(); |
| for (ReplicationSourceInterface rsi : replicationService.getReplicationManager() |
| .getSources()) { |
| ReplicationSource source = (ReplicationSource) rsi; |
| // We are making sure that there is only one log queue and that is for the |
| // current WAL of region server |
| String logPrefix = source.getQueues().keySet().stream().findFirst().get(); |
| if (!currentFile.equals(source.getCurrentPath()) |
| || source.getQueues().keySet().size() != 1 |
| || source.getQueues().get(logPrefix).size() != 1) { |
| return false; |
| } |
| } |
| } |
| return true; |
| } |
| }); |
| } |
| |
| private void verifyNumberOfLogsInQueue(int numQueues, int numRs) { |
| Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() { |
| @Override |
| public boolean evaluate() { |
| for (int i = 0; i < numRs; i++) { |
| Replication replicationService = |
| (Replication) UTIL1.getHBaseCluster().getRegionServer(i).getReplicationSourceService(); |
| for (ReplicationSourceInterface rsi : replicationService.getReplicationManager() |
| .getSources()) { |
| ReplicationSource source = (ReplicationSource) rsi; |
| String logPrefix = source.getQueues().keySet().stream().findFirst().get(); |
| if (source.getQueues().get(logPrefix).size() != numQueues) { |
| return false; |
| } |
| } |
| } |
| return true; |
| } |
| }); |
| } |
| |
| @Test |
| public void testEmptyWALRecovery() throws Exception { |
| final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size(); |
| // for each RS, create an empty wal with same walGroupId |
| final List<Path> emptyWalPaths = new ArrayList<>(); |
| long ts = System.currentTimeMillis(); |
| for (int i = 0; i < numRs; i++) { |
| RegionInfo regionInfo = |
| UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); |
| WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); |
| Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal); |
| String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName()); |
| Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts); |
| UTIL1.getTestFileSystem().create(emptyWalPath).close(); |
| emptyWalPaths.add(emptyWalPath); |
| } |
| |
| injectEmptyWAL(numRs, emptyWalPaths); |
| |
| // ReplicationSource should advance past the empty wal, or else the test will fail |
| waitForLogAdvance(numRs); |
| verifyNumberOfLogsInQueue(1, numRs); |
| // we're now writing to the new wal |
| // if everything works, the source should've stopped reading from the empty wal, and start |
| // replicating from the new wal |
| runSimplePutDeleteTest(); |
| rollWalsAndWaitForDeque(numRs); |
| } |
| |
| /** |
| * Test empty WAL along with non empty WALs in the same batch. This test is to make sure |
| * when we see the empty and handle the EOF exception, we are able to existing the previous |
| * batch of entries without loosing it. This test also tests the number of batches shipped |
| * |
| * @throws Exception throws any exception |
| */ |
| @Test |
| public void testReplicationOfEmptyWALFollowingNonEmptyWAL() throws Exception { |
| // Disable the replication peer to accumulate the non empty WAL followed by empty WAL |
| hbaseAdmin.disableReplicationPeer(PEER_ID2); |
| int numOfEntriesToReplicate = 20; |
| |
| final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size(); |
| // for each RS, create an empty wal with same walGroupId |
| final List<Path> emptyWalPaths = new ArrayList<>(); |
| long ts = System.currentTimeMillis(); |
| for (int i = 0; i < numRs; i++) { |
| RegionInfo regionInfo = |
| UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo(); |
| WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); |
| Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal); |
| |
| appendEntriesToWal(numOfEntriesToReplicate, wal); |
| wal.rollWriter(); |
| String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName()); |
| Path emptyWalPath = new Path(UTIL1.getDefaultRootDirPath(), walGroupId + "." + ts); |
| UTIL1.getTestFileSystem().create(emptyWalPath).close(); |
| emptyWalPaths.add(emptyWalPath); |
| } |
| |
| injectEmptyWAL(numRs, emptyWalPaths); |
| // There should be three WALs in queue |
| // 1. empty WAL |
| // 2. non empty WAL |
| // 3. live WAL |
| //verifyNumberOfLogsInQueue(3, numRs); |
| hbaseAdmin.enableReplicationPeer(PEER_ID2); |
| // ReplicationSource should advance past the empty wal, or else the test will fail |
| waitForLogAdvance(numRs); |
| |
| // Now we should expect numOfEntriesToReplicate entries |
| // replicated from each region server. This makes sure we didn't loose data |
| // from any previous batch when we encounter EOF exception for empty file. |
| Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs, |
| replicatedEntries.size()); |
| |
| // We expect just one batch of replication which will |
| // be from when we handle the EOF exception. |
| Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.intValue()); |
| verifyNumberOfLogsInQueue(1, numRs); |
| // we're now writing to the new wal |
| // if everything works, the source should've stopped reading from the empty wal, and start |
| // replicating from the new wal |
| runSimplePutDeleteTest(); |
| rollWalsAndWaitForDeque(numRs); |
| } |
| |
| /** |
| * Test empty WAL along with non empty WALs in the same batch. This test is to make sure |
| * when we see the empty WAL and handle the EOF exception, we are able to proceed |
| * with next batch and replicate it properly without missing data. |
| * |
| * @throws Exception throws any exception |
| */ |
| @Test |
| public void testReplicationOfEmptyWALFollowedByNonEmptyWAL() throws Exception { |
| // Disable the replication peer to accumulate the non empty WAL followed by empty WAL |
| hbaseAdmin.disableReplicationPeer(PEER_ID2); |
| int numOfEntriesToReplicate = 20; |
| |
| final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size(); |
| // for each RS, create an empty wal with same walGroupId |
| final List<Path> emptyWalPaths = new ArrayList<>(); |
| |
| long ts = System.currentTimeMillis(); |
| WAL wal = null; |
| for (int i = 0; i < numRs; i++) { |
| RegionInfo regionInfo = |
| UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo(); |
| wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); |
| Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal); |
| appendEntriesToWal(numOfEntriesToReplicate, wal); |
| String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName()); |
| Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts); |
| UTIL1.getTestFileSystem().create(emptyWalPath).close(); |
| emptyWalPaths.add(emptyWalPath); |
| |
| } |
| injectEmptyWAL(numRs, emptyWalPaths); |
| // roll the WAL now |
| for (int i = 0; i < numRs; i++) { |
| wal.rollWriter(); |
| } |
| hbaseAdmin.enableReplicationPeer(PEER_ID2); |
| // ReplicationSource should advance past the empty wal, or else the test will fail |
| waitForLogAdvance(numRs); |
| |
| // Now we should expect numOfEntriesToReplicate entries |
| // replicated from each region server. This makes sure we didn't loose data |
| // from any previous batch when we encounter EOF exception for empty file. |
| Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs, |
| replicatedEntries.size()); |
| |
| // We expect just one batch of replication to be shipped which will |
| // for non empty WAL |
| Assert.assertEquals("Replicated batches are not correct", 1, replicateCount.get()); |
| verifyNumberOfLogsInQueue(1, numRs); |
| // we're now writing to the new wal |
| // if everything works, the source should've stopped reading from the empty wal, and start |
| // replicating from the new wal |
| runSimplePutDeleteTest(); |
| rollWalsAndWaitForDeque(numRs); |
| } |
| |
| /** |
| * This test make sure we replicate all the enties from the non empty WALs which |
| * are surrounding the empty WALs |
| * |
| * @throws Exception throws exception |
| */ |
| @Test |
| public void testReplicationOfEmptyWALSurroundedNonEmptyWAL() throws Exception { |
| // Disable the replication peer to accumulate the non empty WAL followed by empty WAL |
| hbaseAdmin.disableReplicationPeer(PEER_ID2); |
| int numOfEntriesToReplicate = 20; |
| |
| final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size(); |
| // for each RS, create an empty wal with same walGroupId |
| final List<Path> emptyWalPaths = new ArrayList<>(); |
| |
| long ts = System.currentTimeMillis(); |
| WAL wal = null; |
| for (int i = 0; i < numRs; i++) { |
| RegionInfo regionInfo = |
| UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo(); |
| wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); |
| Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal); |
| appendEntriesToWal(numOfEntriesToReplicate, wal); |
| wal.rollWriter(); |
| String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName()); |
| Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts); |
| UTIL1.getTestFileSystem().create(emptyWalPath).close(); |
| emptyWalPaths.add(emptyWalPath); |
| } |
| injectEmptyWAL(numRs, emptyWalPaths); |
| |
| // roll the WAL again with some entries |
| for (int i = 0; i < numRs; i++) { |
| appendEntriesToWal(numOfEntriesToReplicate, wal); |
| wal.rollWriter(); |
| } |
| |
| hbaseAdmin.enableReplicationPeer(PEER_ID2); |
| // ReplicationSource should advance past the empty wal, or else the test will fail |
| waitForLogAdvance(numRs); |
| |
| // Now we should expect numOfEntriesToReplicate entries |
| // replicated from each region server. This makes sure we didn't loose data |
| // from any previous batch when we encounter EOF exception for empty file. |
| Assert.assertEquals("Replicated entries are not correct", numOfEntriesToReplicate * numRs * 2, |
| replicatedEntries.size()); |
| |
| // We expect two batch of replication to be shipped which will |
| // for non empty WAL |
| Assert.assertEquals("Replicated batches are not correct", 2, replicateCount.get()); |
| verifyNumberOfLogsInQueue(1, numRs); |
| // we're now writing to the new wal |
| // if everything works, the source should've stopped reading from the empty wal, and start |
| // replicating from the new wal |
| runSimplePutDeleteTest(); |
| rollWalsAndWaitForDeque(numRs); |
| } |
| |
| // inject our empty wal into the replication queue, and then roll the original wal, which |
| // enqueues a new wal behind our empty wal. We must roll the wal here as now we use the WAL to |
| // determine if the file being replicated currently is still opened for write, so just inject a |
| // new wal to the replication queue does not mean the previous file is closed. |
| private void injectEmptyWAL(int numRs, List<Path> emptyWalPaths) throws IOException { |
| for (int i = 0; i < numRs; i++) { |
| HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i); |
| Replication replicationService = (Replication) hrs.getReplicationSourceService(); |
| replicationService.getReplicationManager().preLogRoll(emptyWalPaths.get(i)); |
| replicationService.getReplicationManager().postLogRoll(emptyWalPaths.get(i)); |
| RegionInfo regionInfo = |
| UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo(); |
| WAL wal = hrs.getWAL(regionInfo); |
| wal.rollWriter(true); |
| } |
| } |
| |
| protected WALKeyImpl getWalKeyImpl() { |
| return new WALKeyImpl(info.getEncodedNameAsBytes(), tableName, 0, mvcc, scopes); |
| } |
| |
| // Roll the WAL and wait for it to get deque from the log queue |
| private void rollWalsAndWaitForDeque(int numRs) throws IOException { |
| RegionInfo regionInfo = |
| UTIL1.getHBaseCluster().getRegions(tableName.getName()).get(0).getRegionInfo(); |
| for (int i = 0; i < numRs; i++) { |
| WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo); |
| wal.rollWriter(); |
| } |
| waitForLogAdvance(numRs); |
| } |
| |
| private void appendEntriesToWal(int numEntries, WAL wal) throws IOException { |
| long txId = -1; |
| for (int i = 0; i < numEntries; i++) { |
| byte[] b = Bytes.toBytes(Integer.toString(i)); |
| KeyValue kv = new KeyValue(b, famName, b); |
| WALEdit edit = new WALEdit(); |
| edit.add(kv); |
| txId = wal.appendData(info, getWalKeyImpl(), edit); |
| } |
| wal.sync(txId); |
| } |
| } |