// blob: 23753e21105402b1bc3eba2b6d7d8e36e1c8dd11 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.ipc.RemoteException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
/**
 * Base class for testing sync replication.
 * <p>
 * {@link #setUp()} starts one mini ZooKeeper cluster and two mini HBase clusters ({@link #UTIL1}
 * and {@link #UTIL2}) that share it under different znode parents, creates {@link #TABLE_NAME} on
 * both, and registers each cluster as replication peer {@link #PEER_ID} of the other with a remote
 * WAL directory configured. The remaining methods are helpers for subclasses to write rows, wait
 * for replication and verify the outcome.
 */
public class SyncReplicationTestBase {

  /** Owns the mini ZooKeeper cluster shared by both HBase clusters. */
  protected static final HBaseZKTestingUtility ZK_UTIL = new HBaseZKTestingUtility();

  /** Testing utility of the first cluster (znode parent {@code /cluster1}). */
  protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();

  /** Testing utility of the second cluster (znode parent {@code /cluster2}). */
  protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();

  // NOTE(review): the four fields below are effectively constants but are not final; they are
  // left that way because subclasses outside this file may assign them.

  /** Table created on both clusters and configured for replication. */
  protected static TableName TABLE_NAME = TableName.valueOf("SyncRep");

  /** Column family of {@link #TABLE_NAME}; created with global replication scope. */
  protected static byte[] CF = Bytes.toBytes("cf");

  /** Qualifier used by {@link #write} and the verify helpers. */
  protected static byte[] CQ = Bytes.toBytes("cq");

  /** Id under which each cluster registers the other as a replication peer. */
  protected static String PEER_ID = "1";

  /** Qualified {@code remoteWALs} directory under the WAL root of cluster 1; set by setUp. */
  protected static Path REMOTE_WAL_DIR1;

  /** Qualified {@code remoteWALs} directory under the WAL root of cluster 2; set by setUp. */
  protected static Path REMOTE_WAL_DIR2;

