blob: ee1ae5f380d520d75c9620b5cb48457049f33271 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Collections;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.WALCoprocessor;
import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.WALObserver;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/**
* Tests RegionReplicaReplicationEndpoint. Unlike TestRegionReplicaReplicationEndpoint this
* class contains lower level tests using callables.
*/
@Category({ReplicationTests.class, MediumTests.class})
public class TestRegionReplicaReplicationEndpointNoMaster {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestRegionReplicaReplicationEndpointNoMaster.class);
private static final int NB_SERVERS = 2;
private static TableName tableName = TableName.valueOf(
TestRegionReplicaReplicationEndpointNoMaster.class.getSimpleName());
private static Table table;
private static final byte[] row = Bytes.toBytes("TestRegionReplicaReplicator");
private static HRegionServer rs0;
private static HRegionServer rs1;
private static RegionInfo hriPrimary;
private static RegionInfo hriSecondary;
private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
private static final byte[] f = HConstants.CATALOG_FAMILY;
@BeforeClass
public static void beforeClass() throws Exception {
Configuration conf = HTU.getConfiguration();
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false);
// install WALObserver coprocessor for tests
String walCoprocs = HTU.getConfiguration().get(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY);
if (walCoprocs == null) {
walCoprocs = WALEditCopro.class.getName();
} else {
walCoprocs += "," + WALEditCopro.class.getName();
}
HTU.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
walCoprocs);
StartMiniClusterOption option = StartMiniClusterOption.builder().numAlwaysStandByMasters(1).
numRegionServers(NB_SERVERS).numDataNodes(NB_SERVERS).build();
HTU.startMiniCluster(option);
// Create table then get the single region for our new table.
TableDescriptor htd = HTU.createTableDescriptor(TableName.valueOf(tableName.getNameAsString()),
ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
table = HTU.createTable(htd, new byte[][]{f}, null);
try (RegionLocator locator = HTU.getConnection().getRegionLocator(tableName)) {
hriPrimary = locator.getRegionLocation(row, false).getRegion();
}
// mock a secondary region info to open
hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);
// No master
TestRegionServerNoMaster.stopMasterAndAssignMeta(HTU);
rs0 = HTU.getMiniHBaseCluster().getRegionServer(0);
rs1 = HTU.getMiniHBaseCluster().getRegionServer(1);
}
@AfterClass
public static void afterClass() throws Exception {
table.close();
HTU.shutdownMiniCluster();
}
@Before
public void before() throws Exception{
entries.clear();
}
@After
public void after() throws Exception {
}
static ConcurrentLinkedQueue<Entry> entries = new ConcurrentLinkedQueue<>();
public static class WALEditCopro implements WALCoprocessor, WALObserver {
public WALEditCopro() {
entries.clear();
}
@Override
public Optional<WALObserver> getWALObserver() {
return Optional.of(this);
}
@Override
public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
// only keep primary region's edits
if (logKey.getTableName().equals(tableName) && info.getReplicaId() == 0) {
// Presume type is a WALKeyImpl
entries.add(new Entry((WALKeyImpl)logKey, logEdit));
}
}
}
@Test
public void testReplayCallable() throws Exception {
// tests replaying the edits to a secondary region replica using the Callable directly
openRegion(HTU, rs0, hriSecondary);
// load some data to primary
HTU.loadNumericRows(table, f, 0, 1000);
Assert.assertEquals(1000, entries.size());
try (AsyncClusterConnection conn = ClusterConnectionFactory
.createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) {
// replay the edits to the secondary using replay callable
replicateUsingCallable(conn, entries);
}
Region region = rs0.getRegion(hriSecondary.getEncodedName());
HTU.verifyNumericRows(region, f, 0, 1000);
HTU.deleteNumericRows(table, f, 0, 1000);
closeRegion(HTU, rs0, hriSecondary);
}
private void replicateUsingCallable(AsyncClusterConnection connection, Queue<Entry> entries)
throws IOException, ExecutionException, InterruptedException {
Entry entry;
while ((entry = entries.poll()) != null) {
byte[] row = CellUtil.cloneRow(entry.getEdit().getCells().get(0));
RegionLocations locations = connection.getRegionLocations(tableName, row, true).get();
connection
.replay(tableName, locations.getRegionLocation(1).getRegion().getEncodedNameAsBytes(), row,
Collections.singletonList(entry), 1, Integer.MAX_VALUE, TimeUnit.SECONDS.toNanos(10))
.get();
}
}
@Test
public void testReplayCallableWithRegionMove() throws Exception {
// tests replaying the edits to a secondary region replica using the Callable directly while
// the region is moved to another location.It tests handling of RME.
try (AsyncClusterConnection conn = ClusterConnectionFactory
.createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) {
openRegion(HTU, rs0, hriSecondary);
// load some data to primary
HTU.loadNumericRows(table, f, 0, 1000);
Assert.assertEquals(1000, entries.size());
// replay the edits to the secondary using replay callable
replicateUsingCallable(conn, entries);
Region region = rs0.getRegion(hriSecondary.getEncodedName());
HTU.verifyNumericRows(region, f, 0, 1000);
HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary
// move the secondary region from RS0 to RS1
closeRegion(HTU, rs0, hriSecondary);
openRegion(HTU, rs1, hriSecondary);
// replicate the new data
replicateUsingCallable(conn, entries);
region = rs1.getRegion(hriSecondary.getEncodedName());
// verify the new data. old data may or may not be there
HTU.verifyNumericRows(region, f, 1000, 2000);
HTU.deleteNumericRows(table, f, 0, 2000);
closeRegion(HTU, rs1, hriSecondary);
}
}
@Test
public void testRegionReplicaReplicationEndpointReplicate() throws Exception {
// tests replaying the edits to a secondary region replica using the RRRE.replicate()
openRegion(HTU, rs0, hriSecondary);
RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint();
ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
when(context.getServer()).thenReturn(rs0);
when(context.getTableDescriptors()).thenReturn(rs0.getTableDescriptors());
replicator.init(context);
replicator.startAsync();
//load some data to primary
HTU.loadNumericRows(table, f, 0, 1000);
Assert.assertEquals(1000, entries.size());
// replay the edits to the secondary using replay callable
final String fakeWalGroupId = "fakeWALGroup";
replicator.replicate(new ReplicateContext().setEntries(Lists.newArrayList(entries))
.setWalGroupId(fakeWalGroupId));
replicator.stop();
Region region = rs0.getRegion(hriSecondary.getEncodedName());
HTU.verifyNumericRows(region, f, 0, 1000);
HTU.deleteNumericRows(table, f, 0, 1000);
closeRegion(HTU, rs0, hriSecondary);
}
}