/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.coprocessor.CoreCoprocessor;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({LargeTests.class, ClientTests.class})
public class TestReplicaWithCluster {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicaWithCluster.class);
private static final Logger LOG = LoggerFactory.getLogger(TestReplicaWithCluster.class);
private static final int NB_SERVERS = 3;
private static final byte[] row = Bytes.toBytes(TestReplicaWithCluster.class.getName());
private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
// Second minicluster, used to test replication
private static HBaseTestingUtility HTU2;
private static final byte[] f = HConstants.CATALOG_FAMILY;
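  // Period (in ms) at which secondary replicas refresh their store files; tests sleep
  // 2 * REFRESH_PERIOD after a flush so the replicas pick up the new files.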
private final static int REFRESH_PERIOD = 1000;
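  // Scan timeout for the primary meta replica; note that the corresponding config key
  // set in beforeClass() is read in microseconds.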
private final static int META_SCAN_TIMEOUT_IN_MILLISEC = 200;
/**
* This copro is used to synchronize the tests.
*/
public static class SlowMeCopro implements RegionCoprocessor, RegionObserver {
static final AtomicLong sleepTime = new AtomicLong(0);
static final AtomicReference<CountDownLatch> cdl = new AtomicReference<>(new CountDownLatch(0));
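    // Tests stall the primary replica in one of two ways: setting sleepTime delays every
    // primary get, while setting cdl to an open latch blocks primary gets until the test
    // counts it down. Secondary replicas are never delayed, so TIMELINE reads fall back
    // to them and return stale results.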
public SlowMeCopro() {
}
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
@Override
public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
final Get get, final List<Cell> results) throws IOException {
if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
CountDownLatch latch = cdl.get();
try {
if (sleepTime.get() > 0) {
LOG.info("Sleeping for " + sleepTime.get() + " ms");
Thread.sleep(sleepTime.get());
        } else if (latch.getCount() > 0) {
          LOG.info("Waiting for the CountDownLatch");
          latch.await(2, TimeUnit.MINUTES); // Bound the wait so a stuck test still finishes.
          if (latch.getCount() > 0) {
            throw new RuntimeException("Can't wait any longer");
          }
}
} catch (InterruptedException e1) {
LOG.error(e1.toString(), e1);
}
} else {
LOG.info("We're not the primary replicas.");
}
}
}
/**
 * This copro is used to simulate a region-server-stopped exception for Gets and Scans
*/
@CoreCoprocessor
public static class RegionServerStoppedCopro implements RegionCoprocessor, RegionObserver {
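    // Throws RegionServerStoppedException for replica ids 0 and 1, leaving replica 2 as
    // the only replica able to serve reads.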
public RegionServerStoppedCopro() {
}
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
@Override
public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
final Get get, final List<Cell> results) throws IOException {
      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
      // Fail for the primary replica and replica 1
      if (replicaId <= 1) {
        LOG.info("Throw Region Server Stopped Exception for replica id " + replicaId);
        throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName()
          + " not running");
      } else {
        LOG.info("We're replica region " + replicaId);
      }
}
@Override
public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
final Scan scan) throws IOException {
      int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
      // Fail for the primary replica and replica 1
      if (replicaId <= 1) {
        LOG.info("Throw Region Server Stopped Exception for replica id " + replicaId);
        throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName()
          + " not running");
      } else {
        LOG.info("We're replica region " + replicaId);
      }
}
}
/**
* This copro is used to slow down the primary meta region scan a bit
*/
  public static class RegionServerHostingPrimaryMetaRegionSlowOrStopCopro
    implements RegionCoprocessor, RegionObserver {
static boolean slowDownPrimaryMetaScan = false;
static boolean throwException = false;
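    // slowDownPrimaryMetaScan delays the primary meta scan to just under
    // META_SCAN_TIMEOUT_IN_MILLISEC; throwException makes primary reads fail outright
    // (gets on user regions, scans on meta). Both push the client towards the replicas.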
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
@Override
public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
final Get get, final List<Cell> results) throws IOException {
int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
// Fail for the primary replica, but not for meta
if (throwException) {
if (!e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
LOG.info("Get, throw Region Server Stopped Exceptoin for region " + e.getEnvironment()
.getRegion().getRegionInfo());
throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName()
+ " not running");
}
} else {
LOG.info("Get, We're replica region " + replicaId);
}
}
@Override
public void preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
final Scan scan) throws IOException {
int replicaId = e.getEnvironment().getRegion().getRegionInfo().getReplicaId();
// Slow down with the primary meta region scan
if (e.getEnvironment().getRegion().getRegionInfo().isMetaRegion() && (replicaId == 0)) {
if (slowDownPrimaryMetaScan) {
LOG.info("Scan with primary meta region, slow down a bit");
try {
Thread.sleep(META_SCAN_TIMEOUT_IN_MILLISEC - 50);
} catch (InterruptedException ie) {
            // Ignore
}
}
// Fail for the primary replica
if (throwException) {
LOG.info("Scan, throw Region Server Stopped Exceptoin for replica " + e.getEnvironment()
.getRegion().getRegionInfo());
throw new RegionServerStoppedException("Server " + e.getEnvironment().getServerName()
+ " not running");
} else {
LOG.info("Scan, We're replica region " + replicaId);
}
} else {
LOG.info("Scan, We're replica region " + replicaId);
}
}
}
@BeforeClass
public static void beforeClass() throws Exception {
// enable store file refreshing
HTU.getConfiguration().setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
REFRESH_PERIOD);
HTU.getConfiguration().setFloat("hbase.regionserver.logroll.multiplier", 0.0001f);
HTU.getConfiguration().setInt("replication.source.size.capacity", 10240);
HTU.getConfiguration().setLong("replication.source.sleepforretries", 100);
HTU.getConfiguration().setInt("hbase.regionserver.maxlogs", 2);
HTU.getConfiguration().setLong("hbase.master.logcleaner.ttl", 10);
HTU.getConfiguration().setInt("zookeeper.recovery.retry", 1);
HTU.getConfiguration().setInt("zookeeper.recovery.retry.intervalmill", 10);
    // Wait longer on the primary call so the test reliably sees the exception from the
    // primary replica. These timeouts are in microseconds, so 1000000 is one second.
HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.get", 1000000);
HTU.getConfiguration().setInt("hbase.client.primaryCallTimeout.scan", 1000000);
// Make sure master does not host system tables.
HTU.getConfiguration().set("hbase.balancer.tablesOnMaster", "none");
// Set system coprocessor so it can be applied to meta regions
HTU.getConfiguration().set("hbase.coprocessor.region.classes",
RegionServerHostingPrimayMetaRegionSlowOrStopCopro.class.getName());
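    // The meta replica scan timeout config is read in microseconds, hence the
    // millisecond-to-microsecond conversion below.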
HTU.getConfiguration().setInt(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
META_SCAN_TIMEOUT_IN_MILLISEC * 1000);
HTU.startMiniCluster(NB_SERVERS);
// Enable meta replica at server side
HBaseTestingUtility.setReplicas(HTU.getAdmin(), TableName.META_TABLE_NAME, 2);
HTU.getHBaseCluster().startMaster();
}
@AfterClass
public static void afterClass() throws Exception {
    if (HTU2 != null) {
      HTU2.shutdownMiniCluster();
    }
HTU.shutdownMiniCluster();
}
@Test
public void testCreateDeleteTable() throws IOException {
// Create table then get the single region for our new table.
TableDescriptorBuilder builder =
HTU.createModifyableTableDescriptor(TableName.valueOf("testCreateDeleteTable"),
ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
builder.setRegionReplication(NB_SERVERS);
builder.setCoprocessor(SlowMeCopro.class.getName());
TableDescriptor hdt = builder.build();
Table table = HTU.createTable(hdt, new byte[][] { f }, null);
Put p = new Put(row);
p.addColumn(f, row, row);
table.put(p);
Get g = new Get(row);
Result r = table.get(g);
Assert.assertFalse(r.isStale());
try {
// But if we ask for stale we will get it
SlowMeCopro.cdl.set(new CountDownLatch(1));
g = new Get(row);
g.setConsistency(Consistency.TIMELINE);
r = table.get(g);
Assert.assertTrue(r.isStale());
SlowMeCopro.cdl.get().countDown();
} finally {
SlowMeCopro.cdl.get().countDown();
SlowMeCopro.sleepTime.set(0);
}
HTU.getAdmin().disableTable(hdt.getTableName());
HTU.deleteTable(hdt.getTableName());
}
@Test
public void testChangeTable() throws Exception {
TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf("testChangeTable"))
.setRegionReplication(NB_SERVERS)
.setCoprocessor(SlowMeCopro.class.getName())
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(f))
.build();
HTU.getAdmin().createTable(td);
Table table = HTU.getConnection().getTable(td.getTableName());
// basic test: it should work.
Put p = new Put(row);
p.addColumn(f, row, row);
table.put(p);
Get g = new Get(row);
Result r = table.get(g);
Assert.assertFalse(r.isStale());
// Add a CF, it should work.
TableDescriptor bHdt = HTU.getAdmin().getDescriptor(td.getTableName());
td = TableDescriptorBuilder.newBuilder(td)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(row))
.build();
HTU.getAdmin().disableTable(td.getTableName());
HTU.getAdmin().modifyTable(td);
HTU.getAdmin().enableTable(td.getTableName());
TableDescriptor nHdt = HTU.getAdmin().getDescriptor(td.getTableName());
Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount());
p = new Put(row);
p.addColumn(row, row, row);
table.put(p);
g = new Get(row);
r = table.get(g);
Assert.assertFalse(r.isStale());
try {
SlowMeCopro.cdl.set(new CountDownLatch(1));
g = new Get(row);
g.setConsistency(Consistency.TIMELINE);
r = table.get(g);
Assert.assertTrue(r.isStale());
} finally {
SlowMeCopro.cdl.get().countDown();
SlowMeCopro.sleepTime.set(0);
}
Admin admin = HTU.getAdmin();
    nHdt = admin.getDescriptor(td.getTableName());
Assert.assertEquals("fams=" + Arrays.toString(nHdt.getColumnFamilies()),
bHdt.getColumnFamilyCount() + 1, nHdt.getColumnFamilyCount());
admin.disableTable(td.getTableName());
admin.deleteTable(td.getTableName());
admin.close();
}
@SuppressWarnings("deprecation")
@Test
public void testReplicaAndReplication() throws Exception {
TableDescriptorBuilder builder =
HTU.createModifyableTableDescriptor("testReplicaAndReplication");
builder.setRegionReplication(NB_SERVERS);
builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(row)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build());
builder.setCoprocessor(SlowMeCopro.class.getName());
TableDescriptor tableDescriptor = builder.build();
HTU.getAdmin().createTable(tableDescriptor, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
Configuration conf2 = HBaseConfiguration.create(HTU.getConfiguration());
conf2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1));
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
MiniZooKeeperCluster miniZK = HTU.getZkCluster();
HTU2 = new HBaseTestingUtility(conf2);
HTU2.setZkCluster(miniZK);
HTU2.startMiniCluster(NB_SERVERS);
LOG.info("Setup second Zk");
HTU2.getAdmin().createTable(tableDescriptor, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
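    // Register the second cluster as a replication peer of the first so that edits to the
    // test table (its family has GLOBAL replication scope) flow from HTU to HTU2.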
try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
Admin admin = connection.getAdmin()) {
ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
.setClusterKey(HTU2.getClusterKey()).build();
admin.addReplicationPeer("2", rpc);
}
Put p = new Put(row);
p.addColumn(row, row, row);
final Table table = HTU.getConnection().getTable(tableDescriptor.getTableName());
table.put(p);
HTU.getAdmin().flush(table.getName());
LOG.info("Put & flush done on the first cluster. Now doing a get on the same cluster.");
Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
try {
SlowMeCopro.cdl.set(new CountDownLatch(1));
Get g = new Get(row);
g.setConsistency(Consistency.TIMELINE);
Result r = table.get(g);
Assert.assertTrue(r.isStale());
return !r.isEmpty();
} finally {
SlowMeCopro.cdl.get().countDown();
SlowMeCopro.sleepTime.set(0);
}
}
});
table.close();
LOG.info("stale get on the first cluster done. Now for the second.");
    final Table table2 = HTU2.getConnection().getTable(tableDescriptor.getTableName());
Waiter.waitFor(HTU.getConfiguration(), 1000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
try {
SlowMeCopro.cdl.set(new CountDownLatch(1));
Get g = new Get(row);
g.setConsistency(Consistency.TIMELINE);
Result r = table2.get(g);
Assert.assertTrue(r.isStale());
return !r.isEmpty();
} finally {
SlowMeCopro.cdl.get().countDown();
SlowMeCopro.sleepTime.set(0);
}
}
});
table2.close();
HTU.getAdmin().disableTable(tableDescriptor.getTableName());
HTU.deleteTable(tableDescriptor.getTableName());
HTU2.getAdmin().disableTable(tableDescriptor.getTableName());
HTU2.deleteTable(tableDescriptor.getTableName());
    // HTU2's minicluster is shut down later, in afterClass(), because shutting it down
    // here would have the side effect of closing all HConnections in the JVM.
}
@Test
public void testBulkLoad() throws IOException {
// Create table then get the single region for our new table.
LOG.debug("Creating test table");
TableDescriptorBuilder builder = HTU.createModifyableTableDescriptor(
TableName.valueOf("testBulkLoad"), ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3,
HConstants.FOREVER, ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
builder.setRegionReplication(NB_SERVERS);
builder.setCoprocessor(SlowMeCopro.class.getName());
TableDescriptor hdt = builder.build();
Table table = HTU.createTable(hdt, new byte[][] { f }, null);
// create hfiles to load.
LOG.debug("Creating test data");
Path dir = HTU.getDataTestDirOnTestFS("testBulkLoad");
final int numRows = 10;
final byte[] qual = Bytes.toBytes("qual");
final byte[] val = Bytes.toBytes("val");
Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (ColumnFamilyDescriptor col : hdt.getColumnFamilies()) {
Path hfile = new Path(dir, col.getNameAsString());
TestHRegionServerBulkLoad.createHFile(HTU.getTestFileSystem(), hfile, col.getName(), qual,
val, numRows);
family2Files.put(col.getName(), Collections.singletonList(hfile));
}
// bulk load HFiles
LOG.debug("Loading test data");
BulkLoadHFiles.create(HTU.getConfiguration()).bulkLoad(hdt.getTableName(), family2Files);
// verify we can read them from the primary
LOG.debug("Verifying data load");
for (int i = 0; i < numRows; i++) {
byte[] row = TestHRegionServerBulkLoad.rowkey(i);
Get g = new Get(row);
Result r = table.get(g);
Assert.assertFalse(r.isStale());
}
// verify we can read them from the replica
LOG.debug("Verifying replica queries");
try {
SlowMeCopro.cdl.set(new CountDownLatch(1));
for (int i = 0; i < numRows; i++) {
byte[] row = TestHRegionServerBulkLoad.rowkey(i);
Get g = new Get(row);
g.setConsistency(Consistency.TIMELINE);
Result r = table.get(g);
Assert.assertTrue(r.isStale());
}
SlowMeCopro.cdl.get().countDown();
} finally {
SlowMeCopro.cdl.get().countDown();
SlowMeCopro.sleepTime.set(0);
}
HTU.getAdmin().disableTable(hdt.getTableName());
HTU.deleteTable(hdt.getTableName());
}
@Test
public void testReplicaGetWithPrimaryDown() throws IOException {
// Create table then get the single region for our new table.
TableDescriptorBuilder builder =
HTU.createModifyableTableDescriptor(TableName.valueOf("testCreateDeleteTable"),
ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
builder.setRegionReplication(NB_SERVERS);
builder.setCoprocessor(RegionServerStoppedCopro.class.getName());
TableDescriptor hdt = builder.build();
try {
Table table = HTU.createTable(hdt, new byte[][] { f }, null);
Put p = new Put(row);
p.addColumn(f, row, row);
table.put(p);
      // Flush so it can be picked up by the store file refresher thread
HTU.flush(table.getName());
// Sleep for some time until data is picked up by replicas
try {
Thread.sleep(2 * REFRESH_PERIOD);
} catch (InterruptedException e1) {
LOG.error(e1.toString(), e1);
}
      // The copro fails replicas 0 and 1, so a TIMELINE get must be served by replica 2
      // and the result is stale
Get g = new Get(row);
g.setConsistency(Consistency.TIMELINE);
Result r = table.get(g);
Assert.assertTrue(r.isStale());
} finally {
HTU.getAdmin().disableTable(hdt.getTableName());
HTU.deleteTable(hdt.getTableName());
}
}
@Test
public void testReplicaScanWithPrimaryDown() throws IOException {
// Create table then get the single region for our new table.
TableDescriptorBuilder builder =
HTU.createModifyableTableDescriptor(TableName.valueOf("testCreateDeleteTable"),
ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
builder.setRegionReplication(NB_SERVERS);
builder.setCoprocessor(RegionServerStoppedCopro.class.getName());
TableDescriptor hdt = builder.build();
try {
Table table = HTU.createTable(hdt, new byte[][] { f }, null);
Put p = new Put(row);
p.addColumn(f, row, row);
table.put(p);
      // Flush so it can be picked up by the store file refresher thread
HTU.flush(table.getName());
// Sleep for some time until data is picked up by replicas
try {
Thread.sleep(2 * REFRESH_PERIOD);
} catch (InterruptedException e1) {
LOG.error(e1.toString(), e1);
}
      // The copro fails replicas 0 and 1, so a TIMELINE scan must be served by replica 2
      // and return stale results
      Scan scan = new Scan();
      scan.addFamily(f);
      scan.setConsistency(Consistency.TIMELINE);
      try (ResultScanner scanner = table.getScanner(scan)) {
        Result r = scanner.next();
        Assert.assertTrue(r.isStale());
      }
} finally {
HTU.getAdmin().disableTable(hdt.getTableName());
HTU.deleteTable(hdt.getTableName());
}
}
@Test
public void testReplicaGetWithAsyncRpcClientImpl() throws IOException {
HTU.getConfiguration().setBoolean("hbase.ipc.client.specificThreadForWriting", true);
HTU.getConfiguration().set("hbase.rpc.client.impl",
"org.apache.hadoop.hbase.ipc.AsyncRpcClient");
// Create table then get the single region for our new table.
TableDescriptorBuilder builder =
HTU.createModifyableTableDescriptor(TableName.valueOf("testReplicaGetWithAsyncRpcClientImpl"),
ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
builder.setRegionReplication(NB_SERVERS);
builder.setCoprocessor(SlowMeCopro.class.getName());
TableDescriptor hdt = builder.build();
try {
Table table = HTU.createTable(hdt, new byte[][] { f }, null);
Put p = new Put(row);
p.addColumn(f, row, row);
table.put(p);
      // Flush so it can be picked up by the store file refresher thread
HTU.flush(table.getName());
// Sleep for some time until data is picked up by replicas
try {
Thread.sleep(2 * REFRESH_PERIOD);
} catch (InterruptedException e1) {
LOG.error(e1.toString(), e1);
}
      // Create a new connection so the new RPC client config can kick in; close both the
      // connection and the table when done
      try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
        Table t = connection.getTable(hdt.getTableName())) {
// But if we ask for stale we will get it
SlowMeCopro.cdl.set(new CountDownLatch(1));
Get g = new Get(row);
g.setConsistency(Consistency.TIMELINE);
Result r = t.get(g);
Assert.assertTrue(r.isStale());
SlowMeCopro.cdl.get().countDown();
} finally {
SlowMeCopro.cdl.get().countDown();
SlowMeCopro.sleepTime.set(0);
}
} finally {
HTU.getConfiguration().unset("hbase.ipc.client.specificThreadForWriting");
HTU.getConfiguration().unset("hbase.rpc.client.impl");
HTU.getAdmin().disableTable(hdt.getTableName());
HTU.deleteTable(hdt.getTableName());
}
}
}