/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.apache.hadoop.hbase.HBaseTestingUtility.countRows;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_RETRIES;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.SLEEP_TIME;
import static org.apache.hadoop.hbase.replication.TestReplicationBase.row;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({ ReplicationTests.class, LargeTests.class })
public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpToolBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationSyncUpToolWithBulkLoadedData.class);
private static final Logger LOG =
LoggerFactory.getLogger(TestReplicationSyncUpToolWithBulkLoadedData.class);
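/**
 * Enable bulk load replication on both clusters, give the source cluster a replication cluster
 * id, enable quotas, and plug in the test source FS configuration provider so hfiles bulk loaded
 * from another HDFS can be located.
 */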
@Override
protected void customizeClusterConf(Configuration conf) {
conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
conf.setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
conf.set("hbase.replication.source.fs.conf.provider",
TestSourceFSConfigurationProvider.class.getCanonicalName());
}
@Test
public void testSyncUpTool() throws Exception {
/**
 * Set up replication between the master and one slave cluster.
 * Tables: t1_syncup and t2_syncup.
 * Column families: 'cf1' is replicated, 'norep' is not replicated.
 */
setupReplication();
/**
* Prepare 24 random hfile ranges required for creating hfiles
*/
Set<String> randomHFileRanges = new HashSet<>(24);
for (int i = 0; i < 24; i++) {
randomHFileRanges.add(UTIL1.getRandomUUID().toString());
}
List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
Collections.sort(randomHFileRangeList);
Iterator<String> randomHFileRangeListIterator = randomHFileRangeList.iterator();
/**
 * At the master:
 *   t1_syncup: load 50 rows into cf1, 50 rows from another HDFS into cf1, and 3 rows into norep.
 *   t2_syncup: load 100 rows into cf1, 100 rows from another HDFS into cf1, and 3 rows into norep.
 * Then verify the cf1 rows are correctly replicated to the slave.
 */
loadAndReplicateHFiles(true, randomHFileRangeListIterator);
/**
 * Verify that bulk loaded hfiles are picked up by the sync up tool:
 * step 1: stop hbase on the slave.
 * step 2: at the master:
 *   t1_syncup: load another 100 rows into cf1 and 3 rows into norep.
 *   t2_syncup: load another 200 rows into cf1 and 3 rows into norep.
 * step 3: stop hbase on the master, restart hbase on the slave.
 * step 4: verify the slave still only has the rows from before the load
 *   (t1_syncup: 100 rows in cf1, t2_syncup: 200 rows in cf1).
 * step 5: run the sync up tool on the master.
 * step 6: verify the bulk loaded rows show up on the slave while 'norep' does not
 *   (t1_syncup: 200 rows in cf1, t2_syncup: 400 rows in cf1).
 */
mimicSyncUpAfterBulkLoad(randomHFileRangeListIterator);
}
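/**
 * Bulk load more rows while the slave is down, then stop the source cluster, restart the slave
 * and verify that the sync up tool copies the bulk loaded data (cf1 only) over to the slave.
 */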
private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator)
throws Exception {
LOG.debug("mimicSyncUpAfterBulkLoad");
shutDownTargetHBaseCluster();
loadAndReplicateHFiles(false, randomHFileRangeListIterator);
int rowCount_ht1Source = countRows(ht1Source);
assertEquals("t1_syncup has 206 rows on source, after bulk load of another 103 hfiles", 206,
rowCount_ht1Source);
int rowCount_ht2Source = countRows(ht2Source);
assertEquals("t2_syncup has 406 rows on source, after bulk load of another 203 hfiles", 406,
rowCount_ht2Source);
shutDownSourceHBaseCluster();
restartTargetHBaseCluster(1);
Thread.sleep(SLEEP_TIME);
// Before sync up
int rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
int rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCountHt1TargetAtPeer1);
assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCountHt2TargetAtPeer1);
// Run sync up tool
syncUp(UTIL1);
// After sync up
for (int i = 0; i < NB_RETRIES; i++) {
syncUp(UTIL1);
rowCountHt1TargetAtPeer1 = countRows(ht1TargetAtPeer1);
rowCountHt2TargetAtPeer1 = countRows(ht2TargetAtPeer1);
if (i == NB_RETRIES - 1) {
if (rowCountHt1TargetAtPeer1 != 200 || rowCountHt2TargetAtPeer1 != 400) {
// syncUp still failed. Let's look at the source in case anything is wrong there
restartSourceHBaseCluster(1);
rowCount_ht1Source = countRows(ht1Source);
LOG.debug("t1_syncup should have 206 rows at source, and it is " + rowCount_ht1Source);
rowCount_ht2Source = countRows(ht2Source);
LOG.debug("t2_syncup should have 406 rows at source, and it is " + rowCount_ht2Source);
}
assertEquals("@Peer1 t1_syncup should be sync up and have 200 rows", 200,
rowCountHt1TargetAtPeer1);
assertEquals("@Peer1 t2_syncup should be sync up and have 400 rows", 400,
rowCountHt2TargetAtPeer1);
}
if (rowCountHt1TargetAtPeer1 == 200 && rowCountHt2TargetAtPeer1 == 400) {
LOG.info("SyncUpAfterBulkLoad succeeded at retry = " + i);
break;
} else {
LOG.debug("SyncUpAfterBulkLoad failed at retry = " + i +
", with rowCount_ht1TargetPeer1 =" + rowCountHt1TargetAtPeer1 +
" and rowCount_ht2TargetAtPeer1 =" + rowCountHt2TargetAtPeer1);
}
Thread.sleep(SLEEP_TIME);
}
}
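/**
 * Bulk load rows into cf1 (from the local test FS and from the peer cluster's FS) and into
 * norep on both tables; optionally wait until the cf1 rows have been replicated to the slave.
 */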
private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave,
Iterator<String> randomHFileRangeListIterator) throws Exception {
LOG.debug("loadAndReplicateHFiles");
// Load 50 + 50 rows into cf1 and 3 rows into norep for t1_syncup.
byte[][][] hfileRanges =
new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
Bytes.toBytes(randomHFileRangeListIterator.next()) } };
loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source, hfileRanges, 50);
hfileRanges =
new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
Bytes.toBytes(randomHFileRangeListIterator.next()) } };
loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source,
hfileRanges, 50);
hfileRanges =
new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
Bytes.toBytes(randomHFileRangeListIterator.next()) } };
loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht1Source,
hfileRanges, 3);
// Load 100 + 100 rows into cf1 and 3 rows into norep for t2_syncup.
hfileRanges =
new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
Bytes.toBytes(randomHFileRangeListIterator.next()) } };
loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source, hfileRanges, 100);
hfileRanges =
new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
Bytes.toBytes(randomHFileRangeListIterator.next()) } };
loadFromOtherHDFSAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source,
hfileRanges, 100);
hfileRanges =
new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
Bytes.toBytes(randomHFileRangeListIterator.next()) } };
loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht2Source,
hfileRanges, 3);
if (verifyReplicationOnSlave) {
// ensure replication completed
wait(ht1TargetAtPeer1, countRows(ht1Source) - 3,
"t1_syncup has 103 rows on source, and 100 on slave1");
wait(ht2TargetAtPeer1, countRows(ht2Source) - 3,
"t2_syncup has 203 rows on source, and 200 on slave1");
}
}
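/**
 * Create hfiles on the source cluster's test filesystem and bulk load them into the given table.
 */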
private void loadAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
Path dir = UTIL1.getDataTestDirOnTestFS(testName);
FileSystem fs = UTIL1.getTestFileSystem();
dir = dir.makeQualified(fs);
Path familyDir = new Path(dir, Bytes.toString(fam));
int hfileIdx = 0;
for (byte[][] range : hfileRanges) {
byte[] from = range[0];
byte[] to = range[1];
HFileTestUtil.createHFile(UTIL1.getConfiguration(), fs,
new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
}
final TableName tableName = source.getName();
BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration());
loader.bulkLoad(tableName, dir);
}
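/**
 * Create hfiles on the peer cluster's (UTIL2) test filesystem and bulk load them into the given
 * table on the source cluster, i.e. the hfiles come from a different HDFS than the loading
 * cluster.
 */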
private void loadFromOtherHDFSAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
Path dir = UTIL2.getDataTestDirOnTestFS(testName);
FileSystem fs = UTIL2.getTestFileSystem();
dir = dir.makeQualified(fs);
Path familyDir = new Path(dir, Bytes.toString(fam));
int hfileIdx = 0;
for (byte[][] range : hfileRanges) {
byte[] from = range[0];
byte[] to = range[1];
HFileTestUtil.createHFile(UTIL2.getConfiguration(), fs,
new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
}
final TableName tableName = source.getName();
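// The hfiles were created on UTIL2's filesystem, but the bulk load targets the source cluster,
// so the loader is built from UTIL1's configuration.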
BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration());
loader.bulkLoad(tableName, dir);
}
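/**
 * Poll the target table until it reaches the expected row count, asserting with the given
 * message on the last retry.
 */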
private void wait(Table target, int expectedCount, String msg)
throws IOException, InterruptedException {
for (int i = 0; i < NB_RETRIES; i++) {
int rowCount = countRows(target);
if (i == NB_RETRIES - 1) {
assertEquals(msg, expectedCount, rowCount);
}
if (expectedCount == rowCount) {
break;
}
Thread.sleep(SLEEP_TIME);
}
}
}