| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hbase.test; |
| |
| import com.google.common.base.Joiner; |
| import org.apache.commons.cli.CommandLine; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.conf.Configured; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.HBaseConfiguration; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.HRegionLocation; |
| import org.apache.hadoop.hbase.IntegrationTestingUtility; |
| import org.apache.hadoop.hbase.ServerName; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.client.Admin; |
| import org.apache.hadoop.hbase.client.Connection; |
| import org.apache.hadoop.hbase.client.ConnectionFactory; |
| import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; |
| import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; |
| import org.apache.hadoop.util.Tool; |
| import org.apache.hadoop.util.ToolRunner; |
| |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.Set; |
| import java.util.TreeSet; |
| import java.util.UUID; |
| |
| |
| /** |
| * This is an integration test for replication. It is derived from |
| * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList}: it creates a large circular |
| * linked list in a source cluster and verifies that the data is correct in a sink cluster. The |
| * test handles creating the tables and schema and setting up replication. |
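| * <p> |
| * A typical invocation looks like the sketch below; the ZooKeeper quorums and the output path |
| * are hypothetical and must be replaced with values that match your clusters: |
| * <pre> |
| * hbase org.apache.hadoop.hbase.test.IntegrationTestReplication \ |
| * -s source-zk:2181:/hbase \ |
| * -r sink-zk:2181:/hbase \ |
| * -d /tmp/replication-test |
| * </pre> |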
| */ |
| public class IntegrationTestReplication extends IntegrationTestBigLinkedList { |
| protected String sourceClusterIdString; |
| protected String sinkClusterIdString; |
| protected int numIterations; |
| protected int numMappers; |
| protected long numNodes; |
| protected String outputDir; |
| protected int numReducers; |
| protected int generateVerifyGap; |
| protected Integer width; |
| protected Integer wrapMultiplier; |
| protected boolean noReplicationSetup = false; |
| |
| private final String SOURCE_CLUSTER_OPT = "sourceCluster"; |
| private final String DEST_CLUSTER_OPT = "destCluster"; |
| private final String ITERATIONS_OPT = "iterations"; |
| private final String NUM_MAPPERS_OPT = "numMappers"; |
| private final String OUTPUT_DIR_OPT = "outputDir"; |
| private final String NUM_REDUCERS_OPT = "numReducers"; |
| private final String NO_REPLICATION_SETUP_OPT = "noReplicationSetup"; |
| |
| /** |
| * The gap (in seconds) from when data is finished being generated at the source |
| * to when it can be verified. This is the replication lag we are willing to tolerate. |
| */ |
| private final String GENERATE_VERIFY_GAP_OPT = "generateVerifyGap"; |
| |
| /** |
| * The width of the linked list. |
| * See {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList} for more details. |
| */ |
| private final String WIDTH_OPT = "width"; |
| |
| /** |
| * The number of rows after which the linked list points to the first row. |
| * See {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList} for more details. |
| */ |
| private final String WRAP_MULTIPLIER_OPT = "wrapMultiplier"; |
| |
| /** |
| * The number of nodes in the test setup. This has to be a multiple of WRAP_MULTIPLIER * WIDTH |
| * in order to ensure that the linked list is complete; for example, with the default width of |
| * 1000000 and wrap multiplier of 25, numNodes must be a multiple of 25000000. |
| * See {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList} for more details. |
| */ |
| private final String NUM_NODES_OPT = "numNodes"; |
| |
| private final int DEFAULT_NUM_MAPPERS = 1; |
| private final int DEFAULT_NUM_REDUCERS = 1; |
| private final int DEFAULT_NUM_ITERATIONS = 1; |
| private final int DEFAULT_GENERATE_VERIFY_GAP = 60; |
| private final int DEFAULT_WIDTH = 1000000; |
| private final int DEFAULT_WRAP_MULTIPLIER = 25; |
| private final int DEFAULT_NUM_NODES = DEFAULT_WIDTH * DEFAULT_WRAP_MULTIPLIER; |
| |
| /** |
| * Wrapper around an HBase cluster key that allows us |
| * to get admin connections and configurations for that cluster. |
| */ |
| protected class ClusterID { |
| private final Configuration configuration; |
| private Connection connection = null; |
| |
| /** |
| * This creates a new ClusterID wrapper that will automatically build connections and |
| * configurations to be able to talk to the specified cluster |
| * |
| * @param base the base configuration that this class will add to |
| * @param key the cluster key in the form of zk_quorum:zk_port:zk_parent_node |
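| * |
| * For example, a (hypothetical) key of {@code "zk1.example.com:2181:/hbase"} yields a |
| * configuration pointing at that quorum host, client port, and parent znode. |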
| */ |
| public ClusterID(Configuration base, String key) { |
| configuration = new Configuration(base); |
| String[] parts = key.split(":"); |
| if (parts.length != 3) { |
| throw new IllegalArgumentException( |
| "Expected a cluster key of the form zk_quorum:zk_port:zk_parent_node, got: " + key); |
| } |
| configuration.set(HConstants.ZOOKEEPER_QUORUM, parts[0]); |
| configuration.set(HConstants.ZOOKEEPER_CLIENT_PORT, parts[1]); |
| configuration.set(HConstants.ZOOKEEPER_ZNODE_PARENT, parts[2]); |
| } |
| |
| @Override |
| public String toString() { |
| return Joiner.on(":").join(configuration.get(HConstants.ZOOKEEPER_QUORUM), |
| configuration.get(HConstants.ZOOKEEPER_CLIENT_PORT), |
| configuration.get(HConstants.ZOOKEEPER_ZNODE_PARENT)); |
| } |
| |
| public Configuration getConfiguration() { |
| return this.configuration; |
| } |
| |
| public Connection getConnection() throws Exception { |
| if (this.connection == null) { |
| this.connection = ConnectionFactory.createConnection(this.configuration); |
| } |
| return this.connection; |
| } |
| |
| public void closeConnection() throws Exception { |
| if (this.connection != null) { |
| this.connection.close(); |
| this.connection = null; |
| } |
| } |
| |
| @Override |
| public boolean equals(Object other) { |
| return other instanceof ClusterID && this.toString().equalsIgnoreCase(other.toString()); |
| } |
| |
| @Override |
| public int hashCode() { |
| return this.toString().toLowerCase().hashCode(); |
| } |
| } |
| |
| /** |
| * The main runner loop for the test. It uses |
| * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList} |
| * for the generation and verification of the linked list. It is heavily based on |
| * {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList.Loop} |
| */ |
| protected class VerifyReplicationLoop extends Configured implements Tool { |
| private final Log LOG = LogFactory.getLog(VerifyReplicationLoop.class); |
| protected ClusterID source; |
| protected ClusterID sink; |
| |
| IntegrationTestBigLinkedList integrationTestBigLinkedList; |
| |
| /** |
| * This tears down any pre-existing tables on both the source and sink clusters, then |
| * rebuilds the table and schema on the source cluster. It then sets up replication from |
| * the source to the sink cluster using a |
| * {@link org.apache.hadoop.hbase.client.replication.ReplicationAdmin} connection. |
| * |
| * @throws Exception on any error while setting up tables or replication |
| */ |
| protected void setupTablesAndReplication() throws Exception { |
| TableName tableName = getTableName(source.getConfiguration()); |
| |
| ClusterID[] clusters = {source, sink}; |
| |
| // delete any old tables in the source and sink |
| for (ClusterID cluster : clusters) { |
| Admin admin = cluster.getConnection().getAdmin(); |
| |
| if (admin.tableExists(tableName)) { |
| if (admin.isTableEnabled(tableName)) { |
| admin.disableTable(tableName); |
| } |
| |
| /* |
| * TODO: This is a workaround for a replication bug (HBASE-13416). |
| * When we recreate a table that has recently been deleted, the |
| * contents of the logs are replayed even though they should not be. |
| * Rolling the WALs here ensures that the logs are flushed before |
| * the table gets deleted. Eventually the bug should be fixed and |
| * this workaround removed. |
| */ |
| Set<ServerName> regionServers = new TreeSet<>(); |
| for (HRegionLocation rl : |
| cluster.getConnection().getRegionLocator(tableName).getAllRegionLocations()) { |
| regionServers.add(rl.getServerName()); |
| } |
| |
| for (ServerName server : regionServers) { |
| cluster.getConnection().getAdmin().rollWALWriter(server); |
| } |
| |
| admin.deleteTable(tableName); |
| } |
| } |
| |
| // create the schema |
| Generator generator = new Generator(); |
| generator.setConf(source.getConfiguration()); |
| generator.createSchema(); |
| |
| // set up replication on the source |
| if (!source.equals(sink)) { |
| ReplicationAdmin replicationAdmin = new ReplicationAdmin(source.getConfiguration()); |
| // remove any old replication peers |
| for (String oldPeer : replicationAdmin.listPeerConfigs().keySet()) { |
| replicationAdmin.removePeer(oldPeer); |
| } |
| |
| // set the sink to be the target |
| ReplicationPeerConfig peerConfig = new ReplicationPeerConfig(); |
| peerConfig.setClusterKey(sink.toString()); |
| |
| // set the test table to be the table to replicate |
| HashMap<TableName, ArrayList<String>> toReplicate = new HashMap<>(); |
| toReplicate.put(tableName, new ArrayList<String>(0)); |
| |
| replicationAdmin.addPeer("TestPeer", peerConfig, toReplicate); |
| |
| replicationAdmin.enableTableRep(tableName); |
| replicationAdmin.close(); |
| } |
| |
| for (ClusterID cluster : clusters) { |
| cluster.closeConnection(); |
| } |
| } |
| |
| protected void waitForReplication() throws Exception { |
| // TODO: we shouldn't be sleeping here. It would be better to query the region servers |
| // and wait for them to report 0 replication lag. |
| Thread.sleep(generateVerifyGap * 1000L); |
| } |
| |
| /** |
| * Run the {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList.Generator} in the |
| * source cluster. This assumes that the tables have been set up via |
| * {@link #setupTablesAndReplication()}. |
| * |
| * @throws Exception on any error while running the generator |
| */ |
| protected void runGenerator() throws Exception { |
| Path outputPath = new Path(outputDir); |
| UUID uuid = UUID.randomUUID(); //create a random UUID. |
| Path generatorOutput = new Path(outputPath, uuid.toString()); |
| |
| Generator generator = new Generator(); |
| generator.setConf(source.getConfiguration()); |
| |
| int retCode = generator.run(numMappers, numNodes, generatorOutput, width, wrapMultiplier); |
| if (retCode != 0) { |
| throw new RuntimeException("Generator failed with return code: " + retCode); |
| } |
| } |
| |
| |
| /** |
| * Run the {@link org.apache.hadoop.hbase.test.IntegrationTestBigLinkedList.Verify} |
| * job in the sink cluster. If replication is working properly, the data written at the source |
| * cluster should be available in the sink cluster after a reasonable gap. |
| * |
| * @param expectedNumNodes the number of nodes we expect to see in the sink cluster |
| * @throws Exception on any error while running the verification |
| */ |
| protected void runVerify(long expectedNumNodes) throws Exception { |
| Path outputPath = new Path(outputDir); |
| UUID uuid = UUID.randomUUID(); //create a random UUID. |
| Path iterationOutput = new Path(outputPath, uuid.toString()); |
| |
| Verify verify = new Verify(); |
| verify.setConf(sink.getConfiguration()); |
| |
| int retCode = verify.run(iterationOutput, numReducers); |
| if (retCode != 0) { |
| throw new RuntimeException("Verify.run failed with return code: " + retCode); |
| } |
| |
| if (!verify.verify(expectedNumNodes)) { |
| throw new RuntimeException("Verify.verify failed"); |
| } |
| |
| LOG.info("Verify finished with success. Total nodes=" + expectedNumNodes); |
| } |
| |
| /** |
| * The main test runner. |
| * |
| * This test has four steps: |
| * 1: setupTablesAndReplication |
| * 2: generate the data into the source cluster |
| * 3: wait for replication to propagate |
| * 4: verify that the data is available in the sink cluster |
| * |
| * @param args should be empty |
| * @return 0 on success |
| * @throws Exception on an error |
| */ |
| @Override |
| public int run(String[] args) throws Exception { |
| source = new ClusterID(getConf(), sourceClusterIdString); |
| sink = new ClusterID(getConf(), sinkClusterIdString); |
| |
| if (!noReplicationSetup) { |
| setupTablesAndReplication(); |
| } |
| long expectedNumNodes = 0; |
| for (int i = 0; i < numIterations; i++) { |
| LOG.info("Starting iteration = " + i); |
| |
| expectedNumNodes += numMappers * numNodes; |
| |
| runGenerator(); |
| waitForReplication(); |
| runVerify(expectedNumNodes); |
| } |
| |
| /* |
| * We always return 0 here because failures in the verification |
| * step are signaled by exceptions. |
| */ |
| return 0; |
| } |
| } |
| |
| @Override |
| protected void addOptions() { |
| super.addOptions(); |
| addRequiredOptWithArg("s", SOURCE_CLUSTER_OPT, |
| "Cluster ID of the source cluster (e.g. localhost:2181:/hbase)"); |
| addRequiredOptWithArg("r", DEST_CLUSTER_OPT, |
| "Cluster ID of the sink cluster (e.g. localhost:2182:/hbase)"); |
| addRequiredOptWithArg("d", OUTPUT_DIR_OPT, |
| "Temporary directory where to write keys for the test"); |
| |
| addOptWithArg("nm", NUM_MAPPERS_OPT, |
| "Number of mappers (default: " + DEFAULT_NUM_MAPPERS + ")"); |
| addOptWithArg("nr", NUM_REDUCERS_OPT, |
| "Number of reducers (default: " + DEFAULT_NUM_MAPPERS + ")"); |
| addOptNoArg("nrs", NO_REPLICATION_SETUP_OPT, |
| "Don't setup tables or configure replication before starting test"); |
| addOptWithArg("n", NUM_NODES_OPT, |
| "Number of nodes. This should be a multiple of width * wrapMultiplier." + |
| " (default: " + DEFAULT_NUM_NODES + ")"); |
| addOptWithArg("i", ITERATIONS_OPT, "Number of iterations to run (default: " + |
| DEFAULT_NUM_ITERATIONS + ")"); |
| addOptWithArg("t", GENERATE_VERIFY_GAP_OPT, |
| "Gap between generate and verify steps in seconds (default: " + |
| DEFAULT_GENERATE_VERIFY_GAP + ")"); |
| addOptWithArg("w", WIDTH_OPT, |
| "Width of the linked list chain (default: " + DEFAULT_WIDTH + ")"); |
| addOptWithArg("wm", WRAP_MULTIPLIER_OPT, "How many times to wrap around (default: " + |
| DEFAULT_WRAP_MULTIPLIER + ")"); |
| } |
| |
| @Override |
| protected void processOptions(CommandLine cmd) { |
| processBaseOptions(cmd); |
| |
| sourceClusterIdString = cmd.getOptionValue(SOURCE_CLUSTER_OPT); |
| sinkClusterIdString = cmd.getOptionValue(DEST_CLUSTER_OPT); |
| outputDir = cmd.getOptionValue(OUTPUT_DIR_OPT); |
| |
| // This uses parseInt from org.apache.hadoop.hbase.util.AbstractHBaseTool. |
| numMappers = parseInt(cmd.getOptionValue(NUM_MAPPERS_OPT, |
| Integer.toString(DEFAULT_NUM_MAPPERS)), |
| 1, Integer.MAX_VALUE); |
| numReducers = parseInt(cmd.getOptionValue(NUM_REDUCERS_OPT, |
| Integer.toString(DEFAULT_NUM_REDUCERS)), |
| 1, Integer.MAX_VALUE); |
| numNodes = parseInt(cmd.getOptionValue(NUM_NODES_OPT, Integer.toString(DEFAULT_NUM_NODES)), |
| 1, Integer.MAX_VALUE); |
| generateVerifyGap = parseInt(cmd.getOptionValue(GENERATE_VERIFY_GAP_OPT, |
| Integer.toString(DEFAULT_GENERATE_VERIFY_GAP)), |
| 1, Integer.MAX_VALUE); |
| numIterations = parseInt(cmd.getOptionValue(ITERATIONS_OPT, |
| Integer.toString(DEFAULT_NUM_ITERATIONS)), |
| 1, Integer.MAX_VALUE); |
| width = parseInt(cmd.getOptionValue(WIDTH_OPT, Integer.toString(DEFAULT_WIDTH)), |
| 1, Integer.MAX_VALUE); |
| wrapMultiplier = parseInt(cmd.getOptionValue(WRAP_MULTIPLIER_OPT, |
| Integer.toString(DEFAULT_WRAP_MULTIPLIER)), |
| 1, Integer.MAX_VALUE); |
| |
| if (cmd.hasOption(NO_REPLICATION_SETUP_OPT)) { |
| noReplicationSetup = true; |
| } |
| |
| if (numNodes % (width * wrapMultiplier) != 0) { |
| throw new RuntimeException("numNodes must be a multiple of width and wrap multiplier"); |
| } |
| } |
| |
| @Override |
| public int runTestFromCommandLine() throws Exception { |
| VerifyReplicationLoop tool = new VerifyReplicationLoop(); |
| tool.integrationTestBigLinkedList = this; |
| return ToolRunner.run(getConf(), tool, null); |
| } |
| |
| public static void main(String[] args) throws Exception { |
| Configuration conf = HBaseConfiguration.create(); |
| IntegrationTestingUtility.setUseDistributedCluster(conf); |
| int ret = ToolRunner.run(conf, new IntegrationTestReplication(), args); |
| System.exit(ret); |
| } |
| } |