| /** |
| * Copyright 2014 The Apache Software Foundation |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.lang.reflect.Field; |
| import java.lang.reflect.Modifier; |
| import java.net.InetAddress; |
| import java.net.ServerSocket; |
| import java.net.Socket; |
| import java.net.UnknownHostException; |
| import java.security.MessageDigest; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NavigableSet; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.commons.logging.impl.Jdk14Logger; |
| import org.apache.commons.logging.impl.Log4JLogger; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.client.Delete; |
| import org.apache.hadoop.hbase.client.Get; |
| import org.apache.hadoop.hbase.client.HBaseAdmin; |
| import org.apache.hadoop.hbase.client.HConnection; |
| import org.apache.hadoop.hbase.client.HConnectionManager; |
| import org.apache.hadoop.hbase.client.HTable; |
| import org.apache.hadoop.hbase.client.HTableAsync; |
| import org.apache.hadoop.hbase.client.MetaScanner; |
| import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; |
| import org.apache.hadoop.hbase.client.NoServerForRegionException; |
| import org.apache.hadoop.hbase.client.PreemptiveFastFailException; |
| import org.apache.hadoop.hbase.client.Put; |
| import org.apache.hadoop.hbase.client.Result; |
| import org.apache.hadoop.hbase.client.ResultScanner; |
| import org.apache.hadoop.hbase.client.RetriesExhaustedException; |
| import org.apache.hadoop.hbase.client.Scan; |
| import org.apache.hadoop.hbase.client.ServerConnectionManager; |
| import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; |
| import org.apache.hadoop.hbase.io.hfile.BlockCache; |
| import org.apache.hadoop.hbase.io.hfile.CacheConfig; |
| import org.apache.hadoop.hbase.io.hfile.Compression; |
| import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; |
| import org.apache.hadoop.hbase.ipc.HRegionInterface; |
| import org.apache.hadoop.hbase.master.AssignmentPlan; |
| import org.apache.hadoop.hbase.master.HMaster; |
| import org.apache.hadoop.hbase.master.RegionPlacement; |
| import org.apache.hadoop.hbase.regionserver.HRegion; |
| import org.apache.hadoop.hbase.regionserver.HRegionServer; |
| import org.apache.hadoop.hbase.regionserver.InternalScanner; |
| import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl; |
| import org.apache.hadoop.hbase.regionserver.Store; |
| import org.apache.hadoop.hbase.regionserver.StoreFile; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.FSUtils; |
| import org.apache.hadoop.hbase.util.JVMClusterUtil; |
| import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; |
| import org.apache.hadoop.hbase.util.Pair; |
| import org.apache.hadoop.hbase.util.RegionSplitter; |
| import org.apache.hadoop.hbase.util.StringBytes; |
| import org.apache.hadoop.hbase.util.Threads; |
| import org.apache.hadoop.hbase.util.Writables; |
| import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper; |
| import org.apache.hadoop.hdfs.DFSClient; |
| import org.apache.hadoop.hdfs.DistributedFileSystem; |
| import org.apache.hadoop.hdfs.MiniDFSCluster; |
| import org.apache.hadoop.hdfs.server.datanode.DataNode; |
| import org.apache.hadoop.hdfs.server.namenode.NameNode; |
| import org.apache.hadoop.mapred.MiniMRCluster; |
| import org.apache.hadoop.mapred.TaskLog; |
| import org.apache.hadoop.security.UnixUserGroupInformation; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.zookeeper.ZooKeeper; |
| import org.junit.Assert; |
| |
| import com.google.common.base.Preconditions; |
| |
| /** |
| * Facility for testing HBase. Added as a tool to aid JUnit4 testing. Replaces |
| * the old HBaseTestCase and HBaseClusterTestCase functionality. |
| * Create an instance and keep it around for the duration of your HBase |
| * testing; this class is meant to be your one-stop shop for anything you |
| * might need while testing. It manages at most one cluster at a time. It |
| * depends on log4j being on the classpath and on hbase-site.xml for logging |
| * and test-run configuration, but it does not set logging levels nor change |
| * configuration parameters. |
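| * |
| * <p>A minimal usage sketch (the table and family names are placeholders): |
| * <pre> |
| * HBaseTestingUtility util = new HBaseTestingUtility(); |
| * util.startMiniCluster(); |
| * try { |
| *   HTable table = util.createTable(Bytes.toBytes("testTable"), |
| *       Bytes.toBytes("testFamily")); |
| *   int rows = util.loadTable(table, Bytes.toBytes("testFamily")); |
| *   assertEquals(rows, util.countRows(table)); |
| * } finally { |
| *   util.shutdownMiniCluster(); |
| * } |
| * </pre> |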
| */ |
| public class HBaseTestingUtility { |
| private final static Log LOG = LogFactory.getLog(HBaseTestingUtility.class); |
| private final Configuration conf; |
| private final CacheConfig cacheConf; |
| private MiniZooKeeperCluster zkCluster = null; |
| private static int USERNAME_SUFFIX = 0; |
| |
| /** |
| * The default number of regions per regionserver when creating a pre-split |
| * table. |
| */ |
| private static final int DEFAULT_REGIONS_PER_SERVER = 5; |
| |
| private MiniDFSCluster dfsCluster = null; |
| private MiniHBaseCluster hbaseCluster = null; |
| private MiniMRCluster mrCluster = null; |
| |
| /** The root directory for all mini-cluster test data for this testing utility instance. */ |
| private File clusterTestBuildDir = null; |
| |
| /** If there is a mini cluster running for this testing utility instance. */ |
| private boolean miniClusterRunning; |
| |
| private HBaseAdmin hbaseAdmin = null; |
| |
| private String hadoopLogDir; |
| |
| /** |
| * System property key to get test directory value. |
| */ |
| public static final String TEST_DIRECTORY_KEY = "test.build.data"; |
| |
| /** |
| * Default parent directory for test output. |
| */ |
| public static final String DEFAULT_TEST_DIRECTORY = "target/build/data"; |
| |
| /** Filesystem URI used for map-reduce mini-cluster setup */ |
| private static String fsURI; |
| |
| /** A set of ports that have been claimed using {@link #randomFreePort()}. */ |
| private static final Set<Integer> takenRandomPorts = new HashSet<Integer>(); |
| |
| /** Compression algorithms to use in parameterized JUnit 4 tests */ |
| public static final List<Object[]> COMPRESSION_ALGORITHMS_PARAMETERIZED = |
| Arrays.asList(new Object[][] { |
| { Compression.Algorithm.NONE }, |
| { Compression.Algorithm.GZ } |
| }); |
| |
| /** This is for unit tests parameterized with a single boolean. */ |
| public static final List<Object[]> BOOLEAN_PARAMETERIZED = |
| Arrays.asList(new Object[][] { |
| { Boolean.FALSE }, |
| { Boolean.TRUE } |
| }); |
| |
| /** Compression algorithms to use in testing */ |
| public static final Compression.Algorithm[] COMPRESSION_ALGORITHMS = |
| new Compression.Algorithm[] { |
| Compression.Algorithm.NONE, Compression.Algorithm.GZ |
| }; |
| |
| /** |
| * Create all combinations of Bloom filters and compression algorithms for |
| * testing. |
| */ |
| private static List<Object[]> bloomAndCompressionCombinations() { |
| List<Object[]> configurations = new ArrayList<Object[]>(); |
| for (Compression.Algorithm comprAlgo : |
| HBaseTestingUtility.COMPRESSION_ALGORITHMS) { |
| for (StoreFile.BloomType bloomType : StoreFile.BloomType.values()) { |
| configurations.add(new Object[] { comprAlgo, bloomType }); |
| } |
| } |
| return Collections.unmodifiableList(configurations); |
| } |
| |
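| /** |
|  * All Bloom filter / compression combinations, for JUnit4 parameterized |
|  * tests. A minimal sketch of a consuming test (the test class name is a |
|  * placeholder): |
|  * <pre> |
|  * &#64;RunWith(Parameterized.class) |
|  * public class TestBloomAndCompression { |
|  *   &#64;Parameters |
|  *   public static Collection&lt;Object[]&gt; parameters() { |
|  *     return HBaseTestingUtility.BLOOM_AND_COMPRESSION_COMBINATIONS; |
|  *   } |
|  *   public TestBloomAndCompression(Compression.Algorithm comprAlgo, |
|  *       StoreFile.BloomType bloomType) { ... } |
|  * } |
|  * </pre> |
|  */ |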
| public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS = |
| bloomAndCompressionCombinations(); |
| |
| public HBaseTestingUtility() { |
| this(HBaseConfiguration.create()); |
| } |
| |
| public HBaseTestingUtility(Configuration conf) { |
| this.conf = conf; |
| cacheConf = new CacheConfig(conf); |
| } |
| |
| /** |
| * @return Instance of Configuration. |
| */ |
| public Configuration getConfiguration() { |
| return this.conf; |
| } |
| |
| /** |
| * @return Where to write test data on the local filesystem: a unique |
| * directory under {@link #DEFAULT_TEST_DIRECTORY} |
| * @see #setupClusterTestBuildDir() |
| */ |
| public Path getTestDir() { |
| setupClusterTestBuildDir(); |
| return new Path(clusterTestBuildDir.getPath()); |
| } |
| |
| /** |
| * @param subdirName name of the subdirectory |
| * @return Path to a subdirectory named <code>subdirName</code> under |
| * {@link #getTestDir()}. |
| * @see #setupClusterTestBuildDir() |
| */ |
| public Path getTestDir(final String subdirName) { |
| return new Path(getTestDir(), subdirName); |
| } |
| |
| /** |
| * Homes our cluster in a randomly named directory under |
| * {@link #DEFAULT_TEST_DIRECTORY} so that many concurrent clusters can run |
| * if need be. Also amends the test.build.data System property, which is |
| * what minidfscluster bases its data directory on. Modifying a System |
| * property is not the way to support concurrent instances -- another |
| * instance could grab the temporary value unintentionally -- but nothing |
| * can be done about it at the moment; the minidfscluster only works as a |
| * single instance. |
| */ |
| public void setupClusterTestBuildDir() { |
| if (clusterTestBuildDir != null) { |
| return; |
| } |
| |
| String randomStr = UUID.randomUUID().toString(); |
| String dirStr = new Path(DEFAULT_TEST_DIRECTORY, randomStr).toString(); |
| clusterTestBuildDir = new File(dirStr).getAbsoluteFile(); |
| clusterTestBuildDir.mkdirs(); |
| clusterTestBuildDir.deleteOnExit(); |
| conf.set(TEST_DIRECTORY_KEY, clusterTestBuildDir.getPath()); |
| LOG.info("Created new mini-cluster data directory: " + clusterTestBuildDir); |
| } |
| |
| /** |
| * Start a minidfscluster. |
| * Can only create one. |
| * @param servers How many datanodes to start. |
| * @return The mini dfs cluster created. |
| * @see #shutdownMiniDFSCluster() |
| */ |
| public MiniDFSCluster startMiniDFSCluster(int servers) throws IOException { |
| createDirsAndSetProperties(); |
| this.dfsCluster = new MiniDFSCluster.Builder(this.conf).nameNodePort(0) |
| .numDataNodes(servers).format(true).manageDataDfsDirs(true) |
| .manageNameDfsDirs(true).build(); |
| return this.dfsCluster; |
| } |
| |
| public MiniDFSCluster startMiniDFSClusterForTestHLog(int namenodePort) throws IOException { |
| createDirsAndSetProperties(); |
| this.dfsCluster = new MiniDFSCluster.Builder(this.conf) |
| .nameNodePort(namenodePort).numDataNodes(5).format(false) |
| .manageDataDfsDirs(true).manageNameDfsDirs(true).build(); |
| return dfsCluster; |
| } |
| |
| /** This is used before starting HDFS and map-reduce mini-clusters */ |
| private void createDirsAndSetProperties() { |
| setupClusterTestBuildDir(); |
| System.setProperty(TEST_DIRECTORY_KEY, clusterTestBuildDir.getPath()); |
| createDirAndSetProperty("cache_data", "test.cache.data"); |
| createDirAndSetProperty("hadoop_tmp", "hadoop.tmp.dir"); |
| hadoopLogDir = createDirAndSetProperty("hadoop_logs", "hadoop.log.dir"); |
| createDirAndSetProperty("mapred_output", "mapred.output.dir"); |
| createDirAndSetProperty("mapred_local", "mapred.local.dir"); |
| createDirAndSetProperty("mapred_system", "mapred.system.dir"); |
| createDirAndSetProperty("fsimage", "dfs.name.dir"); |
| createDirAndSetProperty("fsedit", "dfs.name.edits.dir"); |
| } |
| |
| /** |
| * Shuts down instance created by call to {@link #startMiniDFSCluster(int)} |
| * or does nothing. |
| * @throws IOException |
| */ |
| public void shutdownMiniDFSCluster() throws IOException { |
| if (this.dfsCluster != null) { |
| // The below throws an exception per dn, AsynchronousCloseException. |
| this.dfsCluster.shutdown(); |
| dfsCluster = null; |
| } |
| } |
| |
| /** |
| * Call this if you only want a zk cluster. |
| * @return zk cluster started. |
| * @throws Exception |
| * @see #startMiniCluster() if you want a zk + dfs + hbase mini cluster. |
| * @see #shutdownMiniZKCluster() |
| */ |
| public MiniZooKeeperCluster startMiniZKCluster() throws Exception { |
| setupClusterTestBuildDir(); |
| return startMiniZKCluster(clusterTestBuildDir); |
| } |
| |
| private MiniZooKeeperCluster startMiniZKCluster(final File dir) |
| throws IOException, InterruptedException { |
| if (this.zkCluster != null) { |
| throw new IOException("Cluster already running at " + dir); |
| } |
| this.zkCluster = new MiniZooKeeperCluster(); |
| int clientPort = this.zkCluster.startup(dir); |
| this.conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, |
| Integer.toString(clientPort)); |
| return this.zkCluster; |
| } |
| |
| /** |
| * Shuts down zk cluster created by call to {@link #startMiniZKCluster(File)} |
| * or does nothing. |
| * @throws IOException |
| * @see #startMiniZKCluster() |
| */ |
| public void shutdownMiniZKCluster() throws IOException { |
| if (this.zkCluster != null) { |
| this.zkCluster.shutdown(); |
| zkCluster = null; |
| } |
| } |
| |
| /** |
| * Start up a minicluster of hbase, dfs, and zookeeper. |
| * @return Mini hbase cluster instance created. |
| * @throws Exception |
| * @see #shutdownMiniCluster() |
| */ |
| public MiniHBaseCluster startMiniCluster() throws Exception { |
| return startMiniCluster(1, 1); |
| } |
| |
| /** |
| * Start up a minicluster of hbase, optionally dfs, and zookeeper. |
| * Modifies Configuration. Homes the cluster data directory under a random |
| * subdirectory in a directory under System property test.build.data. |
| * Directory is cleaned up on exit. |
| * @param numSlaves Number of slaves to start up. We'll start this many |
| * datanodes and regionservers. If numSlaves is > 1, then make sure |
| * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver), |
| * otherwise you will get port bind errors. |
| * @throws IOException |
| * @throws InterruptedException |
| * @see #shutdownMiniCluster() |
| * @return Mini hbase cluster instance created. |
| */ |
| public MiniHBaseCluster startMiniCluster(final int numSlaves) |
| throws IOException, InterruptedException { |
| return startMiniCluster(1, numSlaves); |
| } |
| |
| public MiniHBaseCluster startMiniCluster(final int numMasters, |
| final int numSlaves) throws IOException, InterruptedException { |
| return startMiniCluster(numMasters, numSlaves, MiniHBaseCluster.MiniHBaseClusterRegionServer.class); |
| } |
| |
| public static final String FS_TYPE_KEY = "mini.cluster.fs.type"; |
| public static final String FS_TYPE_DFS = "dfs"; |
| public static final String FS_TYPE_LFS = "lfs"; |
| public static final String MINICLUSTER_FS_TYPE_DEFAULT = FS_TYPE_DFS; |
| |
| /** |
| * Start up a minicluster of hbase, optionally dfs, and zookeeper. |
| * Modifies Configuration. Homes the cluster data directory under a random |
| * subdirectory in a directory under System property test.build.data. |
| * Directory is cleaned up on exit. |
| * @param numMasters Number of masters to start up. We'll start this many |
| * hbase masters. If numMasters > 1, you can find the active/primary master |
| * with {@link MiniHBaseCluster#getMaster()}. |
| * @param numSlaves Number of slave servers to start up. We'll start this |
| * many datanodes and regionservers. If numSlaves is > 1, then make sure |
| * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver), |
| * otherwise you will get port bind errors. |
| * @param regionServerClass The region server implementation to instantiate. |
| * @throws IOException |
| * @throws InterruptedException |
| * @see #shutdownMiniCluster() |
| * @return Mini hbase cluster instance created. |
| */ |
| public MiniHBaseCluster startMiniCluster(final int numMasters, |
| final int numSlaves, final Class<? extends HRegionServer> regionServerClass) throws IOException, InterruptedException { |
| LOG.info("Starting up minicluster"); |
| // If we already put up a cluster, fail. |
| if (miniClusterRunning) { |
| throw new IllegalStateException("A mini-cluster is already running"); |
| } |
| miniClusterRunning = true; |
| |
| // Make a new random dir to home everything in. Set it as system property. |
| // minidfs reads home from system property. |
| setupClusterTestBuildDir(); |
| System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestBuildDir.getPath()); |
| |
| // Initialize the file-system. |
| FileSystem fs = null; |
| Path hbaseRootdir = null; |
| |
| String fsType = this.conf.get(FS_TYPE_KEY, |
| MINICLUSTER_FS_TYPE_DEFAULT); |
| switch (fsType) { |
| case FS_TYPE_DFS: |
| // Bring up mini dfs cluster. This spawns a bunch of warnings about |
| // missing scheme. Complaints are 'Scheme is undefined for |
| // build/test/data/dfs/name1'. |
| startMiniDFSCluster(numSlaves); |
| // Mangle conf so fs parameter points to minidfs we just started up |
| fs = this.dfsCluster.getFileSystem(); |
| this.conf.set("fs.defaultFS", fs.getUri().toString()); |
| // Do old style too just to be safe. |
| this.conf.set("fs.default.name", fs.getUri().toString()); |
| for (DataNode dn : dfsCluster.getDataNodes()) { |
| this.dfsCluster.waitDataNodeInitialized(dn); |
| } |
| hbaseRootdir = fs.getHomeDirectory(); |
| break; |
| |
| case FS_TYPE_LFS: |
| this.conf.set("fs.defaultFS", "file:///"); |
| // Do old style too just to be safe. |
| this.conf.set("fs.default.name", "file:///"); |
| fs = FileSystem.get(this.conf); |
| hbaseRootdir = new Path("file", "", |
| new File(this.clusterTestBuildDir, "fs").getAbsolutePath()); |
| break; |
| |
| default: |
| throw new IllegalArgumentException("conf[" + FS_TYPE_KEY + "] = " |
| + fsType); |
| } |
| |
| // Start up a zk cluster. |
| if (this.zkCluster == null) { |
| startMiniZKCluster(this.clusterTestBuildDir); |
| } |
| |
| // Now do the mini hbase cluster. Set the hbase.rootdir in config. |
| this.conf.set(HConstants.HBASE_DIR, hbaseRootdir.toString()); |
| fs.mkdirs(hbaseRootdir); |
| FSUtils.setVersion(fs, hbaseRootdir); |
| startMiniHBaseCluster(numMasters, numSlaves, regionServerClass); |
| |
| // Don't leave here till we've done a successful scan of the .META. |
| HTable t = null; |
| for (int i = 0; i < 10; ++i) { |
| try { |
| t = new HTable(this.conf, HConstants.META_TABLE_NAME); |
| for (Result result : t.getScanner(new Scan())) { |
| LOG.debug("Successfully read meta entry: " + result); |
| } |
| break; |
| } catch (NoServerForRegionException ex) { |
| LOG.error("META is not online, sleeping"); |
| Threads.sleepWithoutInterrupt(2000); |
| } |
| } |
| if (t == null) { |
| throw new IOException("Could not open META on cluster startup"); |
| } |
| |
| ResultScanner s = t.getScanner(new Scan()); |
| while (s.next() != null) continue; |
| s.close(); |
| LOG.info("Minicluster is up"); |
| t.close(); |
| return this.hbaseCluster; |
| } |
| |
| |
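| /** |
| * Starts only the HBase portion of the mini cluster; the filesystem and |
| * zookeeper settings in the Configuration must already point at running |
| * services (see {@link #startMiniCluster()}, which brings up all three). |
| */ |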
| public void startMiniHBaseCluster(final int numMasters, final int numSlaves) |
| throws IOException, InterruptedException { |
| startMiniHBaseCluster(numMasters, numSlaves, MiniHBaseCluster.MiniHBaseClusterRegionServer.class); |
| } |
| |
| public void startMiniHBaseCluster(final int numMasters, final int numSlaves, |
| Class<? extends HRegionServer> regionServerClass) |
| throws IOException, InterruptedException { |
| this.hbaseCluster = new MiniHBaseCluster(this.conf, numMasters, numSlaves, regionServerClass); |
| } |
| |
| /** |
| * @return Current mini hbase cluster. Only has something in it after a call |
| * to {@link #startMiniCluster()}. |
| * @see #startMiniCluster() |
| */ |
| public MiniHBaseCluster getMiniHBaseCluster() { |
| return this.hbaseCluster; |
| } |
| |
| /** |
| * @throws IOException |
| * @see #startMiniCluster(int) |
| */ |
| public void shutdownMiniCluster() throws IOException { |
| LOG.info("Shutting down minicluster"); |
| shutdownMiniHBaseCluster(); |
| shutdownMiniZKCluster(); |
| shutdownMiniDFSCluster(); |
| |
| cleanupTestDir(); |
| HConnectionManager.deleteAllConnections(); |
| miniClusterRunning = false; |
| LOG.info("Minicluster is down"); |
| } |
| |
| /** |
| * Shutdown HBase mini cluster. Does not shutdown zk or dfs if running. |
| * @throws IOException |
| */ |
| public void shutdownMiniHBaseCluster() throws IOException { |
| if (this.hbaseCluster != null) { |
| this.hbaseCluster.shutdown(); |
| // Wait till hbase is down before going on to shutdown zk. |
| this.hbaseCluster.join(); |
| this.hbaseCluster = null; |
| // hbaseAdmin holds connections that have been closed while shutting |
| // down the HBase cluster. |
| this.hbaseAdmin = null; |
| } |
| } |
| |
| /** |
| * Flushes all caches in the mini hbase cluster |
| * @throws IOException |
| */ |
| public void flush() throws IOException { |
| this.hbaseCluster.flushcache(); |
| } |
| |
| /** |
| * Flushes all caches of the given table in the mini hbase cluster |
| * @param tableName table to flush |
| * @throws IOException |
| */ |
| public void flush(byte [] tableName) throws IOException { |
| this.hbaseCluster.flushcache(tableName); |
| } |
| |
| |
| /** |
| * Create a table. |
| * @param tableName |
| * @param family |
| * @return An HTableAsync instance for the created table. HTableAsync is a |
| * subclass of HTable that also implements HTableAsyncInterface, so you |
| * can use both the synchronous API of HTableInterface and the |
| * asynchronous API of HTableAsyncInterface. |
| * @throws IOException |
| */ |
| public HTableAsync createTable(byte[] tableName, byte[] family) |
| throws IOException{ |
| return createTable(tableName, new byte[][]{family}); |
| } |
| |
| /** |
| * Create a table. |
| * |
| * @param tableName |
| * @param family |
| * @return An HTableAsync instance for the created table. HTableAsync is a |
| * subclass of HTable that also implements HTableAsyncInterface, so you |
| * can use both the synchronous API of HTableInterface and the |
| * asynchronous API of HTableAsyncInterface. |
| * @throws IOException |
| */ |
| public HTableAsync createTable(StringBytes tableName, byte[] family) |
| throws IOException { |
| return createTable(tableName.getBytes(), new byte[][] { family }); |
| } |
| |
| public HTable createTable(byte[] tableName, byte[][] families, |
| int numVersions, byte[] startKey, byte[] endKey, int numRegions) |
| throws IOException { |
| return createTable(new StringBytes(tableName), families, numVersions, |
| startKey, endKey, numRegions); |
| } |
| |
| /** |
| * Creates a table. |
| * |
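| * <p>For example, a sketch creating a table of ten regions whose split |
| * points run from "bbb" to "yyy" (all names are placeholders): |
| * <pre> |
| * HTableAsync table = util.createTable(new StringBytes("testTable"), |
| *     new byte[][] { Bytes.toBytes("family") }, 3, |
| *     Bytes.toBytes("bbb"), Bytes.toBytes("yyy"), 10); |
| * </pre> |
| * |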
| * @param startKey the startKey of the second region. |
| * @param endKey the endKey of the second-to-last region, i.e. the |
| * startKey of the last region. |
| * @param numRegions if numRegions equals 1, startKey and endKey are not used, |
| * if numRegions equals 2, endKey is not used. |
| */ |
| public HTableAsync createTable(StringBytes tableName, byte[][] families, |
| int numVersions, byte[] startKey, byte[] endKey, int numRegions) |
| throws IOException { |
| HTableDescriptor desc = new HTableDescriptor(tableName.getBytes()); |
| for (byte[] family : families) { |
| HColumnDescriptor hcd = |
| new HColumnDescriptor(family).setMaxVersions(numVersions); |
| desc.addFamily(hcd); |
| } |
| if (numRegions == 1) { |
| new HBaseAdmin(getConfiguration()).createTable(desc); |
| } else if (numRegions == 2) { |
| new HBaseAdmin(getConfiguration()).createTable(desc, |
| new byte[][] { startKey }); |
| } else { |
| new HBaseAdmin(getConfiguration()).createTable(desc, startKey, endKey, |
| numRegions); |
| } |
| return new HTableAsync(getConfiguration(), tableName); |
| } |
| |
| /** |
| * Creates a table. |
| * |
| * @param splitKeys keys at which the table is pre-split; the resulting |
| * table has splitKeys.length + 1 regions. |
| */ |
| public HTable createTable(StringBytes tableName, byte[][] families, |
| int numVersions, byte[][] splitKeys) throws IOException { |
| HTableDescriptor desc = new HTableDescriptor(tableName.getBytes()); |
| for (byte[] family : families) { |
| HColumnDescriptor hcd = |
| new HColumnDescriptor(family).setMaxVersions(numVersions); |
| desc.addFamily(hcd); |
| } |
| new HBaseAdmin(getConfiguration()).createTable(desc, splitKeys); |
| |
| return new HTable(getConfiguration(), tableName); |
| } |
| |
| /** |
| * Create a table. |
| * |
| * @param tableName |
| * @param families |
| * @return An HTableAsync instance for the created table. HTableAsync is a |
| * subclass of HTable that also implements HTableAsyncInterface, so you |
| * can use both the synchronous API of HTableInterface and the |
| * asynchronous API of HTableAsyncInterface. |
| * @throws IOException |
| */ |
| public HTableAsync createTable(byte[] tableName, byte[][] families) |
| throws IOException { |
| HTableDescriptor desc = new HTableDescriptor(tableName); |
| for(byte[] family : families) { |
| desc.addFamily(new HColumnDescriptor(family)); |
| } |
| (new HBaseAdmin(getConfiguration())).createTable(desc); |
| return new HTableAsync(getConfiguration(), tableName); |
| } |
| |
| /** |
| * Create a table |
| * |
| * @param tableName |
| * @param columnDescriptors |
| * @return An HTable instance for the created table. |
| * @throws IOException |
| */ |
| public HTable createTable(byte[] tableName, |
| HColumnDescriptor[] columnDescriptors) throws IOException { |
| HTableDescriptor desc = new HTableDescriptor(tableName); |
| for (HColumnDescriptor columnDescriptor : columnDescriptors) { |
| desc.addFamily(columnDescriptor); |
| } |
| HBaseAdmin admin = new HBaseAdmin(getConfiguration()); |
| try { |
| admin.createTable(desc); |
| return new HTable(getConfiguration(), tableName); |
| } finally { |
| admin.close(); |
| } |
| } |
| |
| /** |
| * Create a table. |
| * @param tableName |
| * @param family |
| * @param numVersions |
| * @return An HTable instance for the created table. |
| * @throws IOException |
| */ |
| public HTable createTable(byte[] tableName, byte[] family, int numVersions) |
| throws IOException { |
| return createTable(tableName, new byte[][]{family}, numVersions); |
| } |
| |
| /** |
| * Create a table. |
| * @param tableName |
| * @param families |
| * @param numVersions |
| * @return An HTable instance for the created table. |
| * @throws IOException |
| */ |
| public HTable createTable(byte[] tableName, byte[][] families, |
| int numVersions) |
| throws IOException { |
| HTableDescriptor desc = new HTableDescriptor(tableName); |
| for (byte[] family : families) { |
| HColumnDescriptor hcd = new HColumnDescriptor(family) |
| .setMaxVersions(numVersions); |
| desc.addFamily(hcd); |
| } |
| getHBaseAdmin().createTable(desc); |
| return new HTable(new Configuration(getConfiguration()), tableName); |
| } |
| |
| /** |
| * Create a table. |
| * @param tableName |
| * @param families |
| * @param numVersions |
| * @param blockSize |
| * @return An HTable instance for the created table. |
| * @throws IOException |
| */ |
| public HTable createTable(byte[] tableName, byte[][] families, |
| int numVersions, int blockSize) throws IOException { |
| HTableDescriptor desc = new HTableDescriptor(tableName); |
| for (byte[] family : families) { |
| HColumnDescriptor hcd = new HColumnDescriptor(family) |
| .setMaxVersions(numVersions) |
| .setBlocksize(blockSize); |
| desc.addFamily(hcd); |
| } |
| (new HBaseAdmin(getConfiguration())).createTable(desc); |
| return new HTable(getConfiguration(), tableName); |
| } |
| |
| /** |
| * Create a table. |
| * @param tableName |
| * @param families |
| * @param numVersions |
| * @return An HTable instance for the created table. |
| * @throws IOException |
| */ |
| public HTable createTable(byte[] tableName, byte[][] families, |
| int[] numVersions) |
| throws IOException { |
| HTableDescriptor desc = new HTableDescriptor(tableName); |
| int i = 0; |
| for (byte[] family : families) { |
| HColumnDescriptor hcd = new HColumnDescriptor(family) |
| .setMaxVersions(numVersions[i]); |
| desc.addFamily(hcd); |
| i++; |
| } |
| (new HBaseAdmin(getConfiguration())).createTable(desc); |
| return new HTable(getConfiguration(), tableName); |
| } |
| |
| /** |
| * Returns an instance of HTable for the specified table. |
| */ |
| public HTable getHTable(StringBytes tableName) throws IOException { |
| return new HTable(getConfiguration(), tableName); |
| } |
| |
| /** |
| * Deletes a table. |
| */ |
| public void deleteTable(byte[] tableName) throws IOException { |
| HBaseAdmin hba = new HBaseAdmin(getConfiguration()); |
| hba.disableTable(tableName); |
| hba.deleteTable(tableName); |
| } |
| |
| /** |
| * Deletes a table. |
| */ |
| public void deleteTable(StringBytes tableName) throws IOException { |
| deleteTable(tableName.getBytes()); |
| } |
| |
| /** |
| * Truncates an existing table by deleting all of its rows. |
| * @param tableName existing table |
| * @return HTable for the now-empty table |
| * @throws IOException |
| */ |
| public HTable truncateTable(byte [] tableName) throws IOException { |
| HTable table = new HTable(getConfiguration(), tableName); |
| Scan scan = new Scan(); |
| ResultScanner resScan = table.getScanner(scan); |
| for(Result res : resScan) { |
| Delete del = new Delete(res.getRow()); |
| table.delete(del); |
| } |
| return table; |
| } |
| |
| /** |
| * Load table with rows from 'aaa' to 'zzz'. |
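| * Writes one cell per row, with the row key as the value, for |
| * 26 * 26 * 26 = 17576 rows in total. A typical pairing with |
| * {@link #countRows(HTable)}, as a sketch: |
| * <pre> |
| * int loaded = util.loadTable(table, Bytes.toBytes("family")); |
| * assertEquals(loaded, util.countRows(table)); |
| * </pre> |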
| * @param t Table |
| * @param f Family |
| * @return Count of rows loaded. |
| * @throws IOException |
| */ |
| public int loadTable(final HTable t, final byte[] f) throws IOException { |
| t.setAutoFlush(false); |
| byte[] k = new byte[3]; |
| int rowCount = 0; |
| for (byte b1 = 'a'; b1 <= 'z'; b1++) { |
| for (byte b2 = 'a'; b2 <= 'z'; b2++) { |
| for (byte b3 = 'a'; b3 <= 'z'; b3++) { |
| k[0] = b1; |
| k[1] = b2; |
| k[2] = b3; |
| Put put = new Put(k); |
| put.add(f, null, k); |
| t.put(put); |
| rowCount++; |
| } |
| } |
| } |
| t.flushCommits(); |
| return rowCount; |
| } |
| |
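| /** |
| * Load table with four-letter rows from 'aaaa' to 'zzzz', flushing |
| * commits and the table's region memstores every 6000 rows. |
| * @return Count of rows loaded. |
| */ |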
| public int loadTable2(final HTable t, final byte[] f) throws IOException { |
| t.setAutoFlush(false); |
| byte[] k = new byte[4]; |
| int rowCount = 0; |
| for (byte b0 = 'a'; b0 <= 'z'; b0++) { |
| for (byte b1 = 'a'; b1 <= 'z'; b1++) { |
| for (byte b2 = 'a'; b2 <= 'z'; b2++) { |
| for (byte b3 = 'a'; b3 <= 'z'; b3++) { |
| k[0] = b0; |
| k[1] = b1; |
| k[2] = b2; |
| k[3] = b3; |
| Put put = new Put(k); |
| put.add(f, null, k); |
| t.put(put); |
| rowCount++; |
| if (rowCount % 6000 == 0) { |
| t.flushCommits(); |
| this.flush(t.getTableName()); |
| } |
| } |
| } |
| } |
| } |
| t.flushCommits(); |
| return rowCount; |
| } |
| |
| /** |
| * Return the number of rows in the given table. |
| */ |
| public int countRows(final HTable table) throws IOException { |
| Scan scan = new Scan(); |
| ResultScanner results = table.getScanner(scan); |
| int count = 0; |
| for (@SuppressWarnings("unused") Result res : results) { |
| count++; |
| } |
| results.close(); |
| return count; |
| } |
| |
| /** |
| * Return an md5 digest of the entire contents of a table. |
| */ |
| public String checksumRows(final HTable table) throws Exception { |
| Scan scan = new Scan(); |
| ResultScanner results = table.getScanner(scan); |
| MessageDigest digest = MessageDigest.getInstance("MD5"); |
| for (Result res : results) { |
| digest.update(res.getRow()); |
| } |
| results.close(); |
| // MessageDigest.toString() does not render the digest value; return the |
| // digest bytes in printable form instead. |
| return Bytes.toStringBinary(digest.digest()); |
| } |
| |
| /** |
| * @deprecated Use createTable with startKey/stopKey versions to create |
| */ |
| @Deprecated |
| public int createMultiRegions(HTable table, byte[] columnFamily) |
| throws IOException { |
| return createMultiRegions(getConfiguration(), table, columnFamily); |
| } |
| |
| /** |
| * @deprecated Use createTable with startKey/stopKey versions to create |
| */ |
| @Deprecated |
| public int createMultiRegions(final Configuration c, final HTable table, |
| final byte[] columnFamily) throws IOException { |
| return createMultiRegions(c, table, columnFamily, getTmpKeys()); |
| } |
| |
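| /** @return 25 split keys, "aaa" through "yyy", used by createMultiRegions. */ |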
| public byte[][] getTmpKeys() { |
| byte[][] keys = { |
| Bytes.toBytes("aaa"), Bytes.toBytes("bbb"), |
| Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"), |
| Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"), |
| Bytes.toBytes("iii"), Bytes.toBytes("jjj"), Bytes.toBytes("kkk"), |
| Bytes.toBytes("lll"), Bytes.toBytes("mmm"), Bytes.toBytes("nnn"), |
| Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"), |
| Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"), |
| Bytes.toBytes("uuu"), Bytes.toBytes("vvv"), Bytes.toBytes("www"), |
| Bytes.toBytes("xxx"), Bytes.toBytes("yyy") |
| }; |
| return keys; |
| } |
| |
| /** |
| * @deprecated Use createTable with startKey/stopKey versions to create |
| */ |
| @Deprecated |
| public int createMultiRegions(final Configuration c, final HTable table, |
| final byte[] columnFamily, byte [][] startKeys) |
| throws IOException { |
| return createMultiRegionsWithFavoredNodes(c,table,columnFamily, |
| new Pair<byte[][], byte[][]>(startKeys, null)); |
| } |
| |
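| /** |
| * Replaces the regions of the given table in .META. with one region per |
| * start key, optionally storing a favored-nodes value for each new |
| * region, and re-enables the table. |
| * @return the number of regions created |
| */ |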
| public int createMultiRegionsWithFavoredNodes(final Configuration c, final HTable table, |
| final byte[] columnFamily, Pair<byte[][], byte[][]> startKeysAndFavNodes) |
| throws IOException { |
| byte[][] startKeys = startKeysAndFavNodes.getFirst(); |
| byte[][] favNodes = startKeysAndFavNodes.getSecond(); |
| |
| HBaseAdmin admin = new HBaseAdmin(conf); |
| |
| // Disable the table before modifying the META table |
| admin.disableTable(table.getTableName()); |
| |
| Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR); |
| try(HTable meta = new HTable(c, HConstants.META_TABLE_NAME)) { |
| HTableDescriptor htd = table.getTableDescriptor(); |
| if(!htd.hasFamily(columnFamily)) { |
| HColumnDescriptor hcd = new HColumnDescriptor(columnFamily); |
| htd.addFamily(hcd); |
| } |
| // remove empty region - this is tricky as the mini cluster during the test |
| // setup already has the "<tablename>,,123456789" row with an empty start |
| // and end key. Adding the custom regions below adds those blindly, |
| // including the new start region from empty to "bbb". lg |
| List<byte[]> rows = getMetaTableRows(htd.getName()); |
| // add custom ones |
| int count = 0; |
| for (int i = 0; i < startKeys.length; i++) { |
| int j = (i + 1) % startKeys.length; |
| HRegionInfo hri = new HRegionInfo(table.getTableDescriptor(), |
| startKeys[i], startKeys[j]); |
| Put put = new Put(hri.getRegionName()); |
| put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER, |
| Writables.getBytes(hri)); |
| if (favNodes != null) { |
| put.add(HConstants.CATALOG_FAMILY, HConstants.FAVOREDNODES_QUALIFIER, |
| favNodes[i]); |
| } |
| meta.put(put); |
| LOG.info("createMultiRegions: inserted " + hri.toString()); |
| count++; |
| } |
| // see comment above, remove "old" (or previous) single region |
| for (byte[] row : rows) { |
| LOG.info("createMultiRegions: deleting meta row -> " |
| + Bytes.toStringBinary(row)); |
| meta.delete(new Delete(row)); |
| } |
| |
| admin.enableTable(table.getTableName()); |
| |
| // flush cache of regions |
| HConnection conn = table.getConnectionAndResetOperationContext(); |
| conn.clearRegionCache(); |
| return count; |
| } |
| } |
| |
| /** |
| * Returns all rows from the .META. table. |
| * |
| * @throws IOException When reading the rows fails. |
| */ |
| public List<byte[]> getMetaTableRows() throws IOException { |
| try (HTable t = new HTable(this.conf, HConstants.META_TABLE_NAME)) { |
| List<byte[]> rows = new ArrayList<byte[]>(); |
| ResultScanner s = t.getScanner(new Scan()); |
| for (Result result : s) { |
| LOG.info("getMetaTableRows: row -> " |
| + Bytes.toStringBinary(result.getRow())); |
| rows.add(result.getRow()); |
| } |
| s.close(); |
| return rows; |
| } |
| } |
| |
| /** |
| * Returns all rows from the .META. table for a given user table |
| * |
| * @throws IOException When reading the rows fails. |
| */ |
| public List<byte[]> getMetaTableRows(byte[] tableName) throws IOException { |
| try (HTable t = new HTable(this.conf, HConstants.META_TABLE_NAME)) { |
| List<byte[]> rows = new ArrayList<byte[]>(); |
| ResultScanner s = t.getScanner(new Scan()); |
| for (Result result : s) { |
| HRegionInfo info = Writables.getHRegionInfo( |
| result.getValue(HConstants.CATALOG_FAMILY, |
| HConstants.REGIONINFO_QUALIFIER)); |
| HTableDescriptor desc = info.getTableDesc(); |
| if (Bytes.compareTo(desc.getName(), tableName) == 0) { |
| LOG.info("getMetaTableRows: row -> " |
| + Bytes.toStringBinary(result.getRow())); |
| rows.add(result.getRow()); |
| } |
| } |
| s.close(); |
| return rows; |
| } |
| } |
| |
| /** |
| * Tool to get a reference to the region server object that hosts the |
| * first region of the specified user table. |
| * It first searches for the meta rows that contain the regions of the |
| * specified table, then gets the index of the region server hosting the |
| * first one, and finally retrieves that server's reference. |
| * |
| * @param tableName user table to look up in .META. |
| * @return region server that holds the first region, or null if the row |
| * doesn't exist |
| * @throws IOException |
| */ |
| public HRegionServer getRSForFirstRegionInTable(byte[] tableName) |
| throws IOException { |
| List<byte[]> metaRows = getMetaTableRows(tableName); |
| if (metaRows == null || metaRows.isEmpty()) { |
| return null; |
| } |
| LOG.debug("Found " + metaRows.size() + " rows for table " |
| + Bytes.toString(tableName)); |
| byte[] firstrow = metaRows.get(0); |
| LOG.debug("FirstRow=" + Bytes.toString(firstrow)); |
| int index = hbaseCluster.getServerWith(firstrow); |
| if (index < 0) { |
| return null; |
| } |
| return hbaseCluster.getRegionServerThreads().get(index).getRegionServer(); |
| } |
| |
| /** |
| * Get the RegionServer which hosts a region with the given region name. |
| * @param regionName name of the region to look up |
| * @return the region server hosting the region, or null if none is found |
| */ |
| public HRegionServer getRSWithRegion(byte[] regionName) { |
| int index = hbaseCluster.getServerWith(regionName); |
| if (index == -1) { |
| return null; |
| } |
| return hbaseCluster.getRegionServerThreads().get(index).getRegionServer(); |
| } |
| |
| /** |
| * Starts a <code>MiniMRCluster</code> with a default number of |
| * <code>TaskTracker</code>s. |
| * |
| * @throws IOException When starting the cluster fails. |
| */ |
| public MiniMRCluster startMiniMapReduceCluster() throws IOException { |
| startMiniMapReduceCluster(2); |
| return mrCluster; |
| } |
| |
| /** |
| * Tasktracker has a bug where changing the hadoop.log.dir system property |
| * will not change its internal static LOG_DIR variable. |
| */ |
| private void forceChangeTaskLogDir() { |
| Field logDirField; |
| try { |
| logDirField = TaskLog.class.getDeclaredField("LOG_DIR"); |
| logDirField.setAccessible(true); |
| |
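| // Clear the final modifier so the static LOG_DIR field can be reassigned. |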
| Field modifiersField = Field.class.getDeclaredField("modifiers"); |
| modifiersField.setAccessible(true); |
| modifiersField.setInt(logDirField, logDirField.getModifiers() & ~Modifier.FINAL); |
| |
| logDirField.set(null, new File(hadoopLogDir, "userlogs")); |
| } catch (SecurityException | NoSuchFieldException | IllegalArgumentException | IllegalAccessException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| /** |
| * Starts a <code>MiniMRCluster</code>. Call {@link #setFileSystemURI(String)} to use a different |
| * filesystem. |
| * |
| * @param servers The number of <code>TaskTracker</code>s to start. |
| * @throws IOException When starting the cluster fails. |
| */ |
| private void startMiniMapReduceCluster(final int servers) throws IOException { |
| if (mrCluster != null) { |
| throw new IllegalStateException("MiniMRCluster is already running"); |
| } |
| LOG.info("Starting mini mapreduce cluster..."); |
| // These are needed for the new and improved Map/Reduce framework |
| Configuration c = getConfiguration(); |
| |
| setupClusterTestBuildDir(); |
| createDirsAndSetProperties(); |
| |
| forceChangeTaskLogDir(); |
| |
| // Allow the user to override FS URI for this map-reduce cluster to use. |
| mrCluster = new MiniMRCluster(servers, |
| fsURI != null ? fsURI : FileSystem.get(c).getUri().toString(), 1); |
| |
| LOG.info("Mini mapreduce cluster started"); |
| c.set("mapred.job.tracker", |
| mrCluster.createJobConf().get("mapred.job.tracker")); |
| } |
| |
| private String createDirAndSetProperty(final String relPath, String property) { |
| String path = clusterTestBuildDir.getPath() + "/" + relPath; |
| System.setProperty(property, path); |
| conf.set(property, path); |
| new File(path).mkdirs(); |
| LOG.info("Setting " + property + " to " + path + " in system properties and HBase conf"); |
| return path; |
| } |
| |
| /** |
| * Stops the previously started <code>MiniMRCluster</code>. |
| */ |
| public void shutdownMiniMapReduceCluster() { |
| LOG.info("Stopping mini mapreduce cluster..."); |
| if (mrCluster != null) { |
| mrCluster.shutdown(); |
| mrCluster = null; |
| } |
| // Restore configuration to point to local jobtracker |
| conf.set("mapred.job.tracker", "local"); |
| LOG.info("Mini mapreduce cluster stopped"); |
| } |
| |
| /** |
| * Switches the logger for the given class to DEBUG level. |
| * |
| * @param clazz The class for which to switch to debug logging. |
| */ |
| public void enableDebug(Class<?> clazz) { |
| Log l = LogFactory.getLog(clazz); |
| if (l instanceof Log4JLogger) { |
| ((Log4JLogger) l).getLogger().setLevel(org.apache.log4j.Level.DEBUG); |
| } else if (l instanceof Jdk14Logger) { |
| ((Jdk14Logger) l).getLogger().setLevel(java.util.logging.Level.ALL); |
| } |
| } |
| |
| /** |
| * Expire the Master's session |
| * @throws Exception |
| */ |
| public void expireMasterSession() throws Exception { |
| HMaster master = hbaseCluster.getMaster(); |
| expireSession(master.getZooKeeperWrapper()); |
| } |
| |
| /** |
| * Expire a region server's session |
| * @param index which RS |
| * @throws Exception |
| */ |
| public void expireRegionServerSession(int index) throws Exception { |
| HRegionServer rs = hbaseCluster.getRegionServer(index); |
| expireSession(rs.getZooKeeperWrapper()); |
| } |
| |
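| /** |
| * Forcibly expires the given wrapper's ZooKeeper session: opens a second |
| * connection with the same session id and password, then closes it, which |
| * invalidates the session server-side. Blocks until the wrapper notices |
| * the expiration and aborts, failing after one minute. |
| */ |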
| public void expireSession(ZooKeeperWrapper nodeZK) throws Exception{ |
| nodeZK.registerListener(EmptyWatcher.instance); |
| String quorumServers = nodeZK.getQuorumServers(); |
| int sessionTimeout = nodeZK.getSessionTimeout(); |
| byte[] password = nodeZK.getSessionPassword(); |
| long sessionID = nodeZK.getSessionID(); |
| final int sleep = 1000; |
| final int maxRetryNum = 60 * 1000 / sleep; // Wait for a total of up to one minute. |
| int retryNum = maxRetryNum; |
| |
| ZooKeeper zk = new ZooKeeper(quorumServers, |
| sessionTimeout, EmptyWatcher.instance, sessionID, password); |
| LOG.debug("Closing ZK"); |
| zk.close(); |
| |
| while (!nodeZK.isAborted() && retryNum != 0) { |
| Thread.sleep(sleep); |
| retryNum--; |
| } |
| if (retryNum == 0) { |
| fail("ZooKeeper is not aborted after " + maxRetryNum + " attempts."); |
| } |
| LOG.debug("ZooKeeper is closed"); |
| } |
| |
| /** |
| * Get the HBase cluster. |
| * |
| * @return hbase cluster |
| */ |
| public MiniHBaseCluster getHBaseCluster() { |
| return hbaseCluster; |
| } |
| |
| /** |
| * Returns an HBaseAdmin instance. |
| * |
| * @return The HBaseAdmin instance. |
| * @throws MasterNotRunningException |
| */ |
| public HBaseAdmin getHBaseAdmin() throws MasterNotRunningException { |
| if (hbaseAdmin == null) { |
| hbaseAdmin = new HBaseAdmin(getConfiguration()); |
| } |
| return hbaseAdmin; |
| } |
| |
| /** |
| * Closes the named region. |
| * |
| * @param regionName The region to close. |
| * @throws IOException |
| */ |
| public void closeRegion(String regionName) throws IOException { |
| closeRegion(Bytes.toBytes(regionName)); |
| } |
| |
| /** |
| * Closes the named region. |
| * |
| * @param regionName The region to close. |
| * @throws IOException |
| */ |
| public void closeRegion(byte[] regionName) throws IOException { |
| HBaseAdmin admin = getHBaseAdmin(); |
| admin.closeRegion(regionName, (Object[]) null); |
| } |
| |
| /** |
| * Closes the region containing the given row. |
| * |
| * @param row The row to find the containing region. |
| * @param table The table to find the region. |
| * @throws IOException |
| */ |
| public void closeRegionByRow(String row, HTable table) throws IOException { |
| closeRegionByRow(Bytes.toBytes(row), table); |
| } |
| |
| /** |
| * Closes the region containing the given row. |
| * |
| * @param row The row to find the containing region. |
| * @param table The table to find the region. |
| * @throws IOException |
| */ |
| public void closeRegionByRow(byte[] row, HTable table) throws IOException { |
| HRegionLocation hrl = table.getRegionLocation(row); |
| closeRegion(hrl.getRegionInfo().getRegionName()); |
| } |
| |
| public MiniZooKeeperCluster getZkCluster() { |
| return zkCluster; |
| } |
| |
| public void setZkCluster(MiniZooKeeperCluster zkCluster) { |
| this.zkCluster = zkCluster; |
| conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkCluster.getClientPort()); |
| } |
| |
| public MiniDFSCluster getDFSCluster() { |
| return dfsCluster; |
| } |
| |
| public FileSystem getTestFileSystem() throws IOException { |
| return FileSystem.get(conf); |
| } |
| |
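| /** |
| * Waits up to <code>timeoutMillis</code> for the given table to become |
| * available, polling every 500 ms, and fails the test on timeout. |
| */ |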
| public void waitTableAvailable(byte[] table, long timeoutMillis) |
| throws InterruptedException, IOException { |
| HBaseAdmin admin = new HBaseAdmin(conf); |
| long startWait = System.currentTimeMillis(); |
| while (!admin.isTableAvailable(table)) { |
| assertTrue("Timed out waiting for table " + Bytes.toStringBinary(table), |
| System.currentTimeMillis() - startWait < timeoutMillis); |
| Thread.sleep(500); |
| } |
| } |
| |
| /** |
| * Make sure that at least the specified number of region servers |
| * are running |
| * @param num minimum number of region servers that should be running |
| * @throws IOException |
| */ |
| public void ensureSomeRegionServersAvailable(final int num) |
| throws IOException { |
| // Need at least "num" servers; keep starting servers until we do. |
| while (this.getHBaseCluster().getLiveRegionServerThreads().size() < num) { |
| LOG.info("Started new server=" + |
| this.getHBaseCluster().startRegionServer()); |
| } |
| } |
| |
| /** |
| * This method clones the passed <code>c</code> configuration, setting a |
| * new user into the clone. Use it to get new instances of FileSystem. |
| * Only works for DistributedFileSystem. |
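| * |
| * <p>A minimal sketch of its use (assumes a running minidfscluster): |
| * <pre> |
| * Configuration c2 = HBaseTestingUtility.setDifferentUser(conf); |
| * FileSystem fs2 = FileSystem.get(c2); // distinct from FileSystem.get(conf) |
| * </pre> |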
| * @param c Initial configuration |
| * @return A new configuration instance with a different user set into it. |
| * @throws IOException |
| */ |
| public static Configuration setDifferentUser(final Configuration c) |
| throws IOException { |
| FileSystem currentfs = FileSystem.get(c); |
| Preconditions.checkArgument(currentfs instanceof DistributedFileSystem); |
| // Else distributed filesystem. Make a new instance per daemon. Below |
| // code is taken from the AppendTestUtil over in hdfs. |
| Configuration c2 = new Configuration(c); |
| String username = UserGroupInformation.getCurrentUGI().getUserName() + (USERNAME_SUFFIX++); |
| UnixUserGroupInformation.saveToConf(c2, |
| UnixUserGroupInformation.UGI_PROPERTY_NAME, |
| new UnixUserGroupInformation(username, new String[]{"supergroup"})); |
| return c2; |
| } |
| |
| /** |
| * Set soft and hard limits in namenode. |
| * You'll get an NPE if you call this before you've started a minidfscluster. |
| * @param soft Soft limit |
| * @param hard Hard limit |
| * @throws NoSuchFieldException |
| * @throws SecurityException |
| * @throws IllegalAccessException |
| * @throws IllegalArgumentException |
| */ |
| public void setNameNodeNameSystemLeasePeriod(final int soft, final int hard) |
| throws SecurityException, NoSuchFieldException, IllegalArgumentException, IllegalAccessException { |
| // TODO: If 0.20 hadoop do one thing, if 0.21 hadoop do another. |
| // Not available in 0.20 hdfs. Use reflection to make it happen. |
| |
| // private NameNode nameNode; |
| NameNode nn = null; |
| try { |
| Field field = this.dfsCluster.getClass().getDeclaredField("nameNode"); |
| field.setAccessible(true); |
| nn = (NameNode)field.get(this.dfsCluster); |
| } catch (NoSuchFieldException ne) { |
| // The latest version of HDFS has a nice API for this. |
| nn = dfsCluster.getNameNode(); |
| } |
| nn.namesystem.leaseManager.setLeasePeriod(soft, hard); |
| } |
| |
| /** |
| * Set maxRecoveryErrorCount in DFSClient. In 0.20 pre-append it's hard-coded |
| * to 5 and makes tests linger. Here is the exception you'll see: |
| * <pre> |
| * 2010-06-15 11:52:28,511 WARN [DataStreamer for file /hbase/.logs/hlog.1276627923013 block blk_928005470262850423_1021] hdfs.DFSClient$DFSOutputStream(2657): Error Recovery for block blk_928005470262850423_1021 failed because recovery from primary datanode 127.0.0.1:53683 failed 4 times. Pipeline was 127.0.0.1:53687, 127.0.0.1:53683. Will retry... |
| * </pre> |
| * @param stream A DFSClient.DFSOutputStream. |
| * @param max |
| * @throws NoSuchFieldException |
| * @throws SecurityException |
| * @throws IllegalAccessException |
| * @throws IllegalArgumentException |
| */ |
| public static void setMaxRecoveryErrorCount(final OutputStream stream, |
| final int max) { |
| try { |
| Class<?> [] clazzes = DFSClient.class.getDeclaredClasses(); |
| for (Class<?> clazz: clazzes) { |
| String className = clazz.getSimpleName(); |
| if (className.equals("DFSOutputStream")) { |
| if (clazz.isInstance(stream)) { |
| Field maxRecoveryErrorCountField = |
| stream.getClass().getDeclaredField("maxRecoveryErrorCount"); |
| maxRecoveryErrorCountField.setAccessible(true); |
| maxRecoveryErrorCountField.setInt(stream, max); |
| break; |
| } |
| } |
| } |
| } catch (Exception e) { |
| LOG.info("Could not set max recovery field", e); |
| } |
| } |
| |
| /** |
| * Blocks until the expected number of user regions are online. |
| * @param expectedUserRegions The number of regions which are not META or ROOT |
| */ |
| public void waitForOnlineRegionsToBeAssigned(int expectedUserRegions) { |
| int actualRegions; |
| |
| while (true) { |
| actualRegions = 0; |
| |
| List<HRegionServer> onlineServers = getOnlineRegionServers(); |
| LOG.info("Online servers: " + onlineServers); |
| for (HRegionServer server : onlineServers) { |
| for (HRegion region : server.getOnlineRegions()) { |
| // Ignore META or ROOT regions. |
| if (region.getRegionInfo().isMetaRegion() |
| || region.getRegionInfo().isRootRegion()) { |
| continue; |
| } |
| |
| LOG.info("Region " + region + " at " + server); |
| actualRegions++; |
| } |
| } |
| if (actualRegions == expectedUserRegions) { |
| break; |
| } |
| |
| try { |
| Thread.sleep(1000); |
| } catch (InterruptedException e) { |
| // Restore the interrupt status and keep waiting. |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| LOG.info(expectedUserRegions + " regions assigned"); |
| } |
| |
| /** |
| * Waits until the region assignments in the META table match what the |
| * region servers report, and each region is on its primary favored node |
| * if favored nodes are set. |
| */ |
| public void waitForTableConsistent() throws IOException { |
| final AtomicBoolean allInFavor = new AtomicBoolean(true); |
| final Map<String, Map<String, HServerAddress>> fromMeta = new HashMap<>(); |
| while (true) { |
| // Location info from META table |
| fromMeta.clear(); |
| allInFavor.set(true); |
| MetaScannerVisitor visitor = new MetaScannerVisitor() { |
| @Override |
| public boolean processRow(Result rowResult) throws IOException { |
| HRegionInfo info = |
| Writables.getHRegionInfo(rowResult.getValue( |
| HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER)); |
| |
| String tableName = info.getTableDesc().getNameAsString(); |
| |
| HServerAddress server = new HServerAddress(); |
| byte[] value = |
| rowResult.getValue(HConstants.CATALOG_FAMILY, |
| HConstants.SERVER_QUALIFIER); |
| if (value != null && value.length > 0) { |
| String address = Bytes.toString(value); |
| server = new HServerAddress(address); |
| } |
| |
| if (!(info.isOffline() || info.isSplit())) { |
| Map<String, HServerAddress> regionToAddr = |
| fromMeta.get(tableName); |
| if (regionToAddr == null) { |
| regionToAddr = new HashMap<>(); |
| fromMeta.put(tableName, regionToAddr); |
| } |
| regionToAddr.put(info.getRegionNameAsString(), server); |
| |
| byte[] fnBytes = |
| rowResult.getValue(HConstants.CATALOG_FAMILY, |
| HConstants.FAVOREDNODES_QUALIFIER); |
| if (fnBytes != null && fnBytes.length > 0) { |
| String fnStr = Bytes.toString(fnBytes); |
| List<HServerAddress> fnServers = |
| RegionPlacement.getFavoredNodeList(fnStr); |
| if (fnServers.size() > 0) { |
| if (!server.equals(fnServers.get(0))) { |
| allInFavor.set(false); |
| return false; |
| } |
| } |
| } |
| } |
| return true; |
| } |
| }; |
| MetaScanner.metaScan(conf, visitor, (StringBytes) null); |
| if (allInFavor.get()) { |
| // Location info from region servers |
| final Map<String, Map<String, HServerAddress>> fromRS = new HashMap<>(); |
| List<HRegionServer> onlineServers = getOnlineRegionServers(); |
| for (HRegionServer rs : onlineServers) { |
| for (HRegion region : rs.getOnlineRegions()) { |
| // Ignore META or ROOT regions. |
| if (region.getRegionInfo().isMetaRegion() |
| || region.getRegionInfo().isRootRegion()) { |
| continue; |
| } |
| |
| HRegionInfo info = region.getRegionInfo(); |
| String tableName = info.getTableDesc().getNameAsString(); |
| Map<String, HServerAddress> regionToAddr = fromRS.get(tableName); |
| if (regionToAddr == null) { |
| regionToAddr = new HashMap<>(); |
| fromRS.put(tableName, regionToAddr); |
| } |
| regionToAddr.put(info.getRegionNameAsString(), rs.getServerInfo() |
| .getServerAddress()); |
| } |
| } |
| |
| if (fromMeta.equals(fromRS)) { |
| // Consistent now, quit |
| break; |
| } |
| } |
| |
| // Sleep for a while before next check |
| try { |
| Thread.sleep(1000); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| |
| public List<HRegionServer> getOnlineRegionServers() { |
| List<HRegionServer> list = new ArrayList<HRegionServer>(); |
| for (JVMClusterUtil.RegionServerThread rst : this.getMiniHBaseCluster().getRegionServerThreads()) { |
| if (rst.getRegionServer().isOnline() && !rst.getRegionServer().isStopped()) { |
| list.add(rst.getRegionServer()); |
| } |
| } |
| return list; |
| } |
| |
| /** |
| * @return the only HRegion of the ROOT table, or null if not found. |
| * Loops over all online regions to find it, so it could be slow. |
| */ |
| public HRegion getRootRegion() { |
| for (HRegionServer rs : getOnlineRegionServers()) { |
| for (HRegion region : rs.getOnlineRegions()) { |
| if (region.getRegionInfo().isRootRegion()) { |
| return region; |
| } |
| } |
| } |
| |
| return null; |
| } |
| |
| /** |
| * @return one HRegion of the META table, or null if not found. |
| * Loops over all online regions to find it, so it could be slow. |
| */ |
| public HRegion getMetaRegion() { |
| for (HRegionServer rs : getOnlineRegionServers()) { |
| for (HRegion region : rs.getOnlineRegions()) { |
| if (region.getRegionInfo().isMetaTable()) { |
| return region; |
| } |
| } |
| } |
| |
| return null; |
| } |
| |
| /** |
| * Wait until <code>countOfRegions</code> rows in .META. have a non-empty |
| * info:server column. This means all regions have been deployed and the |
| * master has been informed and has updated .META. with each region's server. |
| * @param countOfRegions How many regions in .META. |
| * @throws IOException |
| */ |
| public void waitUntilAllRegionsAssigned(final int countOfRegions) |
| throws IOException { |
| try (HTable meta = |
| new HTable(getConfiguration(), HConstants.META_TABLE_NAME)) { |
| HConnection connection = ServerConnectionManager.getConnection(conf); |
| TOP_LOOP: |
| while (true) { |
| int rows = 0; |
| Scan scan = new Scan(); |
| scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); |
| ResultScanner s; |
| try { |
| s = meta.getScanner(scan); |
| } catch (RetriesExhaustedException ex) { |
| // This function has infinite patience. |
| Threads.sleepWithoutInterrupt(2000); |
| continue; |
| } catch (PreemptiveFastFailException ex) { |
| // Be more patient |
| Threads.sleepWithoutInterrupt(2000); |
| continue; |
| } |
| Map<String, HRegionInfo[]> regionAssignment = |
| new HashMap<String, HRegionInfo[]>(); |
| try { |
| REGION_LOOP: |
| for (Result r = null; (r = s.next()) != null;) { |
| byte[] b = |
| r.getValue(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER); |
| if (b == null || b.length <= 0) { |
| break; |
| } |
| // Make sure the regionserver really has this region. |
| String serverAddress = Bytes.toString(b); |
| if (!regionAssignment.containsKey(serverAddress)) { |
| HRegionInterface hri = |
| connection.getHRegionConnection(new HServerAddress( |
| serverAddress), false); |
| HRegionInfo[] regions; |
| try { |
| regions = hri.getRegionsAssignment(); |
| } catch (IOException ex) { |
| LOG.info("Could not contact regionserver " + serverAddress); |
| Threads.sleepWithoutInterrupt(1000); |
| continue TOP_LOOP; |
| } |
| regionAssignment.put(serverAddress, regions); |
| } |
| String regionName = Bytes.toString(r.getRow()); |
| for (HRegionInfo regInfo : regionAssignment.get(serverAddress)) { |
| String regNameOnRS = Bytes.toString(regInfo.getRegionName()); |
| if (regNameOnRS.equals(regionName)) { |
| rows++; |
| continue REGION_LOOP; |
| } |
| } |
| } |
| } finally { |
| // Close the scanner even when we retry via "continue TOP_LOOP" above. |
| s.close(); |
| } |
| // If we get here and all rows have a server, then all regions have |
| // been assigned. |
| if (rows >= countOfRegions) { |
| break; |
| } |
| LOG.info("Found " + rows + " open regions, waiting for " |
| + countOfRegions); |
| Threads.sleepWithoutInterrupt(1000); |
| } |
| } |
| } |
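| |
| // Usage sketch (illustrative; the setup step is a stand-in for whatever |
| // the test actually creates): block until .META. reflects all regions of |
| // a freshly created pre-split table: |
| // |
| //   int numRegions = 16; |
| //   // ... create a table pre-split into numRegions regions ... |
| //   util.waitUntilAllRegionsAssigned(numRegions); |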
| |
| /** |
| * Do a small get/scan against one store. This is required because Store |
| * has no methods for querying itself directly; it relies on StoreScanner. |
| */ |
| public static List<KeyValue> getFromStoreFile(Store store, |
| Get get) throws IOException { |
| MultiVersionConsistencyControl.resetThreadReadPoint(); |
| Scan scan = new Scan(get); |
| InternalScanner scanner = (InternalScanner) store.getScanner(scan, |
| scan.getFamilyMap().get(store.getFamily().getName())); |
| |
| List<KeyValue> result = new ArrayList<KeyValue>(); |
| scanner.next(result); |
| if (!result.isEmpty()) { |
| // verify that we are on the row we want: |
| KeyValue kv = result.get(0); |
| if (!Bytes.equals(kv.getRow(), get.getRow())) { |
| result.clear(); |
| } |
| } |
| return result; |
| } |
| |
| /** |
| * Do a small get/scan against one store. This is required because Store |
| * has no methods for querying itself directly; it relies on StoreScanner. |
| */ |
| public static List<KeyValue> getFromStoreFile(Store store, |
| byte [] row, |
| NavigableSet<byte[]> columns |
| ) throws IOException { |
| Get get = new Get(row); |
| Map<byte[], NavigableSet<byte[]>> s = get.getFamilyMap(); |
| s.put(store.getFamily().getName(), columns); |
| |
| return getFromStoreFile(store, get); |
| } |
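| |
| // Usage sketch (illustrative): verifying what one Store persisted for a |
| // single row, e.g. after a flush: |
| // |
| //   NavigableSet<byte[]> cols = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR); |
| //   cols.add(qualifier); |
| //   List<KeyValue> kvs = getFromStoreFile(store, row, cols); |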
| |
| public static void assertKVListsEqual(String additionalMsg, |
| final List<KeyValue> expected, |
| final List<KeyValue> actual) { |
| final int eLen = expected.size(); |
| final int aLen = actual.size(); |
| final int minLen = Math.min(eLen, aLen); |
| |
| int i = 0; |
| while (i < minLen) { |
| if (KeyValue.COMPARATOR.compare(expected.get(i), actual.get(i)) != 0) { |
| break; |
| } |
| i++; |
| } |
| |
| if (additionalMsg == null) { |
| additionalMsg = ""; |
| } |
| if (!additionalMsg.isEmpty()) { |
| additionalMsg = ". " + additionalMsg; |
| } |
| |
| if (eLen != aLen || i != minLen) { |
| // The rendered strings differ at the first mismatching position (or in |
| // the appended lengths), so assertEquals fails with a readable diff. |
| assertEquals("KeyValue mismatch at position " + i + additionalMsg, |
| safeGetAsStr(expected, i) + " (length " + eLen + ")", |
| safeGetAsStr(actual, i) + " (length " + aLen + ")"); |
| } |
| } |
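| |
| // Usage sketch (illustrative): comparing a scan's output against a |
| // hand-built expectation; the message prefix identifies the scenario: |
| // |
| //   List<KeyValue> expected = Arrays.asList(kv1, kv2); |
| //   assertKVListsEqual("after flush", expected, actualKvs); |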
| |
| private static <T> String safeGetAsStr(List<T> lst, int i) { |
| if (0 <= i && i < lst.size()) { |
| return lst.get(i).toString(); |
| } else { |
| return "<out_of_range>"; |
| } |
| } |
| |
| /** Creates a random table with the given parameters */ |
| public HTable createRandomTable(String tableName, |
| final Collection<String> families, |
| final int maxVersions, |
| final int numColsPerRow, |
| final int numFlushes, |
| final int numRegions, |
| final int numRowsPerFlush) |
| throws IOException, InterruptedException { |
| |
| LOG.info("\n\nCreating random table " + tableName + " with " + numRegions + |
| " regions, " + numFlushes + " storefiles per region, " + |
| numRowsPerFlush + " rows per flush, maxVersions=" + maxVersions + |
| "\n"); |
| |
| final Random rand = new Random(tableName.hashCode() * 17L + 12938197137L); |
| final int numCF = families.size(); |
| final byte[][] cfBytes = new byte[numCF][]; |
| final byte[] tableNameBytes = Bytes.toBytes(tableName); |
| |
| { |
| int cfIndex = 0; |
| for (String cf : families) { |
| cfBytes[cfIndex++] = Bytes.toBytes(cf); |
| } |
| } |
| |
| final int actualStartKey = 0; |
| final int actualEndKey = Integer.MAX_VALUE; |
| final int keysPerRegion = (actualEndKey - actualStartKey) / numRegions; |
| final int splitStartKey = actualStartKey + keysPerRegion; |
| final int splitEndKey = actualEndKey - keysPerRegion; |
| final String keyFormat = "%08x"; |
| final HTable table = createTable(tableNameBytes, cfBytes, |
| maxVersions, |
| Bytes.toBytes(String.format(keyFormat, splitStartKey)), |
| Bytes.toBytes(String.format(keyFormat, splitEndKey)), |
| numRegions); |
| |
| if (hbaseCluster != null) { |
| hbaseCluster.flushcache(HConstants.META_TABLE_NAME); |
| } |
| |
| for (int iFlush = 0; iFlush < numFlushes; ++iFlush) { |
| for (int iRow = 0; iRow < numRowsPerFlush; ++iRow) { |
| final byte[] row = Bytes.toBytes(String.format(keyFormat, |
| actualStartKey + rand.nextInt(actualEndKey - actualStartKey))); |
| |
| Put put = new Put(row); |
| Delete del = new Delete(row); |
| for (int iCol = 0; iCol < numColsPerRow; ++iCol) { |
| final byte[] cf = cfBytes[rand.nextInt(numCF)]; |
| final long ts = rand.nextInt(); |
| final byte[] qual = Bytes.toBytes("col" + iCol); |
| if (rand.nextBoolean()) { |
| final byte[] value = Bytes.toBytes("value_for_row_" + iRow + |
| "_cf_" + Bytes.toStringBinary(cf) + "_col_" + iCol + "_ts_" + |
| ts + "_random_" + rand.nextLong()); |
| put.add(cf, qual, ts, value); |
| } else if (rand.nextDouble() < 0.8) { |
| del.deleteColumn(cf, qual, ts); |
| } else { |
| del.deleteColumns(cf, qual, ts); |
| } |
| } |
| |
| if (!put.isEmpty()) { |
| table.put(put); |
| } |
| |
| if (!del.isEmpty()) { |
| table.delete(del); |
| } |
| } |
| LOG.info("Initiating flush #" + iFlush + " for table " + tableName); |
| table.flushCommits(); |
| if (hbaseCluster != null) { |
| hbaseCluster.flushcache(tableNameBytes); |
| } |
| } |
| |
| return table; |
| } |
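| |
| // Usage sketch (illustrative; the values are arbitrary): a small random |
| // table with two families, three flushes, and four regions: |
| // |
| //   HTable t = util.createRandomTable("testtable", |
| //       Arrays.asList("cf1", "cf2"), 3 /* maxVersions */, |
| //       10 /* colsPerRow */, 3 /* flushes */, 4 /* regions */, |
| //       100 /* rowsPerFlush */); |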
| |
| private static final int MIN_RANDOM_PORT = 0xc000; |
| private static final int MAX_RANDOM_PORT = 0xfffe; |
| |
| /** |
| * Returns a random port. These ports cannot be registered with IANA and are |
| * intended for dynamic allocation (see http://bit.ly/dynports). |
| */ |
| public static int randomPort() { |
| return MIN_RANDOM_PORT |
| + new Random().nextInt(MAX_RANDOM_PORT - MIN_RANDOM_PORT); |
| } |
| |
| /** |
| * Returns a random free port and marks that port as taken. Not thread-safe. |
| * Expected to be called from single-threaded test setup code. |
| */ |
| public static int randomFreePort() { |
| int port = 0; |
| do { |
| port = randomPort(); |
| if (takenRandomPorts.contains(port)) { |
| continue; |
| } |
| takenRandomPorts.add(port); |
| |
| try { |
| ServerSocket sock = new ServerSocket(port); |
| sock.close(); |
| } catch (IOException ex) { |
| port = 0; |
| } |
| } while (port == 0); |
| return port; |
| } |
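| |
| // Usage sketch (illustrative; the config key is just an example): reserve |
| // a free port for a test daemon before starting it: |
| // |
| //   int infoPort = randomFreePort(); |
| //   conf.setInt("hbase.master.info.port", infoPort); |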
| |
| public static void waitForHostPort(String host, int port) |
| throws IOException { |
| final int maxTimeMs = 10000; |
| final int maxNumAttempts = maxTimeMs / HConstants.SOCKET_RETRY_WAIT_MS; |
| IOException savedException = null; |
| LOG.info("Waiting for server at " + host + ":" + port); |
| for (int attempt = 0; attempt < maxNumAttempts; ++attempt) { |
| try { |
| Socket sock = new Socket(InetAddress.getByName(host), port); |
| sock.close(); |
| savedException = null; |
| LOG.info("Server at " + host + ":" + port + " is available"); |
| break; |
| } catch (UnknownHostException e) { |
| throw new IOException("Failed to look up " + host, e); |
| } catch (IOException e) { |
| savedException = e; |
| } |
| Threads.sleepWithoutInterrupt(HConstants.SOCKET_RETRY_WAIT_MS); |
| } |
| |
| if (savedException != null) { |
| throw savedException; |
| } |
| } |
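| |
| // Usage sketch (illustrative): block until a just-started daemon accepts |
| // connections before pointing a client at it: |
| // |
| //   waitForHostPort("127.0.0.1", infoPort); |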
| |
| /** |
| * Creates a pre-split table for load testing. If the table already exists, |
| * logs a warning and continues. |
| * @return the number of regions the table was split into |
| */ |
| public static int createPreSplitLoadTestTable(Configuration conf, |
| byte[] tableName, byte[] columnFamily, Algorithm compression, |
| DataBlockEncoding dataBlockEncoding) throws IOException { |
| HTableDescriptor desc = new HTableDescriptor(tableName); |
| HColumnDescriptor hcd = new HColumnDescriptor(columnFamily); |
| hcd.setDataBlockEncoding(dataBlockEncoding); |
| hcd.setCompressionType(compression); |
| desc.addFamily(hcd); |
| |
| int totalNumberOfRegions = 0; |
| try { |
| HBaseAdmin admin = new HBaseAdmin(conf); |
| try { |
| // Create a table with pre-split regions. The number of splits is |
| // (number of region servers) * (regions per region server). |
| int numberOfServers = admin.getClusterStatus().getServers(); |
| if (numberOfServers == 0) { |
| throw new IllegalStateException("No live regionservers"); |
| } |
| |
| totalNumberOfRegions = numberOfServers * DEFAULT_REGIONS_PER_SERVER; |
| LOG.info("Number of live regionservers: " + numberOfServers + ", " + |
| "pre-splitting table into " + totalNumberOfRegions + " regions " + |
| "(default regions per server: " + DEFAULT_REGIONS_PER_SERVER + ")"); |
| |
| byte[][] splits = new RegionSplitter.HexStringSplit().split( |
| totalNumberOfRegions); |
| |
| admin.createTable(desc, splits); |
| } finally { |
| // Close the admin even if table creation fails. |
| admin.close(); |
| } |
| } catch (MasterNotRunningException e) { |
| LOG.error("Master not running", e); |
| throw new IOException(e); |
| } catch (TableExistsException e) { |
| LOG.warn("Table " + Bytes.toStringBinary(tableName) + |
| " already exists, continuing"); |
| } |
| return totalNumberOfRegions; |
| } |
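| |
| // Usage sketch (illustrative): a load-test table with no compression and |
| // no block encoding: |
| // |
| //   int regions = createPreSplitLoadTestTable(conf, |
| //       Bytes.toBytes("loadtest"), Bytes.toBytes("cf"), |
| //       Compression.Algorithm.NONE, DataBlockEncoding.NONE); |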
| |
| public static int getMetaRSPort(Configuration conf) throws IOException { |
| HTable table = new HTable(conf, HConstants.META_TABLE_NAME); |
| HRegionLocation hloc = table.getRegionLocation(Bytes.toBytes("")); |
| table.close(); |
| return hloc.getServerAddress().getPort(); |
| } |
| |
| public HRegion createTestRegion(String tableName, HColumnDescriptor hcd) |
| throws IOException { |
| HTableDescriptor htd = new HTableDescriptor(tableName); |
| htd.addFamily(hcd); |
| HRegionInfo info = |
| new HRegionInfo(htd, null, null, false); |
| HRegion region = |
| HRegion.createHRegion(info, getTestDir("test_region_" + |
| tableName), getConfiguration()); |
| return region; |
| } |
| |
| /** |
| * Useful for tests that do not start a full cluster, e.g. those that use a mini MR cluster only. |
| */ |
| public void cleanupTestDir() throws IOException { |
| if (this.clusterTestBuildDir != null && this.clusterTestBuildDir.exists()) { |
| // Need to use deleteDirectory because File.delete requires the dir to be empty. |
| if (!FSUtils.deleteDirectory(FileSystem.getLocal(this.conf), |
| new Path(this.clusterTestBuildDir.toString()))) { |
| LOG.warn("Failed delete of " + this.clusterTestBuildDir.toString()); |
| } |
| } |
| } |
| |
| public static void setFileSystemURI(String fsURI) { |
| HBaseTestingUtility.fsURI = fsURI; |
| } |
| |
| private static void logMethodEntryAndSetThreadName(String methodName) { |
| LOG.info("\nStarting " + methodName + "\n"); |
| Thread.currentThread().setName(methodName); |
| } |
| |
| /** |
| * Sets the current thread name to the caller's method name. |
| */ |
| public static void setThreadNameFromMethod() { |
| logMethodEntryAndSetThreadName(new Throwable().getStackTrace()[1].getMethodName()); |
| } |
| |
| /** |
| * Sets the current thread name to the caller's caller's method name. |
| */ |
| public static void setThreadNameFromCallerMethod() { |
| logMethodEntryAndSetThreadName(new Throwable().getStackTrace()[2].getMethodName()); |
| } |
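| |
| // Note on the stack indices above: element [0] of the stack trace is the |
| // setThreadNameFrom* frame itself (where the Throwable is created), so [1] |
| // is the direct caller and [2] is the caller's caller. Illustrative use: |
| // |
| //   public void testFoo() { |
| //     HBaseTestingUtility.setThreadNameFromMethod();  // thread name: "testFoo" |
| //     // ... |
| //   } |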
| |
| public void killMiniHBaseCluster() { |
| for (RegionServerThread rst : hbaseCluster.getRegionServerThreads()) { |
| rst.getRegionServer().kill(); |
| } |
| for (HMaster master : hbaseCluster.getMasters()) { |
| master.stop("killMiniHBaseCluster"); |
| } |
| } |
| |
| public void dropDefaultTable() throws Exception { |
| HBaseAdmin admin = new HBaseAdmin(getConfiguration()); |
| try { |
| if (admin.tableExists(HTestConst.DEFAULT_TABLE_BYTES)) { |
| admin.disableTable(HTestConst.DEFAULT_TABLE_BYTES); |
| admin.deleteTable(HTestConst.DEFAULT_TABLE_BYTES); |
| } |
| } finally { |
| // Close the admin even if disable/delete fails. |
| admin.close(); |
| } |
| } |
| |
| public BlockCache getBlockCache() { |
| return cacheConf.getBlockCache(); |
| } |
| |
| /** |
| * Asserts that the number of rows in the given HTable matches the expected |
| * count. |
| * @param t The HTable instance to check |
| * @param expected The expected number of rows |
| * @throws IOException |
| */ |
| public static void assertRowCount(final HTable t, final int expected) |
| throws IOException { |
| assertEquals(expected, countRows(t, new Scan())); |
| } |
| |
| /** |
| * Counts the number of rows in an HTable instance that match the given |
| * scan request. |
| * |
| * @param t The HTable instance whose rows to count |
| * @param s The scan request for counting |
| * @return Count of rows in the table for the given scan request |
| * @throws IOException |
| */ |
| public static int countRows(final HTable t, final Scan s) |
| throws IOException { |
| // Count matching rows, asserting that each result is non-empty. |
| try (ResultScanner scanner = t.getScanner(s)) { |
| int count = 0; |
| for (Result result : scanner) { |
| count++; |
| assertTrue(result.size() > 0); |
| } |
| return count; |
| } |
| } |
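| |
| // Usage sketch (illustrative): count only rows carrying a given family by |
| // narrowing the scan, or check the whole table in one call: |
| // |
| //   Scan s = new Scan(); |
| //   s.addFamily(family); |
| //   int n = countRows(table, s); |
| //   assertRowCount(table, expectedTotal); |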
| |
| /** |
| * Sets the configuration to use AssignmentLoadBalancer. |
| */ |
| public void useAssignmentLoadBalancer() { |
| this.conf.set(HConstants.LOAD_BALANCER_IMPL, |
| "org.apache.hadoop.hbase.master.RegionManager$AssignmentLoadBalancer"); |
| } |
| |
| /** |
| * Sets the configuration to use the local file system. |
| */ |
| public void useLFS() { |
| this.conf.set(FS_TYPE_KEY, FS_TYPE_LFS); |
| } |
| |
| /** |
| * Moves the region to a new server and changes the assignment plan |
| * accordingly. |
| * |
| * @throws IOException |
| * @throws MasterNotRunningException |
| */ |
| public void moveRegionAndAssignment(HRegionInfo regionInfo, |
| HServerAddress regionServer) throws IOException { |
| RegionPlacement rp = new RegionPlacement(conf, false, false); |
| AssignmentPlan plan = rp.getExistingAssignmentPlan(); |
| List<HServerAddress> servers = plan.getAssignment(regionInfo); |
| if (servers != null) { |
| LOG.debug("Servers for " + regionInfo.getRegionNameAsString() |
| + " in plan is " + servers); |
| // Move regionServer to the front of the server list. |
| boolean removed = servers.remove(regionServer); |
| servers.add(0, regionServer); |
| if (!removed) { |
| // To keep the same length of the server list. |
| servers.remove(servers.size() - 1); |
| } |
| LOG.debug("Servers for " + regionInfo.getRegionNameAsString() |
| + " in plan is updated to " + servers); |
| |
| plan.updateAssignmentPlan(regionInfo, servers); |
| rp.updateAssignmentPlan(plan); |
| } |
| |
| getHBaseAdmin().moveRegion(regionInfo.getRegionName(), |
| regionServer.toString()); |
| } |
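| |
| // Usage sketch (illustrative; the server address is a placeholder): pin a |
| // test region to a specific server and keep the assignment plan in sync: |
| // |
| //   HRegionInfo info = region.getRegionInfo();  // some test region |
| //   util.moveRegionAndAssignment(info, new HServerAddress("host:60020")); |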
| } |