| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.replication.regionserver; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.when; |
| |
| import java.io.IOException; |
| import java.lang.reflect.Field; |
| import java.net.URLEncoder; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NavigableMap; |
| import java.util.NavigableSet; |
| import java.util.Set; |
| import java.util.SortedSet; |
| import java.util.TreeMap; |
| import java.util.TreeSet; |
| import java.util.UUID; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.stream.Collectors; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.ClusterId; |
| import org.apache.hadoop.hbase.HBaseClassTestRule; |
| import org.apache.hadoop.hbase.HBaseConfiguration; |
| import org.apache.hadoop.hbase.HBaseTestingUtility; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.KeyValue; |
| import org.apache.hadoop.hbase.Server; |
| import org.apache.hadoop.hbase.ServerName; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.Waiter; |
| import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; |
| import org.apache.hadoop.hbase.client.RegionInfo; |
| import org.apache.hadoop.hbase.client.RegionInfoBuilder; |
| import org.apache.hadoop.hbase.client.TableDescriptor; |
| import org.apache.hadoop.hbase.client.TableDescriptorBuilder; |
| import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl; |
| import org.apache.hadoop.hbase.regionserver.RegionServerServices; |
| import org.apache.hadoop.hbase.replication.ReplicationFactory; |
| import org.apache.hadoop.hbase.replication.ReplicationPeer; |
| import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; |
| import org.apache.hadoop.hbase.replication.ReplicationPeers; |
| import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; |
| import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; |
| import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; |
| import org.apache.hadoop.hbase.replication.ReplicationUtils; |
| import org.apache.hadoop.hbase.replication.SyncReplicationState; |
| import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage; |
| import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker; |
| import org.apache.hadoop.hbase.testclassification.MediumTests; |
| import org.apache.hadoop.hbase.testclassification.ReplicationTests; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.CommonFSUtils; |
| import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; |
| import org.apache.hadoop.hbase.util.JVMClusterUtil; |
| import org.apache.hadoop.hbase.util.MockServer; |
| import org.apache.hadoop.hbase.util.Pair; |
| import org.apache.hadoop.hbase.wal.WAL; |
| import org.apache.hadoop.hbase.wal.WALEdit; |
| import org.apache.hadoop.hbase.wal.WALFactory; |
| import org.apache.hadoop.hbase.wal.WALKeyImpl; |
| import org.apache.hadoop.hbase.zookeeper.ZKClusterId; |
| import org.apache.hadoop.hbase.zookeeper.ZKUtil; |
| import org.apache.hadoop.hbase.zookeeper.ZKWatcher; |
| import org.junit.After; |
| import org.junit.AfterClass; |
| import org.junit.Before; |
| import org.junit.ClassRule; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| import org.junit.rules.TestName; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hbase.thirdparty.com.google.common.collect.Sets; |
| import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; |
| |
| import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; |
| import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; |
| |
| /** |
| * An abstract class that tests ReplicationSourceManager. Subclasses should set up the |
| * configuration required by this class and start the appropriate cluster using |
| * HBaseTestingUtility. |
| */ |
| @Category({ReplicationTests.class, MediumTests.class}) |
| public abstract class TestReplicationSourceManager { |
| |
| @ClassRule |
| public static final HBaseClassTestRule CLASS_RULE = |
| HBaseClassTestRule.forClass(TestReplicationSourceManager.class); |
| |
| protected static final Logger LOG = |
| LoggerFactory.getLogger(TestReplicationSourceManager.class); |
| |
| protected static Configuration conf; |
| |
| protected static HBaseTestingUtility utility; |
| |
| protected static Replication replication; |
| |
| protected static ReplicationSourceManager manager; |
| |
| protected static ReplicationSourceManager managerOfCluster; |
| |
| protected static ZKWatcher zkw; |
| |
| protected static TableDescriptor htd; |
| |
| protected static RegionInfo hri; |
| |
| protected static final byte[] r1 = Bytes.toBytes("r1"); |
| |
| protected static final byte[] r2 = Bytes.toBytes("r2"); |
| |
| protected static final byte[] f1 = Bytes.toBytes("f1"); |
| |
| protected static final byte[] f2 = Bytes.toBytes("f2"); |
| |
| protected static final TableName test = |
| TableName.valueOf("test"); |
| |
| protected static final String slaveId = "1"; |
| |
| protected static FileSystem fs; |
| |
| protected static Path oldLogDir; |
| |
| protected static Path logDir; |
| |
| protected static Path remoteLogDir; |
| |
| protected static CountDownLatch latch; |
| |
| protected static List<String> files = new ArrayList<>(); |
| protected static NavigableMap<byte[], Integer> scopes; |
| |
| protected static void setupZkAndReplication() throws Exception { |
| // The implementing class should set up the conf |
| assertNotNull(conf); |
| zkw = new ZKWatcher(conf, "test", null); |
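| // Lay out the znodes for peer "1" (peer config, enabled peer-state, sync replication states) |
| // by hand so the peer storage sees an existing, enabled peer. |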
| ZKUtil.createWithParents(zkw, "/hbase/replication"); |
| ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1"); |
| ZKUtil.setData(zkw, "/hbase/replication/peers/1", |
| Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" |
| + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1")); |
| ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state"); |
| ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state", |
| ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES); |
| ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/sync-rep-state"); |
| ZKUtil.setData(zkw, "/hbase/replication/peers/1/sync-rep-state", |
| ZKReplicationPeerStorage.NONE_STATE_ZNODE_BYTES); |
| ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/new-sync-rep-state"); |
| ZKUtil.setData(zkw, "/hbase/replication/peers/1/new-sync-rep-state", |
| ZKReplicationPeerStorage.NONE_STATE_ZNODE_BYTES); |
| ZKUtil.createWithParents(zkw, "/hbase/replication/state"); |
| ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES); |
| |
| ZKClusterId.setClusterId(zkw, new ClusterId()); |
| CommonFSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir()); |
| fs = FileSystem.get(conf); |
| oldLogDir = utility.getDataTestDir(HConstants.HREGION_OLDLOGDIR_NAME); |
| logDir = utility.getDataTestDir(HConstants.HREGION_LOGDIR_NAME); |
| remoteLogDir = utility.getDataTestDir(ReplicationUtils.REMOTE_WAL_DIR_NAME); |
| replication = new Replication(); |
| replication.initialize(new DummyServer(), fs, logDir, oldLogDir, |
| new WALFactory(conf, "test", null, false)); |
| managerOfCluster = getManagerFromCluster(); |
| if (managerOfCluster != null) { |
| // Since peers are now added via a procedure rather than a zk notification, we need to add |
| // the peer by hand here. |
| managerOfCluster.addPeer(slaveId); |
| } |
| |
| manager = replication.getReplicationManager(); |
| manager.addSource(slaveId); |
| if (managerOfCluster != null) { |
| waitPeer(slaveId, managerOfCluster, true); |
| } |
| waitPeer(slaveId, manager, true); |
| |
| htd = TableDescriptorBuilder.newBuilder(test) |
| .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(f1) |
| .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) |
| .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f2)).build(); |
| |
| scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); |
| for (byte[] fam : htd.getColumnFamilyNames()) { |
| scopes.put(fam, 0); |
| } |
| hri = RegionInfoBuilder.newBuilder(htd.getTableName()).setStartKey(r1).setEndKey(r2).build(); |
| } |
| |
| private static ReplicationSourceManager getManagerFromCluster() { |
| // TestReplicationSourceManagerZkImpl won't start the mini hbase cluster. |
| if (utility.getMiniHBaseCluster() == null) { |
| return null; |
| } |
| return utility.getMiniHBaseCluster().getRegionServerThreads() |
| .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer) |
| .findAny() |
| .map(RegionServerServices::getReplicationSourceService) |
| .map(r -> (Replication)r) |
| .map(Replication::getReplicationManager) |
| .get(); |
| } |
| |
| @AfterClass |
| public static void tearDownAfterClass() throws Exception { |
| if (manager != null) { |
| manager.join(); |
| } |
| utility.shutdownMiniCluster(); |
| } |
| |
| @Rule |
| public TestName testName = new TestName(); |
| |
| private void cleanLogDir() throws IOException { |
| fs.delete(logDir, true); |
| fs.delete(oldLogDir, true); |
| fs.delete(remoteLogDir, true); |
| } |
| |
| @Before |
| public void setUp() throws Exception { |
| LOG.info("Start " + testName.getMethodName()); |
| cleanLogDir(); |
| } |
| |
| @After |
| public void tearDown() throws Exception { |
| LOG.info("End " + testName.getMethodName()); |
| cleanLogDir(); |
| List<String> ids = manager.getSources().stream() |
| .map(ReplicationSourceInterface::getPeerId).collect(Collectors.toList()); |
| for (String id : ids) { |
| if (slaveId.equals(id)) { |
| continue; |
| } |
| removePeerAndWait(id); |
| } |
| } |
| |
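| /** |
| * Roll the WAL while appending edits and verify that the manager tracks the rolled WALs for |
| * the peer and cleans up old ones once the source's position has been logged. |
| */ |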
| @Test |
| public void testLogRoll() throws Exception { |
| long baseline = 1000; |
| long time = baseline; |
| MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(); |
| KeyValue kv = new KeyValue(r1, f1, r1); |
| WALEdit edit = new WALEdit(); |
| edit.add(kv); |
| |
| WALFactory wals = |
| new WALFactory(utility.getConfiguration(), URLEncoder.encode("regionserver:60020", "UTF8")); |
| ReplicationSourceManager replicationManager = replication.getReplicationManager(); |
| wals.getWALProvider() |
| .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager)); |
| final WAL wal = wals.getWAL(hri); |
| manager.init(); |
| TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("tableame")) |
| .setColumnFamily(ColumnFamilyDescriptorBuilder.of(f1)).build(); |
| NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR); |
| for (byte[] fam : htd.getColumnFamilyNames()) { |
| scopes.put(fam, 0); |
| } |
| // Test normal log rolling every 20 edits |
| for (long i = 1; i < 101; i++) { |
| if (i > 1 && i % 20 == 0) { |
| wal.rollWriter(); |
| } |
| LOG.info(Long.toString(i)); |
| final long txid = wal.appendData(hri, |
| new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), |
| edit); |
| wal.sync(txid); |
| } |
| |
| // Simulate a rapid insert that's followed |
| // by a report that's still not totally complete (missing last one) |
| LOG.info(baseline + " and " + time); |
| baseline += 101; |
| time = baseline; |
| LOG.info(baseline + " and " + time); |
| |
| for (int i = 0; i < 3; i++) { |
| wal.appendData(hri, |
| new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), |
| edit); |
| } |
| wal.sync(); |
| |
| int logNumber = 0; |
| for (Map.Entry<String, NavigableSet<String>> entry : manager.getWALs().get(slaveId) |
| .entrySet()) { |
| logNumber += entry.getValue().size(); |
| } |
| assertEquals(6, logNumber); |
| |
| wal.rollWriter(); |
| |
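| // Mock a normal (not recovered, not sync replication) source for peer 1 so that |
| // logPositionAndCleanOldLogs can clean up the WALs behind the current path. |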
| ReplicationSourceInterface source = mock(ReplicationSourceInterface.class); |
| when(source.getQueueId()).thenReturn("1"); |
| when(source.isRecovered()).thenReturn(false); |
| when(source.isSyncReplication()).thenReturn(false); |
| manager.logPositionAndCleanOldLogs(source, |
| new WALEntryBatch(0, manager.getSources().get(0).getCurrentPath())); |
| |
| wal.appendData(hri, |
| new WALKeyImpl(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc, scopes), |
| edit); |
| wal.sync(); |
| |
| assertEquals(1, manager.getWALs().size()); |
| |
| // TODO Need a case with only 2 WALs and we only want to delete the first one |
| } |
| |
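| /** |
| * Let three failover workers race to claim the queue of a dead server and verify that exactly |
| * one of them ends up owning the claimed WALs. |
| */ |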
| @Test |
| public void testClaimQueues() throws Exception { |
| Server server = new DummyServer("hostname0.example.org"); |
| ReplicationQueueStorage rq = ReplicationStorageFactory |
| .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration()); |
| // populate some znodes in the peer znode |
| files.add("log1"); |
| files.add("log2"); |
| for (String file : files) { |
| rq.addWAL(server.getServerName(), "1", file); |
| } |
| // create 3 DummyServers |
| Server s1 = new DummyServer("dummyserver1.example.org"); |
| Server s2 = new DummyServer("dummyserver2.example.org"); |
| Server s3 = new DummyServer("dummyserver3.example.org"); |
| |
| // create 3 DummyNodeFailoverWorkers |
| DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(server.getServerName(), s1); |
| DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(server.getServerName(), s2); |
| DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(server.getServerName(), s3); |
| |
| latch = new CountDownLatch(3); |
| // start the threads |
| w1.start(); |
| w2.start(); |
| w3.start(); |
| // make sure only one is successful |
| int populatedMap = 0; |
| // wait until all the workers are done |
| latch.await(); |
| populatedMap += w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated() |
| + w3.isLogZnodesMapPopulated(); |
| assertEquals(1, populatedMap); |
| server.abort("", null); |
| } |
| |
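| /** |
| * Claim a dead server's queue and verify that cleanOldLogs on the recovered queue removes the |
| * WALs that are older than the given one. |
| */ |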
| @Test |
| public void testCleanupFailoverQueues() throws Exception { |
| Server server = new DummyServer("hostname1.example.org"); |
| ReplicationQueueStorage rq = ReplicationStorageFactory |
| .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration()); |
| // populate some znodes in the peer znode |
| SortedSet<String> files = new TreeSet<>(); |
| String group = "testgroup"; |
| String file1 = group + "." + EnvironmentEdgeManager.currentTime() + ".log1"; |
| String file2 = group + "." + EnvironmentEdgeManager.currentTime() + ".log2"; |
| files.add(file1); |
| files.add(file2); |
| for (String file : files) { |
| rq.addWAL(server.getServerName(), "1", file); |
| } |
| Server s1 = new DummyServer("dummyserver1.example.org"); |
| ReplicationPeers rp1 = |
| ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration()); |
| rp1.init(); |
| NodeFailoverWorker w1 = |
| manager.new NodeFailoverWorker(server.getServerName()); |
| w1.run(); |
| assertEquals(1, manager.getWalsByIdRecoveredQueues().size()); |
| String id = "1-" + server.getServerName().getServerName(); |
| assertEquals(files, manager.getWalsByIdRecoveredQueues().get(id).get(group)); |
| ReplicationSourceInterface source = mock(ReplicationSourceInterface.class); |
| when(source.getQueueId()).thenReturn(id); |
| when(source.isRecovered()).thenReturn(true); |
| when(source.isSyncReplication()).thenReturn(false); |
| manager.cleanOldLogs(file2, false, source); |
| // log1 should be deleted |
| assertEquals(Sets.newHashSet(file2), manager.getWalsByIdRecoveredQueues().get(id).get(group)); |
| } |
| |
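| /** |
| * Verify that queues referencing an unknown peer id are removed from zk during failover. |
| */ |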
| @Test |
| public void testCleanupUnknownPeerZNode() throws Exception { |
| Server server = new DummyServer("hostname2.example.org"); |
| ReplicationQueueStorage rq = ReplicationStorageFactory |
| .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration()); |
| // populate some znodes in the peer znode |
| // add log to an unknown peer |
| String group = "testgroup"; |
| rq.addWAL(server.getServerName(), "2", group + ".log1"); |
| rq.addWAL(server.getServerName(), "2", group + ".log2"); |
| |
| NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName()); |
| w1.run(); |
| |
| // The log of the unknown peer should be removed from zk |
| for (String peer : manager.getAllQueues()) { |
| assertTrue(peer.startsWith("1")); |
| } |
| } |
| |
| /** |
| * Test for HBASE-9038: Replication.scopeWALEdits would NPE if it did not filter out the |
| * compaction WALEdit. |
| */ |
| @Test |
| public void testCompactionWALEdits() throws Exception { |
| TableName tableName = TableName.valueOf("testCompactionWALEdits"); |
| WALProtos.CompactionDescriptor compactionDescriptor = |
| WALProtos.CompactionDescriptor.getDefaultInstance(); |
| RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(HConstants.EMPTY_START_ROW) |
| .setEndKey(HConstants.EMPTY_END_ROW).build(); |
| WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor); |
| ReplicationSourceWALActionListener.scopeWALEdits(new WALKeyImpl(), edit, conf); |
| } |
| |
| @Test |
| public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception { |
| NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR); |
| // 1. Get the bulk load wal edit event |
| WALEdit logEdit = getBulkLoadWALEdit(scope); |
| // 2. Create wal key |
| WALKeyImpl logKey = new WALKeyImpl(scope); |
| |
| // 3. Get the scopes for the key |
| ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, conf); |
| |
| // 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled |
| assertNull("No bulk load entries scope should be added if bulk load replication is disabled.", |
| logKey.getReplicationScopes()); |
| } |
| |
| @Test |
| public void testBulkLoadWALEdits() throws Exception { |
| // 1. Get the bulk load wal edit event |
| NavigableMap<byte[], Integer> scope = new TreeMap<>(Bytes.BYTES_COMPARATOR); |
| WALEdit logEdit = getBulkLoadWALEdit(scope); |
| // 2. Create wal key |
| WALKeyImpl logKey = new WALKeyImpl(scope); |
| // 3. Enable bulk load hfile replication |
| Configuration bulkLoadConf = HBaseConfiguration.create(conf); |
| bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); |
| |
| // 4. Get the scopes for the key |
| ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, bulkLoadConf); |
| |
| NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes(); |
| // Assert family with replication scope global is present in the key scopes |
| assertTrue("This family scope is set to global, should be part of replication key scopes.", |
| scopes.containsKey(f1)); |
| // Assert family with replication scope local is not present in the key scopes |
| assertFalse("This family scope is set to local, should not be part of replication key scopes", |
| scopes.containsKey(f2)); |
| } |
| |
| /** |
| * Test that calling removePeer() correctly cleans up the replication queue and ReplicationPeer |
| * even when the ReplicationSourceManager failed to initialize the corresponding |
| * ReplicationSourceInterface. |
| * See HBASE-16096. |
| */ |
| @Test |
| public void testPeerRemovalCleanup() throws Exception { |
| String replicationSourceImplName = conf.get("replication.replicationsource.implementation"); |
| final String peerId = "FakePeer"; |
| final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder() |
| .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase").build(); |
| try { |
| DummyServer server = new DummyServer(); |
| ReplicationQueueStorage rq = ReplicationStorageFactory |
| .getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration()); |
| // Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface |
| // initialization to throw an exception. |
| conf.set("replication.replicationsource.implementation", |
| FailInitializeDummyReplicationSource.class.getName()); |
| manager.getReplicationPeers(); |
| // Set up the znode and ReplicationPeer for the fake peer |
| // Don't wait for replication source to initialize, we know it won't. |
| addPeerAndWait(peerId, peerConfig, false); |
| |
| // Sanity check |
| assertNull(manager.getSource(peerId)); |
| |
| // Create a replication queue for the fake peer |
| rq.addWAL(server.getServerName(), peerId, "FakeFile"); |
| // Unregister peer, this should remove the peer and clear all queues associated with it |
| // Need to wait for the ReplicationTracker to pick up the changes and notify listeners. |
| removePeerAndWait(peerId); |
| assertFalse(rq.getAllQueues(server.getServerName()).contains(peerId)); |
| } finally { |
| conf.set("replication.replicationsource.implementation", replicationSourceImplName); |
| removePeerAndWait(peerId); |
| } |
| } |
| |
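| /** |
| * Fetch the global metrics source by reading the private globalSourceSource field of the |
| * source's MetricsSource via reflection. |
| */ |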
| private static MetricsReplicationSourceSource getGlobalSource() throws Exception { |
| ReplicationSourceInterface source = manager.getSource(slaveId); |
| // Retrieve the global replication metrics source |
| Field f = MetricsSource.class.getDeclaredField("globalSourceSource"); |
| f.setAccessible(true); |
| return (MetricsReplicationSourceSource)f.get(source.getSourceMetrics()); |
| } |
| |
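| /** |
| * Sum getSizeOfLatestPath() over the replication managers of all region servers in the mini |
| * cluster, or return 0 if no mini cluster is running. |
| */ |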
| private static long getSizeOfLatestPath() { |
| // If no mini cluster is running, there are no extra replication managers influencing the metrics. |
| if (utility.getMiniHBaseCluster() == null) { |
| return 0; |
| } |
| return utility.getMiniHBaseCluster().getRegionServerThreads() |
| .stream().map(JVMClusterUtil.RegionServerThread::getRegionServer) |
| .map(RegionServerServices::getReplicationSourceService) |
| .map(r -> (Replication)r) |
| .map(Replication::getReplicationManager) |
| .mapToLong(ReplicationSourceManager::getSizeOfLatestPath) |
| .sum(); |
| } |
| |
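| /** |
| * Verify that the global log queue metrics shrink back to their initial value when a peer is |
| * removed, and are re-initialized when the peer is added back. |
| */ |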
| @Test |
| public void testRemovePeerMetricsCleanup() throws Exception { |
| final String peerId = "DummyPeer"; |
| final ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder() |
| .setClusterKey(utility.getZkCluster().getAddress().toString() + ":/hbase").build(); |
| try { |
| MetricsReplicationSourceSource globalSource = getGlobalSource(); |
| final int globalLogQueueSizeInitial = globalSource.getSizeOfLogQueue(); |
| final long sizeOfLatestPath = getSizeOfLatestPath(); |
| addPeerAndWait(peerId, peerConfig, true); |
| assertEquals(sizeOfLatestPath + globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); |
| ReplicationSourceInterface source = manager.getSource(peerId); |
| // Sanity check |
| assertNotNull(source); |
| final int sizeOfSingleLogQueue = source.getSourceMetrics().getSizeOfLogQueue(); |
| // Enqueue log and check if metrics updated |
| source.enqueueLog(new Path("abc")); |
| assertEquals(1 + sizeOfSingleLogQueue, source.getSourceMetrics().getSizeOfLogQueue()); |
| assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial, |
| globalSource.getSizeOfLogQueue()); |
| |
| // Removing the peer should reset the global metrics |
| removePeerAndWait(peerId); |
| assertEquals(globalLogQueueSizeInitial, globalSource.getSizeOfLogQueue()); |
| |
| // Adding the same peer back again should reset the single source metrics |
| addPeerAndWait(peerId, peerConfig, true); |
| source = manager.getSource(peerId); |
| assertNotNull(source); |
| assertEquals(source.getSourceMetrics().getSizeOfLogQueue() + globalLogQueueSizeInitial, |
| globalSource.getSizeOfLogQueue()); |
| } finally { |
| removePeerAndWait(peerId); |
| } |
| } |
| |
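| /** |
| * Mock a sync replication source for the given peer whose peer config points at the shared |
| * remote WAL directory. |
| */ |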
| private ReplicationSourceInterface mockReplicationSource(String peerId) { |
| ReplicationSourceInterface source = mock(ReplicationSourceInterface.class); |
| when(source.getPeerId()).thenReturn(peerId); |
| when(source.getQueueId()).thenReturn(peerId); |
| when(source.isRecovered()).thenReturn(false); |
| when(source.isSyncReplication()).thenReturn(true); |
| ReplicationPeerConfig config = mock(ReplicationPeerConfig.class); |
| when(config.getRemoteWALDir()) |
| .thenReturn(remoteLogDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString()); |
| ReplicationPeer peer = mock(ReplicationPeer.class); |
| when(peer.getPeerConfig()).thenReturn(config); |
| when(source.getPeer()).thenReturn(peer); |
| return source; |
| } |
| |
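| /** |
| * Verify that cleanOldLogs removes the remote WAL of a sync replication peer, but only when |
| * it is invoked with a source for that peer. |
| */ |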
| @Test |
| public void testRemoveRemoteWALs() throws Exception { |
| String peerId2 = slaveId + "_2"; |
| addPeerAndWait(peerId2, |
| ReplicationPeerConfig.newBuilder() |
| .setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase").build(), |
| true); |
| try { |
| // make sure that we can deal with files which do not exist |
| String walNameNotExists = |
| "remoteWAL-12345-" + slaveId + ".12345" + ReplicationUtils.SYNC_WAL_SUFFIX; |
| Path wal = new Path(logDir, walNameNotExists); |
| manager.preLogRoll(wal); |
| manager.postLogRoll(wal); |
| |
| Path remoteLogDirForPeer = new Path(remoteLogDir, slaveId); |
| fs.mkdirs(remoteLogDirForPeer); |
| String walName = |
| "remoteWAL-12345-" + slaveId + ".23456" + ReplicationUtils.SYNC_WAL_SUFFIX; |
| Path remoteWAL = |
| new Path(remoteLogDirForPeer, walName).makeQualified(fs.getUri(), fs.getWorkingDirectory()); |
| fs.create(remoteWAL).close(); |
| wal = new Path(logDir, walName); |
| manager.preLogRoll(wal); |
| manager.postLogRoll(wal); |
| |
| ReplicationSourceInterface source = mockReplicationSource(peerId2); |
| manager.cleanOldLogs(walName, true, source); |
| // still there if peer id does not match |
| assertTrue(fs.exists(remoteWAL)); |
| |
| source = mockReplicationSource(slaveId); |
| manager.cleanOldLogs(walName, true, source); |
| assertFalse(fs.exists(remoteWAL)); |
| } finally { |
| removePeerAndWait(peerId2); |
| } |
| } |
| |
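| /** |
| * Two WALs whose names share a common prefix should both be tracked as separate latest paths |
| * after preLogRoll. |
| */ |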
| @Test |
| public void testSameWALPrefix() throws IOException { |
| Set<String> latestWalsBefore = |
| manager.getLastestPath().stream().map(Path::getName).collect(Collectors.toSet()); |
| String walName1 = "localhost,8080,12345-45678-Peer.34567"; |
| String walName2 = "localhost,8080,12345.56789"; |
| manager.preLogRoll(new Path(walName1)); |
| manager.preLogRoll(new Path(walName2)); |
| |
| Set<String> latestWals = manager.getLastestPath().stream().map(Path::getName) |
| .filter(n -> !latestWalsBefore.contains(n)).collect(Collectors.toSet()); |
| assertEquals(2, latestWals.size()); |
| assertTrue(latestWals.contains(walName1)); |
| assertTrue(latestWals.contains(walName2)); |
| } |
| |
| /** |
| * Add a peer and wait for it to initialize |
| * @param waitForSource Whether to wait for replication source to initialize |
| */ |
| private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig, |
| final boolean waitForSource) throws Exception { |
| final ReplicationPeers rp = manager.getReplicationPeers(); |
| rp.getPeerStorage().addPeer(peerId, peerConfig, true, SyncReplicationState.NONE); |
| try { |
| manager.addPeer(peerId); |
| } catch (Exception e) { |
| // ignore the exception, because we test both the success and failure cases. |
| } |
| waitPeer(peerId, manager, waitForSource); |
| if (managerOfCluster != null) { |
| managerOfCluster.addPeer(peerId); |
| waitPeer(peerId, managerOfCluster, waitForSource); |
| } |
| } |
| |
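| /** |
| * Wait until the given manager knows the peer; if waitForSource is true, also wait until the |
| * corresponding replication source has started. |
| */ |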
| private static void waitPeer(final String peerId, |
| ReplicationSourceManager manager, final boolean waitForSource) { |
| ReplicationPeers rp = manager.getReplicationPeers(); |
| Waiter.waitFor(conf, 20000, () -> { |
| if (waitForSource) { |
| ReplicationSourceInterface rs = manager.getSource(peerId); |
| if (rs == null) { |
| return false; |
| } |
| if (rs instanceof ReplicationSourceDummy) { |
| return ((ReplicationSourceDummy)rs).isStartup(); |
| } |
| return true; |
| } else { |
| return (rp.getPeer(peerId) != null); |
| } |
| }); |
| } |
| |
| /** |
| * Remove a peer and wait for it to get cleaned up |
| */ |
| private void removePeerAndWait(final String peerId) throws Exception { |
| final ReplicationPeers rp = manager.getReplicationPeers(); |
| if (rp.getPeerStorage().listPeerIds().contains(peerId)) { |
| rp.getPeerStorage().removePeer(peerId); |
| try { |
| manager.removePeer(peerId); |
| } catch (Exception e) { |
| // ignore the exception and continue. |
| } |
| } |
| Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() { |
| @Override |
| public boolean evaluate() throws Exception { |
| Collection<String> peers = rp.getPeerStorage().listPeerIds(); |
| return (!manager.getAllQueues().contains(peerId)) && (rp.getPeer(peerId) == null) |
| && (!peers.contains(peerId)) && manager.getSource(peerId) == null; |
| } |
| }); |
| } |
| |
| private WALEdit getBulkLoadWALEdit(NavigableMap<byte[], Integer> scope) { |
| // 1. Create store files for the families |
| Map<byte[], List<Path>> storeFiles = new HashMap<>(1); |
| Map<String, Long> storeFilesSize = new HashMap<>(1); |
| List<Path> p = new ArrayList<>(1); |
| Path hfilePath1 = new Path(Bytes.toString(f1)); |
| p.add(hfilePath1); |
| try { |
| storeFilesSize.put(hfilePath1.getName(), fs.getFileStatus(hfilePath1).getLen()); |
| } catch (IOException e) { |
| LOG.debug("Failed to calculate the size of hfile " + hfilePath1); |
| storeFilesSize.put(hfilePath1.getName(), 0L); |
| } |
| storeFiles.put(f1, p); |
| scope.put(f1, 1); |
| p = new ArrayList<>(1); |
| Path hfilePath2 = new Path(Bytes.toString(f2)); |
| p.add(hfilePath2); |
| try { |
| storeFilesSize.put(hfilePath2.getName(), fs.getFileStatus(hfilePath2).getLen()); |
| } catch (IOException e) { |
| LOG.debug("Failed to calculate the size of hfile " + hfilePath2); |
| storeFilesSize.put(hfilePath2.getName(), 0L); |
| } |
| storeFiles.put(f2, p); |
| // 2. Create bulk load descriptor |
| BulkLoadDescriptor desc = |
| ProtobufUtil.toBulkLoadDescriptor(hri.getTable(), |
| UnsafeByteOperations.unsafeWrap(hri.getEncodedNameAsBytes()), storeFiles, storeFilesSize, 1); |
| |
| // 3. create bulk load wal edit event |
| WALEdit logEdit = WALEdit.createBulkLoadEvent(hri, desc); |
| return logEdit; |
| } |
| |
| static class DummyNodeFailoverWorker extends Thread { |
| private Map<String, Set<String>> logZnodesMap; |
| Server server; |
| private ServerName deadRS; |
| ReplicationQueueStorage rq; |
| |
| public DummyNodeFailoverWorker(ServerName deadRS, Server s) throws Exception { |
| this.deadRS = deadRS; |
| this.server = s; |
| this.rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), |
| server.getConfiguration()); |
| } |
| |
| @Override |
| public void run() { |
| try { |
| logZnodesMap = new HashMap<>(); |
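| // Try to claim every queue of the dead server and remember the queues this worker won. |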
| List<String> queues = rq.getAllQueues(deadRS); |
| for (String queue : queues) { |
| Pair<String, SortedSet<String>> pair = |
| rq.claimQueue(deadRS, queue, server.getServerName()); |
| if (pair != null) { |
| logZnodesMap.put(pair.getFirst(), pair.getSecond()); |
| } |
| } |
| server.abort("Done with testing", null); |
| } catch (Exception e) { |
| LOG.error("Got exception while running NodeFailoverWorker", e); |
| } finally { |
| latch.countDown(); |
| } |
| } |
| |
| /** |
| * @return 1 if the map contains exactly one queue holding all the expected files, 0 otherwise. |
| */ |
| private int isLogZnodesMapPopulated() { |
| Collection<Set<String>> sets = logZnodesMap.values(); |
| if (sets.size() > 1) { |
| throw new RuntimeException("unexpected size of logZnodesMap: " + sets.size()); |
| } |
| if (sets.size() == 1) { |
| Set<String> s = sets.iterator().next(); |
| for (String file : files) { |
| // at least one file was missing |
| if (!s.contains(file)) { |
| return 0; |
| } |
| } |
| return 1; // we found all the files |
| } |
| return 0; |
| } |
| } |
| |
| static class FailInitializeDummyReplicationSource extends ReplicationSourceDummy { |
| |
| @Override |
| public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager, |
| ReplicationQueueStorage rq, ReplicationPeer rp, Server server, String peerClusterId, |
| UUID clusterId, WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) |
| throws IOException { |
| throw new IOException("Failing deliberately"); |
| } |
| } |
| |
| static class DummyServer extends MockServer { |
| String hostname; |
| |
| DummyServer() { |
| hostname = "hostname.example.org"; |
| } |
| |
| DummyServer(String hostname) { |
| this.hostname = hostname; |
| } |
| |
| @Override |
| public Configuration getConfiguration() { |
| return conf; |
| } |
| |
| @Override |
| public ZKWatcher getZooKeeper() { |
| return zkw; |
| } |
| |
| @Override |
| public ServerName getServerName() { |
| return ServerName.valueOf(hostname, 1234, 1L); |
| } |
| } |
| } |