// blob: 134ea4771263f1ed0faeceeefa2966bf5f86f802 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilder;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
/**
 * Checks that bulk loaded HFiles are not recorded as replication HFile references, and that their
 * data does not show up on the peer cluster, when the peer configuration excludes the target
 * column family, table or namespace.
 * <p>
 * Every test adds a peer with {@code replicateAllUserTables=true} plus one kind of exclusion,
 * bulk loads a single cell into the excluded target on cluster 1, and then verifies both that the
 * cell cannot be read from cluster 2 and that the HFile refs queue is still empty.
 */
@Category({ ReplicationTests.class, SmallTests.class })
public class TestBulkLoadReplicationHFileRefs extends TestReplicationBase {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestBulkLoadReplicationHFileRefs.class);

  private static final String PEER1_CLUSTER_ID = "peer1";
  private static final String PEER2_CLUSTER_ID = "peer2";

  private static final String REPLICATE_NAMESPACE = "replicate_ns";
  private static final String NO_REPLICATE_NAMESPACE = "no_replicate_ns";
  private static final TableName REPLICATE_TABLE =
    TableName.valueOf(REPLICATE_NAMESPACE, "replicate_table");
  private static final TableName NO_REPLICATE_TABLE =
    TableName.valueOf(NO_REPLICATE_NAMESPACE, "no_replicate_table");

  private static final byte[] CF_A = Bytes.toBytes("cfa");
  private static final byte[] CF_B = Bytes.toBytes("cfb");

  // The single cell written by every bulk load and looked up by every verification. Keeping the
  // coordinates in one place guarantees that the tests query exactly the row that was loaded.
  private static final byte[] ROW = Bytes.toBytes("r1");
  private static final byte[] QUALIFIER = Bytes.toBytes("q1");
  private static final byte[] VALUE = Bytes.toBytes("v1");

  @ClassRule
  public static TemporaryFolder testFolder = new TemporaryFolder();

  /** Directory on the source cluster's DFS that holds one sub directory per bulk loaded family. */
  private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");

  private static Admin admin1;
  private static Admin admin2;

  private static ReplicationQueueStorage queueStorage;

  /**
   * Applies the bulk load replication settings to both cluster configurations, runs the base
   * class setup, and then creates the replicated and non-replicated namespaces on both clusters.
   */
  @BeforeClass
  public static void setUpBeforeClass() throws Exception {
    // The configurations are modified before delegating to the base class.
    // NOTE(review): presumably the base setup starts the clusters from CONF1/CONF2 - confirm in
    // TestReplicationBase.
    setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID);
    setupBulkLoadConfigsForCluster(CONF2, PEER2_CLUSTER_ID);
    TestReplicationBase.setUpBeforeClass();

    admin1 = UTIL1.getConnection().getAdmin();
    admin2 = UTIL2.getConnection().getAdmin();

    queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(UTIL1.getZooKeeperWatcher(),
      UTIL1.getConfiguration());

    admin1.createNamespace(NamespaceDescriptor.create(REPLICATE_NAMESPACE).build());
    admin2.createNamespace(NamespaceDescriptor.create(REPLICATE_NAMESPACE).build());
    admin1.createNamespace(NamespaceDescriptor.create(NO_REPLICATE_NAMESPACE).build());
    admin2.createNamespace(NamespaceDescriptor.create(NO_REPLICATE_NAMESPACE).build());
  }

  /**
   * Enables bulk load replication on {@code config}, assigns it the given replication cluster id,
   * and dumps the configuration to {@code <testFolder>/<clusterReplicationId>/hbase-site.xml}.
   * Finally points {@link HConstants#REPLICATION_CONF_DIR} at the root of the temporary folder.
   * @param config               the cluster configuration to modify
   * @param clusterReplicationId the replication cluster id, also used as the folder name
   * @throws Exception if the folder or the configuration file cannot be created
   */
  protected static void setupBulkLoadConfigsForCluster(Configuration config,
    String clusterReplicationId) throws Exception {
    config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    config.set(REPLICATION_CLUSTER_ID, clusterReplicationId);

    File sourceConfigFolder = testFolder.newFolder(clusterReplicationId);
    File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath() + "/hbase-site.xml");
    // Close the stream explicitly so the file handle is released once the XML has been written.
    try (FileOutputStream configOut = new FileOutputStream(sourceConfigFile)) {
      config.writeXml(configOut);
    }

    // Set after the dump on purpose: the written hbase-site.xml does not contain this key.
    config.set(REPLICATION_CONF_DIR, testFolder.getRoot().getAbsolutePath());
  }

  /** Removes every replication peer from cluster 1 so each test starts without peers. */
  @Before
  public void setUp() throws Exception {
    for (ReplicationPeerDescription peer : admin1.listReplicationPeers()) {
      admin1.removeReplicationPeer(peer.getPeerId());
    }
  }

  /** Removes every replication peer from cluster 1 and deletes all tables on both clusters. */
  @After
  public void teardown() throws Exception {
    for (ReplicationPeerDescription peer : admin1.listReplicationPeers()) {
      admin1.removeReplicationPeer(peer.getPeerId());
    }
    for (TableName tableName : admin1.listTableNames()) {
      UTIL1.deleteTable(tableName);
    }
    for (TableName tableName : admin2.listTableNames()) {
      UTIL2.deleteTable(tableName);
    }
  }

  /** Bulk loading into a column family listed in the peer's exclude map must not replicate. */
  @Test
  public void testWhenExcludeCF() throws Exception {
    // Create table in source and remote clusters.
    createTableOnClusters(REPLICATE_TABLE, CF_A, CF_B);

    // Add peer, setReplicateAllUserTables true, but exclude CF_B.
    Map<TableName, List<String>> excludeTableCFs = Maps.newHashMap();
    excludeTableCFs.put(REPLICATE_TABLE, Lists.newArrayList(Bytes.toString(CF_B)));
    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
      .setClusterKey(UTIL2.getClusterKey())
      .setReplicateAllUserTables(true)
      .setExcludeTableCFsMap(excludeTableCFs)
      .build();
    admin1.addReplicationPeer(PEER_ID2, peerConfig);
    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE));
    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE, CF_A));
    assertFalse(peerConfig.needToReplicate(REPLICATE_TABLE, CF_B));

    assertEquals(0, queueStorage.getAllHFileRefs().size());

    // Bulk load data into the CF that is not replicated.
    bulkLoadOnCluster(REPLICATE_TABLE, CF_B);
    Threads.sleep(1000);

    assertNotReplicated(REPLICATE_TABLE, CF_B);
  }

  /** Bulk loading into a table listed in the peer's exclude map must not replicate. */
  @Test
  public void testWhenExcludeTable() throws Exception {
    // Create 2 tables in source and remote clusters.
    createTableOnClusters(REPLICATE_TABLE, CF_A);
    createTableOnClusters(NO_REPLICATE_TABLE, CF_A);

    // Add peer, setReplicateAllUserTables true, but exclude one table.
    Map<TableName, List<String>> excludeTableCFs = Maps.newHashMap();
    excludeTableCFs.put(NO_REPLICATE_TABLE, null);
    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
      .setClusterKey(UTIL2.getClusterKey())
      .setReplicateAllUserTables(true)
      .setExcludeTableCFsMap(excludeTableCFs)
      .build();
    admin1.addReplicationPeer(PEER_ID2, peerConfig);
    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE));
    assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE));
    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE, CF_A));
    assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE, CF_A));

    assertEquals(0, queueStorage.getAllHFileRefs().size());

    // Bulk load data into the table that is not replicated.
    bulkLoadOnCluster(NO_REPLICATE_TABLE, CF_A);
    Threads.sleep(1000);

    assertNotReplicated(NO_REPLICATE_TABLE, CF_A);
  }

  /** Bulk loading into a table of a namespace excluded by the peer must not replicate. */
  @Test
  public void testWhenExcludeNamespace() throws Exception {
    // Create 2 tables in source and remote clusters.
    createTableOnClusters(REPLICATE_TABLE, CF_A);
    createTableOnClusters(NO_REPLICATE_TABLE, CF_A);

    // Add peer, setReplicateAllUserTables true, but exclude one namespace.
    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
      .setClusterKey(UTIL2.getClusterKey())
      .setReplicateAllUserTables(true)
      .setExcludeNamespaces(Sets.newHashSet(NO_REPLICATE_NAMESPACE))
      .build();
    admin1.addReplicationPeer(PEER_ID2, peerConfig);
    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE));
    assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE));
    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE, CF_A));
    assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE, CF_A));

    assertEquals(0, queueStorage.getAllHFileRefs().size());

    // Bulk load data into the table of the namespace that is not replicated. The lookup below
    // must use the very row the HFile contains, otherwise the check would pass vacuously.
    bulkLoadOnCluster(NO_REPLICATE_TABLE, CF_A);
    Threads.sleep(1000);

    assertNotReplicated(NO_REPLICATE_TABLE, CF_A);
  }

  /**
   * Asserts that the bulk loaded cell is absent from {@code tableName} on cluster 2 and that no
   * HFile reference has been queued on cluster 1.
   * @param tableName the table to read from on the peer cluster
   * @param family    the column family the cell was bulk loaded into
   */
  private void assertNotReplicated(TableName tableName, byte[] family) throws Exception {
    // Cannot get data from remote cluster
    try (Table peerTable = UTIL2.getConnection().getTable(tableName)) {
      Result result = peerTable.get(new Get(ROW));
      Assert.assertNull("Bulk loaded cell must not be visible on the peer cluster",
        result.getValue(family, QUALIFIER));
    }
    // The extra HFile is never added to the HFileRefs
    assertEquals(0, queueStorage.getAllHFileRefs().size());
  }

  /**
   * Writes a one-cell HFile for {@code family}, copies it below {@link #BULK_LOAD_BASE_DIR} on the
   * DFS of cluster 1 and bulk loads that directory into {@code tableName} on cluster 1.
   * @param tableName the table to bulk load into
   * @param family    the column family the HFile belongs to
   */
  protected void bulkLoadOnCluster(TableName tableName, byte[] family) throws Exception {
    String bulkLoadFilePath = createHFileForFamilies(family);
    copyToHdfs(family, bulkLoadFilePath, UTIL1.getDFSCluster());
    BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(UTIL1.getConfiguration());
    bulkLoadHFilesTool.bulkLoad(tableName, BULK_LOAD_BASE_DIR);
  }

  /**
   * Creates a local HFile inside the temporary folder that contains a single Put cell
   * ({@code ROW}/{@code family}/{@code QUALIFIER} = {@code VALUE}).
   * @param family the column family of the cell
   * @return the absolute local path of the written HFile
   */
  private String createHFileForFamilies(byte[] family) throws IOException {
    CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
    cellBuilder.setRow(ROW)
      .setFamily(family)
      .setQualifier(QUALIFIER)
      .setValue(VALUE)
      .setType(Cell.Type.Put);

    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(UTIL1.getConfiguration());
    File hFileLocation = testFolder.newFile();
    // Resources are closed in reverse order: the writer first, then the underlying stream.
    try (FSDataOutputStream out =
      new FSDataOutputStream(new FileOutputStream(hFileLocation), null)) {
      hFileFactory.withOutputStream(out);
      hFileFactory.withFileContext(new HFileContextBuilder().build());
      try (HFile.Writer writer = hFileFactory.create()) {
        writer.append(new KeyValue(cellBuilder.build()));
      }
    }
    return hFileLocation.getAbsoluteFile().getAbsolutePath();
  }

  /**
   * Copies a local HFile into {@code BULK_LOAD_BASE_DIR/<family>} on the given DFS cluster,
   * creating the family directory first.
   * @param family           the column family, used as the directory name
   * @param bulkLoadFilePath the local path of the HFile to copy
   * @param cluster          the DFS cluster to copy the file to
   */
  private void copyToHdfs(byte[] family, String bulkLoadFilePath, MiniDFSCluster cluster)
    throws Exception {
    Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR, Bytes.toString(family));
    cluster.getFileSystem().mkdirs(bulkLoadDir);
    cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
  }

  /**
   * Creates the same table on both clusters, with every given column family set to
   * {@link HConstants#REPLICATION_SCOPE_GLOBAL}.
   * @param tableName the table to create
   * @param cfs       the column families of the table
   */
  private void createTableOnClusters(TableName tableName, byte[]... cfs) throws IOException {
    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
    for (byte[] cf : cfs) {
      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(cf)
        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build());
    }
    TableDescriptor td = builder.build();
    admin1.createTable(td);
    admin2.createTable(td);
  }
}