  /**
   * Points {@code util} at the shared mini ZooKeeper cluster and applies the replication related
   * test settings to its configuration.
   * @param util the testing utility to configure
   * @param zkParent the znode parent this cluster should use
   */
  protected static void initTestingUtility(HBaseTestingUtility util, String zkParent) {
    util.setZkCluster(ZK_UTIL.getZkCluster());
    Configuration conf = util.getConfiguration();
    conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, zkParent);
    conf.setInt("replication.source.size.capacity", 102400);
    conf.setLong("replication.source.sleepforretries", 100);
    conf.setInt("hbase.regionserver.maxlogs", 10);
    conf.setLong("hbase.master.logcleaner.ttl", 10);
    conf.setInt("zookeeper.recovery.retry", 1);
    conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
    conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
    conf.setInt("replication.stats.thread.period.seconds", 5);
    conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
    conf.setLong("replication.sleep.before.failover", 2000);
    conf.setInt("replication.source.maxretriesmultiplier", 10);
    conf.setFloat("replication.source.ratio", 1.0f);
    conf.setBoolean("replication.source.eof.autorecovery", true);
  }

  /**
   * Starts ZooKeeper and both HBase clusters, creates the test table on each of them and
   * registers the two clusters as replication peers of each other.
   */
  @BeforeClass
  public static void setUp() throws Exception {
    ZK_UTIL.startMiniZKCluster();
    initTestingUtility(UTIL1, "/cluster1");
    initTestingUtility(UTIL2, "/cluster2");
    StartMiniClusterOption option =
      StartMiniClusterOption.builder().numMasters(2).numRegionServers(3).numDataNodes(3).build();
    UTIL1.startMiniCluster(option);
    UTIL2.startMiniCluster(option);
    TableDescriptor td =
      TableDescriptorBuilder.newBuilder(TABLE_NAME).setColumnFamily(ColumnFamilyDescriptorBuilder
        .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
    UTIL1.getAdmin().createTable(td);
    UTIL2.getAdmin().createTable(td);
    REMOTE_WAL_DIR1 = resolveRemoteWALDir(UTIL1);
    REMOTE_WAL_DIR2 = resolveRemoteWALDir(UTIL2);
    // Each cluster points at the remote WAL directory that lives on the *other* cluster.
    addPeer(UTIL1, UTIL2, REMOTE_WAL_DIR2);
    addPeer(UTIL2, UTIL1, REMOTE_WAL_DIR1);
  }

  /** Returns the {@code remoteWALs} directory under the WAL root of {@code util}, qualified. */
  private static Path resolveRemoteWALDir(HBaseTestingUtility util) throws IOException {
    FileSystem fs = util.getTestFileSystem();
    Path walRootDir = util.getMiniHBaseCluster().getMaster().getMasterFileSystem().getWALRootDir();
    return new Path(walRootDir, "remoteWALs").makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  /** Registers {@code sink} on {@code source} as peer {@link #PEER_ID} for the test table only. */
  private static void addPeer(HBaseTestingUtility source, HBaseTestingUtility sink,
      Path sinkRemoteWALDir) throws IOException {
    ReplicationPeerConfig peerConfig =
      ReplicationPeerConfig.newBuilder().setClusterKey(sink.getClusterKey())
        .setReplicateAllUserTables(false)
        .setTableCFsMap(ImmutableMap.of(TABLE_NAME, new ArrayList<>()))
        .setRemoteWALDir(sinkRemoteWALDir.toUri().toString()).build();
    source.getAdmin().addReplicationPeer(PEER_ID, peerConfig);
  }

  /**
   * Removes the test peer (moving it to {@code DOWNGRADE_ACTIVE} first when it is in another
   * state) and shuts the mini cluster down. Does nothing when the cluster was never started.
   * The mini cluster is shut down even when the peer clean up fails, so a broken peer cannot leak
   * a running cluster.
   */
  private static void shutdown(HBaseTestingUtility util) throws Exception {
    if (util.getHBaseCluster() == null) {
      return;
    }
    try {
      Admin admin = util.getAdmin();
      // Quote the id so that it is always matched literally rather than interpreted as a regex.
      if (!admin.listReplicationPeers(Pattern.compile(Pattern.quote(PEER_ID))).isEmpty()) {
        if (admin
          .getReplicationPeerSyncReplicationState(PEER_ID) != SyncReplicationState.DOWNGRADE_ACTIVE) {
          admin.transitReplicationPeerSyncReplicationState(PEER_ID,
            SyncReplicationState.DOWNGRADE_ACTIVE);
        }
        admin.removeReplicationPeer(PEER_ID);
      }
    } finally {
      util.shutdownMiniCluster();
    }
  }

  /**
   * Shuts down both HBase clusters and then the ZooKeeper cluster. Every step is attempted even
   * when an earlier one throws.
   */
  @AfterClass
  public static void tearDown() throws Exception {
    try {
      shutdown(UTIL1);
    } finally {
      try {
        shutdown(UTIL2);
      } finally {
        ZK_UTIL.shutdownMiniZKCluster();
      }
    }
  }

  /**
   * Puts one row per integer in {@code [start, end)} into the test table through the client
   * connection of {@code util}; row key and value are both the encoded integer.
   */
  protected final void write(HBaseTestingUtility util, int start, int end) throws IOException {
    try (Table table = util.getConnection().getTable(TABLE_NAME)) {
      for (int i = start; i < end; i++) {
        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i)));
      }
    }
  }

  /**
   * Reads the rows {@code [start, end)} through the client connection of {@code util} and asserts
   * that each one holds the value written by {@link #write}.
   */
  protected final void verify(HBaseTestingUtility util, int start, int end) throws IOException {
    try (Table table = util.getConnection().getTable(TABLE_NAME)) {
      for (int i = start; i < end; i++) {
        assertEquals(i, Bytes.toInt(table.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ)));
      }
    }
  }

  /**
   * Same check as {@link #verify} but reads directly from the first region of the test table
   * instead of going through a client connection.
   */
  protected final void verifyThroughRegion(HBaseTestingUtility util, int start, int end)
      throws IOException {
    HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
    for (int i = start; i < end; i++) {
      assertEquals(i, Bytes.toInt(region.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ)));
    }
  }

  /**
   * Asserts that none of the rows {@code [start, end)} is present in the first region of the test
   * table on {@code util}.
   */
  protected final void verifyNotReplicatedThroughRegion(HBaseTestingUtility util, int start,
      int end) throws IOException {
    HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
    for (int i = start; i < end; i++) {
      assertTrue(region.get(new Get(Bytes.toBytes(i))).isEmpty());
    }
  }

  /**
   * Waits up to 30 seconds until row {@code end - 1} is readable from the first region of the test
   * table on {@code util}.
   */
  protected final void waitUntilReplicationDone(HBaseTestingUtility util, int end)
      throws Exception {
    // The reject check is in RSRpcService so we can still read through HRegion
    HRegion region = util.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
    util.waitFor(30000, new ExplainingPredicate<Exception>() {

      @Override
      public boolean evaluate() throws Exception {
        return !region.get(new Get(Bytes.toBytes(end - 1))).isEmpty();
      }

      @Override
      public String explainFailure() throws Exception {
        return "Replication has not been catched up yet";
      }
    });
  }

  /**
   * Writes the rows {@code [start, end)} on {@code util1}, waits for the last one to show up on
   * {@code util2} and then verifies all of them there through the region.
   */
  protected final void writeAndVerifyReplication(HBaseTestingUtility util1,
      HBaseTestingUtility util2, int start, int end) throws Exception {
    write(util1, start, end);
    waitUntilReplicationDone(util2, end);
    verifyThroughRegion(util2, start, end);
  }

  /** Returns the remote WAL directory of {@code peerId} under the WAL root of {@code mfs}. */
  protected final Path getRemoteWALDir(MasterFileSystem mfs, String peerId) {
    Path remoteWALDir = new Path(mfs.getWALRootDir(), ReplicationUtils.REMOTE_WAL_DIR_NAME);
    return getRemoteWALDir(remoteWALDir, peerId);
  }

  /** Returns {@code remoteWALDir/peerId}. */
  protected final Path getRemoteWALDir(Path remoteWALDir, String peerId) {
    return new Path(remoteWALDir, peerId);
  }

  /** Returns {@code remoteWALDir/<peerId>-replay}. */
  protected final Path getReplayRemoteWALs(Path remoteWALDir, String peerId) {
    return new Path(remoteWALDir, peerId + "-replay");
  }

  /**
   * Asserts that a removed peer left nothing behind: both sync replication state lookups in the
   * peer storage must throw {@link ReplicationException}, and neither the remote WAL directory
   * nor the replay directory of the peer may exist.
   */
  protected final void verifyRemovedPeer(String peerId, Path remoteWALDir,
      HBaseTestingUtility utility) throws Exception {
    ReplicationPeerStorage rps = ReplicationStorageFactory
      .getReplicationPeerStorage(utility.getZooKeeperWatcher(), utility.getConfiguration());
    try {
      rps.getPeerSyncReplicationState(peerId);
      fail("Should throw exception when get the sync replication state of a removed peer.");
    } catch (ReplicationException expected) {
      // Expected: the peer no longer exists.
    }
    try {
      rps.getPeerNewSyncReplicationState(peerId);
      fail("Should throw exception when get the new sync replication state of a removed peer");
    } catch (ReplicationException expected) {
      // Expected: the peer no longer exists.
    }
    // NOTE(review): this closes the FileSystem returned by getTestFileSystem(); if that instance
    // is shared/cached, later users could see a closed file system - confirm this is intended.
    try (FileSystem fs = utility.getTestFileSystem()) {
      assertFalse(fs.exists(getRemoteWALDir(remoteWALDir, peerId)));
      assertFalse(fs.exists(getReplayRemoteWALs(remoteWALDir, peerId)));
    }
  }

  /**
   * Asserts that {@code error} is the rejection raised for the test table: a
   * {@link DoNotRetryIOException} whose message contains the rejection text and the table name.
   */
  private void assertRejection(Throwable error) {
    assertThat(error, instanceOf(DoNotRetryIOException.class));
    // Guard against a null message so a mismatch is reported as an assertion failure that shows
    // the actual message instead of a bare NullPointerException.
    String message = error.getMessage();
    assertTrue("Unexpected rejection message: " + message,
      message != null && message.contains("Reject to apply to sink cluster"));
    assertTrue("Rejection message does not mention " + TABLE_NAME + ": " + message,
      message != null && message.contains(TABLE_NAME.toString()));
  }

  /**
   * Sends a batch of empty WAL entries for the test table to the region server hosting its first
   * region and checks whether the request is rejected.
   * @param utility the cluster to send the replication request to
   * @param expectedRejection {@code true} if the request must fail with the rejection error,
   *          {@code false} if it must go through
   */
  protected final void verifyReplicationRequestRejection(HBaseTestingUtility utility,
      boolean expectedRejection) throws Exception {
    HRegionServer regionServer = utility.getRSForFirstRegionInTable(TABLE_NAME);
    Entry[] entries = new Entry[10];
    for (int i = 0; i < entries.length; i++) {
      entries[i] =
        new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
    }
    if (!expectedRejection) {
      replicateEntries(regionServer, entries);
      return;
    }
    try {
      replicateEntries(regionServer, entries);
      fail("Should throw IOException when sync-replication state is in A or DA");
    } catch (RemoteException e) {
      assertRejection(e.unwrapRemoteException());
    } catch (DoNotRetryIOException e) {
      assertRejection(e);
    }
  }

  /** Ships {@code entries} to {@code regionServer} through its own cluster connection. */
  private static void replicateEntries(HRegionServer regionServer, Entry[] entries)
      throws Exception {
    AsyncClusterConnection connection = regionServer.getAsyncClusterConnection();
    ReplicationProtobufUtil.replicateWALEntry(
      connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
      HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
  }

  /**
   * Waits up to 30 seconds until {@code remoteWAL} no longer exists on the WAL file system of the
   * master of {@code util}.
   */
  protected final void waitUntilDeleted(HBaseTestingUtility util, Path remoteWAL) throws Exception {
    MasterFileSystem mfs = util.getMiniHBaseCluster().getMaster().getMasterFileSystem();
    util.waitFor(30000, new ExplainingPredicate<Exception>() {

      @Override
      public boolean evaluate() throws Exception {
        return !mfs.getWALFileSystem().exists(remoteWAL);
      }

      @Override
      public String explainFailure() throws Exception {
        return remoteWAL + " has not been deleted yet";
      }
    });
  }
}