/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ReplicationTests.class, LargeTests.class})
public class TestMasterReplication {
private static final Log LOG = LogFactory.getLog(TestMasterReplication.class);
private Configuration baseConfiguration;
private HBaseTestingUtility[] utilities;
private Configuration[] configurations;
private MiniZooKeeperCluster miniZK;
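// Each wait loop below polls up to NB_RETRIES times, sleeping SLEEP_TIME ms between
// attempts, i.e. a budget of up to two minutes, well within the 300 s test timeouts.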
private static final long SLEEP_TIME = 1000;
private static final int NB_RETRIES = 120;
private static final TableName tableName = TableName.valueOf("test");
private static final byte[] famName = Bytes.toBytes("f");
private static final byte[] famName1 = Bytes.toBytes("f1");
private static final byte[] row = Bytes.toBytes("row");
private static final byte[] row1 = Bytes.toBytes("row1");
private static final byte[] row2 = Bytes.toBytes("row2");
private static final byte[] row3 = Bytes.toBytes("row3");
private static final byte[] row4 = Bytes.toBytes("row4");
private static final byte[] noRepfamName = Bytes.toBytes("norep");
private static final byte[] count = Bytes.toBytes("count");
private static final byte[] put = Bytes.toBytes("put");
private static final byte[] delete = Bytes.toBytes("delete");
private HTableDescriptor table;
@Before
public void setUp() throws Exception {
baseConfiguration = HBaseConfiguration.create();
// smaller block size and capacity to trigger more operations
// and test them
baseConfiguration.setInt("hbase.regionserver.hlog.blocksize", 1024 * 20);
baseConfiguration.setInt("replication.source.size.capacity", 1024);
baseConfiguration.setLong("replication.source.sleepforretries", 100);
baseConfiguration.setInt("hbase.regionserver.maxlogs", 10);
baseConfiguration.setLong("hbase.master.logcleaner.ttl", 10);
baseConfiguration.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
baseConfiguration.set("hbase.replication.source.fs.conf.provider",
TestSourceFSConfigurationProvider.class.getCanonicalName());
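// A fixed id for this cluster; bulk load replication uses it, together with the
// fs.conf.provider above, to resolve the source filesystem when copying HFiles.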
baseConfiguration.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
baseConfiguration.setBoolean("dfs.support.append", true);
baseConfiguration.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
baseConfiguration.setStrings(
CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
CoprocessorCounter.class.getName());
table = new HTableDescriptor(tableName);
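// Families 'f' and 'f1' get GLOBAL scope so their edits are replicated;
// 'norep' keeps the default LOCAL scope and must never leave its cluster.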
HColumnDescriptor fam = new HColumnDescriptor(famName);
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
table.addFamily(fam);
fam = new HColumnDescriptor(famName1);
fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
table.addFamily(fam);
fam = new HColumnDescriptor(noRepfamName);
table.addFamily(fam);
}
/**
* Tests the cyclic replication scenario 0 -> 1 -> 0 by adding and deleting a row in a
* table on each cluster and checking that it is replicated. It also verifies that the
* puts and deletes are not replicated back to the originating cluster.
*/
@Test(timeout = 300000)
public void testCyclicReplication1() throws Exception {
LOG.info("testSimplePutDelete");
int numClusters = 2;
Table[] htables = null;
try {
htables = setUpClusterTablesAndPeers(numClusters);
int[] expectedCounts = new int[] { 2, 2 };
// add rows to both clusters,
// make sure they are both replicated
putAndWait(row, famName, htables[0], htables[1]);
putAndWait(row1, famName, htables[1], htables[0]);
validateCounts(htables, put, expectedCounts);
deleteAndWait(row, htables[0], htables[1]);
deleteAndWait(row1, htables[1], htables[0]);
validateCounts(htables, delete, expectedCounts);
} finally {
close(htables);
shutDownMiniClusters();
}
}
/**
* Tests the cyclic replication scenario 0 -> 1 -> 0 by bulk loading a set of HFiles
* into a table on each cluster and checking that the data is replicated.
*/
@Test(timeout = 300000)
public void testHFileCyclicReplication() throws Exception {
LOG.info("testHFileCyclicReplication");
int numClusters = 2;
Table[] htables = null;
try {
htables = setUpClusterTablesAndPeers(numClusters);
// Load 100 rows for each hfile range in cluster '0' and validate that they have been
// replicated to cluster '1'.
byte[][][] hfileRanges =
new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
int numOfRows = 100;
int[] expectedCounts =
new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
loadAndValidateHFileReplication("testHFileCyclicReplication_01", 0, new int[] { 1 }, row,
famName, htables, hfileRanges, numOfRows, expectedCounts, true);
// Load 200 rows for each hfile range in cluster '1' and validate that they have been
// replicated to cluster '0'.
hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
numOfRows = 200;
int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
hfileRanges.length * numOfRows + expectedCounts[1] };
loadAndValidateHFileReplication("testHFileCyclicReplication_10", 1, new int[] { 0 }, row,
famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);
} finally {
close(htables);
shutDownMiniClusters();
}
}
private Table[] setUpClusterTablesAndPeers(int numClusters) throws Exception {
Table[] htables;
startMiniClusters(numClusters);
createTableOnClusters(table);
htables = getHTablesOnClusters(tableName);
// Test the replication scenarios of 0 -> 1 -> 0
addPeer("1", 0, 1);
addPeer("1", 1, 0);
return htables;
}
/**
* Tests the cyclic replication scenario 0 -> 1 -> 2 -> 0 by adding and deleting rows in a
* table on each cluster and ensuring that each cluster receives the appropriate mutations.
* It also tests the grouping scenario in which a cluster must replicate both the edits
* originating from itself and the edits it received via replication from a different
* cluster. The scenario is explained in HBASE-9158.
*/
@Test(timeout = 300000)
public void testCyclicReplication2() throws Exception {
LOG.info("testCyclicReplication1");
int numClusters = 3;
Table[] htables = null;
try {
startMiniClusters(numClusters);
createTableOnClusters(table);
// Test the replication scenario of 0 -> 1 -> 2 -> 0
addPeer("1", 0, 1);
addPeer("1", 1, 2);
addPeer("1", 2, 0);
htables = getHTablesOnClusters(tableName);
// put "row" and wait 'til it got around
putAndWait(row, famName, htables[0], htables[2]);
putAndWait(row1, famName, htables[1], htables[0]);
putAndWait(row2, famName, htables[2], htables[1]);
deleteAndWait(row, htables[0], htables[2]);
deleteAndWait(row1, htables[1], htables[0]);
deleteAndWait(row2, htables[2], htables[1]);
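// Each cluster performs one local put and receives two replicated puts from the
// ring, so the coprocessor on every cluster should have counted 3 of each type.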
int[] expectedCounts = new int[] { 3, 3, 3 };
validateCounts(htables, put, expectedCounts);
validateCounts(htables, delete, expectedCounts);
// Test HBASE-9158
disablePeer("1", 2);
// we now have an edit that was replicated into cluster 1,
// originating from cluster 0
putAndWait(row3, famName, htables[0], htables[1]);
// now add a local edit to cluster 1
htables[1].put(new Put(row4).addColumn(famName, row4, row4));
// re-enable replication from cluster 2 to cluster 0
enablePeer("1", 2);
// without HBASE-9158 the edit for row4 would have been marked with
// cluster 0's id and hence not replicated to cluster 0
wait(row4, htables[0], true);
} finally {
close(htables);
shutDownMiniClusters();
}
}
/**
* It tests the multi slave hfile replication scenario involving 0 -> 1, 2. It does it by bulk
* loading a set of HFiles to a table in master cluster, checking if it's replicated in its peers.
*/
@Test(timeout = 300000)
public void testHFileMultiSlaveReplication() throws Exception {
LOG.info("testHFileMultiSlaveReplication");
int numClusters = 3;
Table[] htables = null;
try {
startMiniClusters(numClusters);
createTableOnClusters(table);
// Add a slave, 0 -> 1
addPeer("1", 0, 1);
htables = getHTablesOnClusters(tableName);
// Load 100 rows for each hfile range in cluster '0' and validate that they have been
// replicated to cluster '1'.
byte[][][] hfileRanges =
new byte[][][] { new byte[][] { Bytes.toBytes("mmmm"), Bytes.toBytes("oooo") },
new byte[][] { Bytes.toBytes("ppp"), Bytes.toBytes("rrr") }, };
int numOfRows = 100;
int[] expectedCounts =
new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
loadAndValidateHFileReplication("testHFileMultiSlaveReplication_0", 0, new int[] { 1 }, row,
famName, htables, hfileRanges, numOfRows, expectedCounts, true);
// Validate data is not replicated to cluster '2'.
assertEquals(0, utilities[2].countRows(htables[2]));
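// Roll the WAL so the bulk load entries above end up in a closed log; a newly added
// peer only ships from the current WAL, so peer '2' should never see this data.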
rollWALAndWait(utilities[0], htables[0].getName(), row);
// Add one more slave, 0 -> 2
addPeer("2", 0, 2);
// Load 200 rows for each hfile range in cluster '0' and validate that they have been
// replicated to clusters '1' and '2'. The earlier data should not reach cluster '2',
// since its WAL entries were rolled away before peer '2' was added.
hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("ssss"), Bytes.toBytes("uuuu") },
new byte[][] { Bytes.toBytes("vvv"), Bytes.toBytes("xxx") }, };
numOfRows = 200;
int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
hfileRanges.length * numOfRows + expectedCounts[1], hfileRanges.length * numOfRows };
loadAndValidateHFileReplication("testHFileMultiSlaveReplication_1", 0, new int[] { 1, 2 }, row,
famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);
} finally {
close(htables);
shutDownMiniClusters();
}
}
/**
* Tests bulk loaded HFile replication restricted to explicitly specified table column
* families. It bulk loads a set of HFiles into both column families of the table while
* configuring only one family's data to be replicated.
*/
@Test(timeout = 300000)
public void testHFileReplicationForConfiguredTableCfs() throws Exception {
LOG.info("testHFileReplicationForConfiguredTableCfs");
int numClusters = 2;
Table[] htables = null;
try {
startMiniClusters(numClusters);
createTableOnClusters(table);
htables = getHTablesOnClusters(tableName);
// Test the replication scenario where only 'f' is configured for table data replication,
// not 'f1'
addPeer("1", 0, 1, tableName.getNameAsString() + ":" + Bytes.toString(famName));
// Load 100 rows for each hfile range in cluster '0' for table CF 'f'
byte[][][] hfileRanges =
new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
int numOfRows = 100;
int[] expectedCounts =
new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
loadAndValidateHFileReplication("load_f", 0, new int[] { 1 }, row, famName, htables,
hfileRanges, numOfRows, expectedCounts, true);
// Load 100 rows for each hfile range in cluster '0' for table CF 'f1'
hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
numOfRows = 100;
int[] newExpectedCounts =
new int[] { hfileRanges.length * numOfRows + expectedCounts[0], expectedCounts[1] };
loadAndValidateHFileReplication("load_f1", 0, new int[] { 1 }, row, famName1, htables,
hfileRanges, numOfRows, newExpectedCounts, false);
// Validate that data for CF 'f1' is not replicated:
// the source cluster table should contain data for both families
wait(0, htables[0], hfileRanges.length * numOfRows + expectedCounts[0]);
// Sleep long enough that the data for the CF not configured for replication would
// have arrived on the peer if it were (incorrectly) being replicated
Thread.sleep((NB_RETRIES / 2) * SLEEP_TIME);
// Peer cluster should have only configured CF data
wait(1, htables[1], expectedCounts[1]);
} finally {
close(htables);
shutDownMiniClusters();
}
}
/**
* Tests cyclic replication scenario of 0 -> 1 -> 2 -> 1.
*/
@Test(timeout = 300000)
public void testCyclicReplication3() throws Exception {
LOG.info("testCyclicReplication2");
int numClusters = 3;
Table[] htables = null;
try {
startMiniClusters(numClusters);
createTableOnClusters(table);
// Test the replication scenario of 0 -> 1 -> 2 -> 1
addPeer("1", 0, 1);
addPeer("1", 1, 2);
addPeer("1", 2, 1);
htables = getHTablesOnClusters(tableName);
// put "row" and wait 'til it got around
putAndWait(row, famName, htables[0], htables[2]);
putAndWait(row1, famName, htables[1], htables[2]);
putAndWait(row2, famName, htables[2], htables[1]);
deleteAndWait(row, htables[0], htables[2]);
deleteAndWait(row1, htables[1], htables[2]);
deleteAndWait(row2, htables[2], htables[1]);
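// Cluster 0 has no inbound peer, so it only sees its own mutation; clusters 1 and 2
// each see all three (one local, two arriving via the 0 -> 1 -> 2 -> 1 chain).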
int[] expectedCounts = new int[] { 1, 3, 3 };
validateCounts(htables, put, expectedCounts);
validateCounts(htables, delete, expectedCounts);
} finally {
close(htables);
shutDownMiniClusters();
}
}
@After
public void tearDown() throws IOException {
configurations = null;
utilities = null;
}
@SuppressWarnings("resource")
private void startMiniClusters(int numClusters) throws Exception {
Random random = new Random();
utilities = new HBaseTestingUtility[numClusters];
configurations = new Configuration[numClusters];
for (int i = 0; i < numClusters; i++) {
Configuration conf = new Configuration(baseConfiguration);
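// Give each cluster its own znode parent so all clusters can share the single
// ZooKeeper ensemble started by cluster 0.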
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + i + random.nextInt());
HBaseTestingUtility utility = new HBaseTestingUtility(conf);
if (i == 0) {
utility.startMiniZKCluster();
miniZK = utility.getZkCluster();
} else {
utility.setZkCluster(miniZK);
}
utility.startMiniCluster();
utilities[i] = utility;
configurations[i] = conf;
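// Open a watcher against this cluster's ensemble; it is left unclosed for the
// lifetime of the test (hence the @SuppressWarnings("resource") on this method).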
new ZooKeeperWatcher(conf, "cluster" + i, null, true);
}
}
private void shutDownMiniClusters() throws Exception {
int numClusters = utilities.length;
for (int i = numClusters - 1; i >= 0; i--) {
if (utilities[i] != null) {
utilities[i].shutdownMiniCluster();
}
}
miniZK.shutdown();
}
private void createTableOnClusters(HTableDescriptor table) throws Exception {
for (HBaseTestingUtility utility : utilities) {
utility.getHBaseAdmin().createTable(table);
}
}
private void addPeer(String id, int masterClusterNumber,
int slaveClusterNumber) throws Exception {
ReplicationAdmin replicationAdmin = null;
try {
replicationAdmin = new ReplicationAdmin(
configurations[masterClusterNumber]);
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(utilities[slaveClusterNumber].getClusterKey());
replicationAdmin.addPeer(id, rpc, null);
} finally {
close(replicationAdmin);
}
}
private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
throws Exception {
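// tableCfs uses the "table1:cf1,cf2;table2" syntax understood by
// ReplicationSerDeHelper.parseTableCFsFromConfig().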
ReplicationAdmin replicationAdmin = null;
try {
replicationAdmin = new ReplicationAdmin(configurations[masterClusterNumber]);
ReplicationPeerConfig replicationPeerConfig = new ReplicationPeerConfig();
replicationPeerConfig.setClusterKey(utilities[slaveClusterNumber].getClusterKey());
replicationAdmin.addPeer(id, replicationPeerConfig,
ReplicationSerDeHelper.parseTableCFsFromConfig(tableCfs));
} finally {
close(replicationAdmin);
}
}
private void disablePeer(String id, int masterClusterNumber) throws Exception {
ReplicationAdmin replicationAdmin = null;
try {
replicationAdmin = new ReplicationAdmin(
configurations[masterClusterNumber]);
replicationAdmin.disablePeer(id);
} finally {
close(replicationAdmin);
}
}
private void enablePeer(String id, int masterClusterNumber) throws Exception {
ReplicationAdmin replicationAdmin = null;
try {
replicationAdmin = new ReplicationAdmin(
configurations[masterClusterNumber]);
replicationAdmin.enablePeer(id);
} finally {
close(replicationAdmin);
}
}
private void close(Closeable... closeables) {
try {
if (closeables != null) {
for (Closeable closeable : closeables) {
closeable.close();
}
}
} catch (Exception e) {
LOG.warn("Exception occured while closing the object:", e);
}
}
@SuppressWarnings("resource")
private Table[] getHTablesOnClusters(TableName tableName) throws Exception {
int numClusters = utilities.length;
Table[] htables = new Table[numClusters];
for (int i = 0; i < numClusters; i++) {
Table htable = ConnectionFactory.createConnection(configurations[i]).getTable(tableName);
htable.setWriteBufferSize(1024);
htables[i] = htable;
}
return htables;
}
private void validateCounts(Table[] htables, byte[] type,
int[] expectedCounts) throws IOException {
for (int i = 0; i < htables.length; i++) {
assertEquals(Bytes.toString(type) + " were replicated back ",
expectedCounts[i], getCount(htables[i], type));
}
}
private int getCount(Table t, byte[] type) throws IOException {
Get test = new Get(row);
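// The "count" attribute makes CoprocessorCounter.preGetOp replace the result with
// the current put/delete counters instead of the stored row.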
test.setAttribute("count", new byte[] {});
Result res = t.get(test);
return Bytes.toInt(res.getValue(count, type));
}
private void deleteAndWait(byte[] row, Table source, Table target)
throws Exception {
Delete del = new Delete(row);
source.delete(del);
wait(row, target, true);
}
private void putAndWait(byte[] row, byte[] fam, Table source, Table target)
throws Exception {
Put put = new Put(row);
put.addColumn(fam, row, row);
source.put(put);
wait(row, target, false);
}
private void loadAndValidateHFileReplication(String testName, int masterNumber,
int[] slaveNumbers, byte[] row, byte[] fam, Table[] tables, byte[][][] hfileRanges,
int numOfRows, int[] expectedCounts, boolean toValidate) throws Exception {
HBaseTestingUtility util = utilities[masterNumber];
Path dir = util.getDataTestDirOnTestFS(testName);
FileSystem fs = util.getTestFileSystem();
dir = dir.makeQualified(fs);
Path familyDir = new Path(dir, Bytes.toString(fam));
int hfileIdx = 0;
for (byte[][] range : hfileRanges) {
byte[] from = range[0];
byte[] to = range[1];
HFileTestUtil.createHFile(util.getConfiguration(), fs,
new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
}
Table source = tables[masterNumber];
final TableName tableName = source.getName();
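// Bulk load the generated HFiles. With REPLICATION_BULKLOAD_ENABLE_KEY set in setUp(),
// the bulk load event is written to the WAL and shipped to the peers like any other edit.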
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
String[] args = { dir.toString(), tableName.toString() };
loader.run(args);
if (toValidate) {
for (int slaveClusterNumber : slaveNumbers) {
wait(slaveClusterNumber, tables[slaveClusterNumber], expectedCounts[slaveClusterNumber]);
}
}
}
private void wait(int slaveNumber, Table target, int expectedCount)
throws IOException, InterruptedException {
int count = 0;
for (int i = 0; i < NB_RETRIES; i++) {
if (i == NB_RETRIES - 1) {
fail("Waited too much time for bulkloaded data replication. Current count=" + count
+ ", expected count=" + expectedCount);
}
count = utilities[slaveNumber].countRows(target);
if (count != expectedCount) {
LOG.info("Waiting more time for bulkloaded data replication.");
Thread.sleep(SLEEP_TIME);
} else {
break;
}
}
}
private void wait(byte[] row, Table target, boolean isDeleted) throws Exception {
Get get = new Get(row);
for (int i = 0; i < NB_RETRIES; i++) {
if (i == NB_RETRIES - 1) {
fail("Waited too much time for replication. Row:" + Bytes.toString(row)
+ ". IsDeleteReplication:" + isDeleted);
}
Result res = target.get(get);
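// For a delete we wait until the row disappears; for a put, until it appears.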
boolean sleep = isDeleted ? res.size() > 0 : res.size() == 0;
if (sleep) {
LOG.info("Waiting for more time for replication. Row:"
+ Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
Thread.sleep(SLEEP_TIME);
} else {
if (!isDeleted) {
assertArrayEquals(res.value(), row);
}
LOG.info("Obtained row:"
+ Bytes.toString(row) + ". IsDeleteReplication:" + isDeleted);
break;
}
}
}
private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
final byte[] row) throws IOException {
final Admin admin = utility.getHBaseAdmin();
final MiniHBaseCluster cluster = utility.getMiniHBaseCluster();
// find the region that corresponds to the given row.
HRegion region = null;
for (HRegion candidate : cluster.getRegions(table)) {
if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
region = candidate;
break;
}
}
assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region);
final CountDownLatch latch = new CountDownLatch(1);
// listen for successful log rolls
final WALActionsListener listener = new WALActionsListener.Base() {
@Override
public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
latch.countDown();
}
};
region.getWAL().registerWALActionsListener(listener);
// request a roll
admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDesc().getTableName(),
region.getRegionInfo().getRegionName()));
// wait
try {
latch.await();
} catch (InterruptedException exception) {
LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later " +
"replication tests fail, it's probably because we should still be waiting.");
Thread.currentThread().interrupt();
}
region.getWAL().unregisterWALActionsListener(listener);
}
/**
* Uses a coprocessor to count puts and deletes. Since KVs are replicated back with the
* same timestamp, there is otherwise no way to count them.
*/
public static class CoprocessorCounter extends BaseRegionObserver {
private int nCount = 0;
private int nDelete = 0;
@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, final Put put,
final WALEdit edit, final Durability durability) throws IOException {
nCount++;
}
@Override
public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
nDelete++;
}
@Override
public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> c,
final Get get, final List<Cell> result) throws IOException {
if (get.getAttribute("count") != null) {
result.clear();
// order is important! Cells in a Result must be sorted by qualifier,
// and "delete" sorts before "put".
result.add(new KeyValue(count, count, delete, Bytes.toBytes(nDelete)));
result.add(new KeyValue(count, count, put, Bytes.toBytes(nCount)));
c.bypass();
}
}
}
}