| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.replication; |
| |
| import static org.junit.Assert.assertArrayEquals; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.fail; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import org.apache.commons.io.IOUtils; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.HBaseConfiguration; |
| import org.apache.hadoop.hbase.HBaseTestingUtility; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.client.Admin; |
| import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; |
| import org.apache.hadoop.hbase.client.Connection; |
| import org.apache.hadoop.hbase.client.ConnectionFactory; |
| import org.apache.hadoop.hbase.client.Delete; |
| import org.apache.hadoop.hbase.client.Get; |
| import org.apache.hadoop.hbase.client.Put; |
| import org.apache.hadoop.hbase.client.Result; |
| import org.apache.hadoop.hbase.client.ResultScanner; |
| import org.apache.hadoop.hbase.client.Scan; |
| import org.apache.hadoop.hbase.client.Table; |
| import org.apache.hadoop.hbase.client.TableDescriptor; |
| import org.apache.hadoop.hbase.client.TableDescriptorBuilder; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.JVMClusterUtil; |
| import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; |
| import org.junit.After; |
| import org.junit.AfterClass; |
| import org.junit.Before; |
| import org.junit.BeforeClass; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; |
| import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; |
| |
| /** |
| * This class is only a base for other integration-level replication tests. |
| * Do not add tests here. |
| * TestReplicationSmallTests is where tests that don't require bring machines up/down should go |
| * All other tests should have their own classes and extend this one |
| */ |
| public class TestReplicationBase { |
| private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class); |
| private static Connection connection1; |
| private static Connection connection2; |
| protected static Configuration CONF_WITH_LOCALFS; |
| |
| protected static Admin hbaseAdmin; |
| |
| protected static Table htable1; |
| protected static Table htable2; |
| |
| protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility(); |
| protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility(); |
| protected static Configuration CONF1 = UTIL1.getConfiguration(); |
| protected static Configuration CONF2 = UTIL2.getConfiguration(); |
| |
| protected static int NUM_SLAVES1 = 1; |
| protected static final int NUM_SLAVES2 = 1; |
| protected static final int NB_ROWS_IN_BATCH = 100; |
| protected static final int NB_ROWS_IN_BIG_BATCH = |
| NB_ROWS_IN_BATCH * 10; |
| protected static final long SLEEP_TIME = 500; |
| protected static final int NB_RETRIES = 50; |
| |
| protected static final TableName tableName = TableName.valueOf("test"); |
| protected static final byte[] famName = Bytes.toBytes("f"); |
| protected static final byte[] row = Bytes.toBytes("row"); |
| protected static final byte[] noRepfamName = Bytes.toBytes("norep"); |
| protected static final String PEER_ID2 = "2"; |
| |
| protected boolean isSerialPeer() { |
| return false; |
| } |
| |
| protected boolean isSyncPeer() { |
| return false; |
| } |
| |
| protected final void cleanUp() throws IOException, InterruptedException { |
| // Starting and stopping replication can make us miss new logs, |
| // rolling like this makes sure the most recent one gets added to the queue |
| for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster() |
| .getRegionServerThreads()) { |
| UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName()); |
| } |
| int rowCount = UTIL1.countRows(tableName); |
| UTIL1.deleteTableData(tableName); |
| // truncating the table will send one Delete per row to the slave cluster |
| // in an async fashion, which is why we cannot just call deleteTableData on |
| // utility2 since late writes could make it to the slave in some way. |
| // Instead, we truncate the first table and wait for all the Deletes to |
| // make it to the slave. |
| Scan scan = new Scan(); |
| int lastCount = 0; |
| for (int i = 0; i < NB_RETRIES; i++) { |
| if (i == NB_RETRIES - 1) { |
| fail("Waited too much time for truncate"); |
| } |
| ResultScanner scanner = htable2.getScanner(scan); |
| Result[] res = scanner.next(rowCount); |
| scanner.close(); |
| if (res.length != 0) { |
| if (res.length < lastCount) { |
| i--; // Don't increment timeout if we make progress |
| } |
| lastCount = res.length; |
| LOG.info("Still got " + res.length + " rows"); |
| Thread.sleep(SLEEP_TIME); |
| } else { |
| break; |
| } |
| } |
| } |
| |
| protected static void waitForReplication(int expectedRows, int retries) |
| throws IOException, InterruptedException { |
| waitForReplication(htable2, expectedRows, retries); |
| } |
| |
| protected static void waitForReplication(Table htable2, int expectedRows, int retries) |
| throws IOException, InterruptedException { |
| Scan scan; |
| for (int i = 0; i < retries; i++) { |
| scan = new Scan(); |
| if (i== retries -1) { |
| fail("Waited too much time for normal batch replication"); |
| } |
| ResultScanner scanner = htable2.getScanner(scan); |
| Result[] res = scanner.next(expectedRows); |
| scanner.close(); |
| if (res.length != expectedRows) { |
| LOG.info("Only got " + res.length + " rows"); |
| Thread.sleep(SLEEP_TIME); |
| } else { |
| break; |
| } |
| } |
| } |
| |
| protected static void loadData(String prefix, byte[] row) throws IOException { |
| loadData(prefix, row, famName); |
| } |
| |
| protected static void loadData(String prefix, byte[] row, byte[] familyName) throws IOException { |
| List<Put> puts = new ArrayList<>(NB_ROWS_IN_BATCH); |
| for (int i = 0; i < NB_ROWS_IN_BATCH; i++) { |
| Put put = new Put(Bytes.toBytes(prefix + Integer.toString(i))); |
| put.addColumn(familyName, row, row); |
| puts.add(put); |
| } |
| htable1.put(puts); |
| } |
| |
| protected static void setupConfig(HBaseTestingUtility util, String znodeParent) { |
| Configuration conf = util.getConfiguration(); |
| conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent); |
| // We don't want too many edits per batch sent to the ReplicationEndpoint to trigger |
| // sufficient number of events. But we don't want to go too low because |
| // HBaseInterClusterReplicationEndpoint partitions entries into batches and we want |
| // more than one batch sent to the peer cluster for better testing. |
| conf.setInt("replication.source.size.capacity", 102400); |
| conf.setLong("replication.source.sleepforretries", 100); |
| conf.setInt("hbase.regionserver.maxlogs", 10); |
| conf.setLong("hbase.master.logcleaner.ttl", 10); |
| conf.setInt("zookeeper.recovery.retry", 1); |
| conf.setInt("zookeeper.recovery.retry.intervalmill", 10); |
| conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100); |
| conf.setInt("replication.stats.thread.period.seconds", 5); |
| conf.setBoolean("hbase.tests.use.shortcircuit.reads", false); |
| conf.setLong("replication.sleep.before.failover", 2000); |
| conf.setInt("replication.source.maxretriesmultiplier", 10); |
| conf.setFloat("replication.source.ratio", 1.0f); |
| conf.setBoolean("replication.source.eof.autorecovery", true); |
| conf.setLong("hbase.serial.replication.waiting.ms", 100); |
| } |
| |
| static void configureClusters(HBaseTestingUtility util1, |
| HBaseTestingUtility util2) { |
| setupConfig(util1, "/1"); |
| setupConfig(util2, "/2"); |
| |
| Configuration conf2 = util2.getConfiguration(); |
| conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); |
| conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6); |
| conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false); |
| } |
| |
| static void restartSourceCluster(int numSlaves) |
| throws Exception { |
| IOUtils.closeQuietly(hbaseAdmin, htable1); |
| UTIL1.shutdownMiniHBaseCluster(); |
| UTIL1.restartHBaseCluster(numSlaves); |
| // Invalidate the cached connection state. |
| CONF1 = UTIL1.getConfiguration(); |
| hbaseAdmin = UTIL1.getAdmin(); |
| Connection connection1 = UTIL1.getConnection(); |
| htable1 = connection1.getTable(tableName); |
| } |
| |
| static void restartTargetHBaseCluster(int numSlaves) throws Exception { |
| IOUtils.closeQuietly(htable2); |
| UTIL2.restartHBaseCluster(numSlaves); |
| // Invalidate the cached connection state |
| CONF2 = UTIL2.getConfiguration(); |
| htable2 = UTIL2.getConnection().getTable(tableName); |
| } |
| |
| private static void startClusters() throws Exception { |
| UTIL1.startMiniZKCluster(); |
| MiniZooKeeperCluster miniZK = UTIL1.getZkCluster(); |
| LOG.info("Setup first Zk"); |
| |
| UTIL2.setZkCluster(miniZK); |
| LOG.info("Setup second Zk"); |
| |
| CONF_WITH_LOCALFS = HBaseConfiguration.create(CONF1); |
| UTIL1.startMiniCluster(NUM_SLAVES1); |
| // Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks |
| // as a component in deciding maximum number of parallel batches to send to the peer cluster. |
| UTIL2.startMiniCluster(NUM_SLAVES2); |
| |
| connection1 = ConnectionFactory.createConnection(CONF1); |
| connection2 = ConnectionFactory.createConnection(CONF2); |
| hbaseAdmin = connection1.getAdmin(); |
| |
| TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName) |
| .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100) |
| .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()) |
| .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build(); |
| |
| try ( |
| Admin admin1 = connection1.getAdmin(); |
| Admin admin2 = connection2.getAdmin()) { |
| admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); |
| admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE); |
| UTIL1.waitUntilAllRegionsAssigned(tableName); |
| htable1 = connection1.getTable(tableName); |
| UTIL2.waitUntilAllRegionsAssigned(tableName); |
| htable2 = connection2.getTable(tableName); |
| } |
| |
| } |
| |
| @BeforeClass |
| public static void setUpBeforeClass() throws Exception { |
| configureClusters(UTIL1, UTIL2); |
| startClusters(); |
| } |
| |
| private boolean peerExist(String peerId) throws IOException { |
| return hbaseAdmin.listReplicationPeers().stream().anyMatch(p -> peerId.equals(p.getPeerId())); |
| } |
| |
| @Before |
| public void setUpBase() throws Exception { |
| if (!peerExist(PEER_ID2)) { |
| ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder() |
| .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer()); |
| if (isSyncPeer()) { |
| FileSystem fs2 = UTIL2.getTestFileSystem(); |
| // The remote wal dir is not important as we do not use it in DA state, here we only need to |
| // confirm that a sync peer in DA state can still replicate data to remote cluster |
| // asynchronously. |
| builder.setReplicateAllUserTables(false) |
| .setTableCFsMap(ImmutableMap.of(tableName, ImmutableList.of())) |
| .setRemoteWALDir(new Path("/RemoteWAL") |
| .makeQualified(fs2.getUri(), fs2.getWorkingDirectory()).toUri().toString()); |
| } |
| hbaseAdmin.addReplicationPeer(PEER_ID2, builder.build()); |
| } |
| } |
| |
| @After |
| public void tearDownBase() throws Exception { |
| if (peerExist(PEER_ID2)) { |
| hbaseAdmin.removeReplicationPeer(PEER_ID2); |
| } |
| } |
| |
| protected static void runSimplePutDeleteTest() throws IOException, InterruptedException { |
| Put put = new Put(row); |
| put.addColumn(famName, row, row); |
| |
| htable1 = UTIL1.getConnection().getTable(tableName); |
| htable1.put(put); |
| |
| Get get = new Get(row); |
| for (int i = 0; i < NB_RETRIES; i++) { |
| if (i == NB_RETRIES - 1) { |
| fail("Waited too much time for put replication"); |
| } |
| Result res = htable2.get(get); |
| if (res.isEmpty()) { |
| LOG.info("Row not available"); |
| Thread.sleep(SLEEP_TIME); |
| } else { |
| assertArrayEquals(row, res.value()); |
| break; |
| } |
| } |
| |
| Delete del = new Delete(row); |
| htable1.delete(del); |
| |
| get = new Get(row); |
| for (int i = 0; i < NB_RETRIES; i++) { |
| if (i == NB_RETRIES - 1) { |
| fail("Waited too much time for del replication"); |
| } |
| Result res = htable2.get(get); |
| if (res.size() >= 1) { |
| LOG.info("Row not deleted"); |
| Thread.sleep(SLEEP_TIME); |
| } else { |
| break; |
| } |
| } |
| } |
| |
| protected static void runSmallBatchTest() throws IOException, InterruptedException { |
| // normal Batch tests |
| loadData("", row); |
| |
| Scan scan = new Scan(); |
| |
| ResultScanner scanner1 = htable1.getScanner(scan); |
| Result[] res1 = scanner1.next(NB_ROWS_IN_BATCH); |
| scanner1.close(); |
| assertEquals(NB_ROWS_IN_BATCH, res1.length); |
| |
| waitForReplication(NB_ROWS_IN_BATCH, NB_RETRIES); |
| } |
| |
| @AfterClass |
| public static void tearDownAfterClass() throws Exception { |
| if (htable2 != null) { |
| htable2.close(); |
| } |
| if (htable1 != null) { |
| htable1.close(); |
| } |
| if (hbaseAdmin != null) { |
| hbaseAdmin.close(); |
| } |
| |
| if (connection2 != null) { |
| connection2.close(); |
| } |
| if (connection1 != null) { |
| connection1.close(); |
| } |
| UTIL2.shutdownMiniCluster(); |
| UTIL1.shutdownMiniCluster(); |
| } |
| } |