| /** |
| * Copyright 2008 The Apache Software Foundation |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.hbase.client.HConnectionManager; |
| import org.apache.hadoop.hbase.master.HMaster; |
| import org.apache.hadoop.hbase.regionserver.HRegion; |
| import org.apache.hadoop.hbase.regionserver.HRegionServer; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.JVMClusterUtil; |
| import org.apache.hadoop.hbase.util.Threads; |
| import org.apache.hadoop.hdfs.DistributedFileSystem; |
| import org.apache.hadoop.io.MapWritable; |
| import org.apache.hadoop.security.UnixUserGroupInformation; |
| import org.apache.hadoop.security.UserGroupInformation; |
| |
| /** |
| * This class creates a single process HBase cluster. |
| * each server. The master uses the 'default' FileSystem. The RegionServers, |
| * if we are running on DistributedFilesystem, create a FileSystem instance |
| * each and will close down their instance on the way out. |
| */ |
| public class MiniHBaseCluster { |
| static final Log LOG = LogFactory.getLog(MiniHBaseCluster.class.getName()); |
| private Configuration conf; |
| public LocalHBaseCluster hbaseCluster; |
| // Cache this. For some reason only works first time I get it. TODO: Figure |
| // out why. |
| private final static UserGroupInformation UGI; |
| static { |
| UGI = UserGroupInformation.getCurrentUGI(); |
| } |
| |
| static long PREFERRED_ASSIGNMENT = 1000L; |
| static long WAIT_FOR_LOADBALANCER = 2000L; |
| /** |
| * Start a MiniHBaseCluster. |
| * @param conf Configuration to be used for cluster |
| * @param numRegionServers initial number of region servers to start. |
| * @throws IOException |
| */ |
| public MiniHBaseCluster(Configuration conf, int numRegionServers) |
| throws IOException, InterruptedException { |
| this(conf, 1, numRegionServers); |
| } |
| |
| /** |
| * Start a MiniHBaseCluster. |
| * @param conf Configuration to be used for cluster |
| * @param numMasters initial number of masters to start. |
| * @param numRegionServers initial number of region servers to start. |
| * @throws IOException |
| */ |
| public MiniHBaseCluster(Configuration conf, int numMasters, |
| int numRegionServers) |
| throws IOException, InterruptedException { |
| this.conf = conf; |
| conf.set(HConstants.MASTER_PORT, "0"); |
| conf.setLong("hbase.master.applyPreferredAssignment.period", |
| PREFERRED_ASSIGNMENT); |
| conf.setLong("hbase.master.holdRegionForBestLocality.period", |
| PREFERRED_ASSIGNMENT / 5); |
| init(numMasters, numRegionServers); |
| } |
| |
| /** |
| * Override Master so can add inject behaviors testing. |
| */ |
| public static class MiniHBaseClusterMaster extends HMaster { |
| private final Map<HServerInfo, List<HMsg>> messages = |
| new ConcurrentHashMap<HServerInfo, List<HMsg>>(); |
| |
| private final Map<HServerInfo, IOException> exceptions = |
| new ConcurrentHashMap<HServerInfo, IOException>(); |
| |
| public MiniHBaseClusterMaster(final Configuration conf) |
| throws IOException { |
| super(conf); |
| } |
| |
| /** |
| * Add a message to send to a regionserver next time it checks in. |
| * @param hsi RegionServer's HServerInfo. |
| * @param msg Message to add. |
| */ |
| void addMessage(final HServerInfo hsi, HMsg msg) { |
| synchronized(this.messages) { |
| List<HMsg> hmsgs = this.messages.get(hsi); |
| if (hmsgs == null) { |
| hmsgs = new ArrayList<HMsg>(); |
| this.messages.put(hsi, hmsgs); |
| } |
| hmsgs.add(msg); |
| } |
| } |
| |
| void addException(final HServerInfo hsi, final IOException ex) { |
| this.exceptions.put(hsi, ex); |
| } |
| |
| /** |
| * This implementation is special, exceptions will be treated first and |
| * message won't be sent back to the region servers even if some are |
| * specified. |
| * @param hsi the rs |
| * @param msgs Messages to add to |
| * @return |
| * @throws IOException will be throw if any added for this region server |
| */ |
| @Override |
| protected HMsg[] adornRegionServerAnswer(final HServerInfo hsi, |
| final HMsg[] msgs) throws IOException { |
| IOException ex = this.exceptions.remove(hsi); |
| if (ex != null) { |
| throw ex; |
| } |
| HMsg [] answerMsgs = msgs; |
| synchronized (this.messages) { |
| List<HMsg> hmsgs = this.messages.get(hsi); |
| if (hmsgs != null && !hmsgs.isEmpty()) { |
| int size = answerMsgs.length; |
| HMsg [] newAnswerMsgs = new HMsg[size + hmsgs.size()]; |
| System.arraycopy(answerMsgs, 0, newAnswerMsgs, 0, answerMsgs.length); |
| for (int i = 0; i < hmsgs.size(); i++) { |
| newAnswerMsgs[answerMsgs.length + i] = hmsgs.get(i); |
| } |
| answerMsgs = newAnswerMsgs; |
| hmsgs.clear(); |
| } |
| } |
| return super.adornRegionServerAnswer(hsi, answerMsgs); |
| } |
| } |
| |
| /** |
| * Subclass so can get at protected methods (none at moment). Also, creates |
| * a FileSystem instance per instantiation. Adds a shutdown own FileSystem |
| * on the way out. Shuts down own Filesystem only, not All filesystems as |
| * the FileSystem system exit hook does. |
| */ |
| public static class MiniHBaseClusterRegionServer extends HRegionServer { |
| private static int index = 0; |
| private Thread shutdownThread = null; |
| |
| public MiniHBaseClusterRegionServer(Configuration conf) |
| throws IOException { |
| super(setDifferentUser(conf)); |
| } |
| |
| public void setHServerInfo(final HServerInfo hsi) { |
| this.serverInfo = hsi; |
| } |
| |
| /* |
| * @param c |
| * @param currentfs We return this if we did not make a new one. |
| * @param uniqueName Same name used to help identify the created fs. |
| * @return A new fs instance if we are up on DistributeFileSystem. |
| * @throws IOException |
| */ |
| private static Configuration setDifferentUser(final Configuration c) |
| throws IOException { |
| FileSystem currentfs = FileSystem.get(c); |
| if (!(currentfs instanceof DistributedFileSystem)) return c; |
| // Else distributed filesystem. Make a new instance per daemon. Below |
| // code is taken from the AppendTestUtil over in hdfs. |
| Configuration c2 = new Configuration(c); |
| String username = UGI.getUserName() + ".hrs." + index++; |
| UnixUserGroupInformation.saveToConf(c2, |
| UnixUserGroupInformation.UGI_PROPERTY_NAME, |
| new UnixUserGroupInformation(username, new String[]{"supergroup"})); |
| return c2; |
| } |
| |
| @Override |
| protected void init(MapWritable c) throws IOException { |
| super.init(c); |
| // Run this thread to shutdown our filesystem on way out. |
| this.shutdownThread = new SingleFileSystemShutdownThread(getFileSystem()); |
| } |
| |
| @Override |
| public void run() { |
| try { |
| super.run(); |
| } finally { |
| // Run this on the way out. |
| if (this.shutdownThread != null) { |
| this.shutdownThread.start(); |
| Threads.shutdown(this.shutdownThread, 30000); |
| } |
| } |
| } |
| |
| public void kill() { |
| super.kill(); |
| } |
| } |
| |
| /** |
| * Alternate shutdown hook. |
| * Just shuts down the passed fs, not all as default filesystem hook does. |
| */ |
| static class SingleFileSystemShutdownThread extends Thread { |
| private final FileSystem fs; |
| SingleFileSystemShutdownThread(final FileSystem fs) { |
| super("Shutdown of " + fs); |
| this.fs = fs; |
| } |
| @Override |
| public void run() { |
| try { |
| LOG.info("Hook closing fs=" + this.fs); |
| this.fs.close(); |
| } catch (IOException e) { |
| LOG.warn("Running hook", e); |
| } |
| } |
| } |
| |
| private void init(final int nMasterNodes, final int nRegionNodes) |
| throws IOException { |
| try { |
| // start up a LocalHBaseCluster |
| hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, nRegionNodes, |
| MiniHBaseCluster.MiniHBaseClusterMaster.class, |
| MiniHBaseCluster.MiniHBaseClusterRegionServer.class); |
| hbaseCluster.startup(); |
| } catch(IOException e) { |
| shutdown(); |
| throw e; |
| } |
| } |
| |
| /** |
| * Starts a region server thread running |
| * |
| * @throws IOException |
| * @return New RegionServerThread |
| */ |
| public JVMClusterUtil.RegionServerThread startRegionServer() throws IOException { |
| JVMClusterUtil.RegionServerThread t = this.hbaseCluster.addRegionServer(); |
| t.start(); |
| t.waitForServerOnline(); |
| return t; |
| } |
| |
| /** |
| * @return Returns the rpc address actually used by the master server, because |
| * the supplied port is not necessarily the actual port used. |
| */ |
| public HServerAddress getHMasterAddress() { |
| return this.hbaseCluster.getMaster().getMasterAddress(); |
| } |
| |
| /** |
| * @return the HMaster |
| */ |
| public HMaster getMaster() { |
| return this.hbaseCluster.getMaster(); |
| } |
| |
| /** |
| * Adds a new master to the cluster and starts the master thread. Useful if |
| * the existing master dies and a live master is needed for cleanup. |
| */ |
| public void startNewMaster() throws IOException { |
| hbaseCluster.addMaster().start(); |
| } |
| |
| /** |
| * Cause a region server to exit doing basic clean up only on its way out. |
| * @param serverNumber Used as index into a list. |
| */ |
| public String abortRegionServer(int serverNumber) { |
| HRegionServer server = getRegionServer(serverNumber); |
| LOG.info("Aborting " + server.toString()); |
| server.abort("Aborting for tests", new Exception("Trace info")); |
| return server.toString(); |
| } |
| |
| /** |
| * Shut down the specified region server cleanly |
| * |
| * @param serverNumber Used as index into a list. |
| * @return the region server that was stopped |
| */ |
| public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber) { |
| return stopRegionServer(serverNumber, true); |
| } |
| |
| /** |
| * Shut down the specified region server cleanly |
| * |
| * @param serverNumber Used as index into a list. |
| * @param shutdownFS True is we are to shutdown the filesystem as part of this |
| * regionserver's shutdown. Usually we do but you do not want to do this if |
| * you are running multiple regionservers in a test and you shut down one |
| * before end of the test. |
| * @return the region server that was stopped |
| */ |
| public JVMClusterUtil.RegionServerThread stopRegionServer(int serverNumber, |
| final boolean shutdownFS) { |
| JVMClusterUtil.RegionServerThread server = |
| hbaseCluster.getRegionServers().get(serverNumber); |
| LOG.info("Stopping " + server.toString()); |
| server.getRegionServer().stop(); |
| return server; |
| } |
| |
| /** |
| * Wait for the specified region server to stop. Removes this thread from list |
| * of running threads. |
| * @param serverNumber |
| * @return Name of region server that just went down. |
| */ |
| public String waitOnRegionServer(final int serverNumber) { |
| return this.hbaseCluster.waitOnRegionServer(serverNumber); |
| } |
| |
| /** |
| * Wait for Mini HBase Cluster to shut down. |
| */ |
| public void join() { |
| this.hbaseCluster.join(); |
| } |
| |
| /** |
| * Shut down the mini HBase cluster |
| * @throws IOException |
| */ |
| public void shutdown() throws IOException { |
| if (this.hbaseCluster != null) { |
| this.hbaseCluster.shutdown(); |
| } |
| HConnectionManager.deleteAllConnections(false); |
| } |
| |
| /** |
| * Call flushCache on all regions on all participating regionservers. |
| * @throws IOException |
| */ |
| public void flushcache() throws IOException { |
| for (JVMClusterUtil.RegionServerThread t: |
| this.hbaseCluster.getRegionServers()) { |
| for(HRegion r: t.getRegionServer().getOnlineRegions()) { |
| r.flushcache(); |
| } |
| } |
| } |
| |
| /** |
| * Call flushCache on all regions of the specified table. |
| * @throws IOException |
| */ |
| public void flushcache(byte [] tableName) throws IOException { |
| for (JVMClusterUtil.RegionServerThread t: |
| this.hbaseCluster.getRegionServers()) { |
| for(HRegion r: t.getRegionServer().getOnlineRegions()) { |
| if(Bytes.equals(r.getTableDesc().getName(), tableName)) { |
| r.flushcache(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * @return List of region server threads. |
| */ |
| public List<JVMClusterUtil.RegionServerThread> getRegionServerThreads() { |
| return this.hbaseCluster.getRegionServers(); |
| } |
| |
| /** |
| * @return List of live region server threads (skips the aborted and the killed) |
| */ |
| public List<JVMClusterUtil.RegionServerThread> getLiveRegionServerThreads() { |
| return this.hbaseCluster.getLiveRegionServers(); |
| } |
| |
| /** |
| * Grab a numbered region server of your choice. |
| * @param serverNumber |
| * @return region server |
| */ |
| public HRegionServer getRegionServer(int serverNumber) { |
| return hbaseCluster.getRegionServer(serverNumber); |
| } |
| |
| public List<HRegion> getRegions(byte[] tableName) { |
| List<HRegion> ret = new ArrayList<HRegion>(); |
| for (JVMClusterUtil.RegionServerThread rst : getRegionServerThreads()) { |
| HRegionServer hrs = rst.getRegionServer(); |
| for (HRegion region : hrs.getOnlineRegions()) { |
| if (Bytes.equals(region.getTableDesc().getName(), tableName)) { |
| ret.add(region); |
| } |
| } |
| } |
| return ret; |
| } |
| |
| /** |
| * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()} |
| * of HRS carrying regionName. Returns -1 if none found. |
| */ |
| public int getServerWithMeta() { |
| return getServerWith(HRegionInfo.FIRST_META_REGIONINFO.getRegionName()); |
| } |
| |
| /** |
| * Get the location of the specified region |
| * @param regionName Name of the region in bytes |
| * @return Index into List of {@link MiniHBaseCluster#getRegionServerThreads()} |
| * of HRS carrying .META.. Returns -1 if none found. |
| */ |
| public int getServerWith(byte[] regionName) { |
| int index = -1; |
| int count = 0; |
| for (JVMClusterUtil.RegionServerThread rst: getRegionServerThreads()) { |
| HRegionServer hrs = rst.getRegionServer(); |
| HRegion metaRegion = |
| hrs.getOnlineRegion(regionName); |
| if (metaRegion != null) { |
| index = count; |
| break; |
| } |
| count++; |
| } |
| return index; |
| } |
| |
| /** |
| * Add an exception to send when a region server checks back in |
| * @param serverNumber Which server to send it to |
| * @param ex The exception that will be sent |
| * @throws IOException |
| */ |
| public void addExceptionToSendRegionServer(final int serverNumber, |
| IOException ex) throws IOException { |
| MiniHBaseClusterRegionServer hrs = |
| (MiniHBaseClusterRegionServer)getRegionServer(serverNumber); |
| addExceptionToSendRegionServer(hrs, ex); |
| } |
| |
| /** |
| * Add an exception to send when a region server checks back in |
| * @param hrs Which server to send it to |
| * @param ex The exception that will be sent |
| * @throws IOException |
| */ |
| public void addExceptionToSendRegionServer( |
| final MiniHBaseClusterRegionServer hrs, IOException ex) |
| throws IOException { |
| ((MiniHBaseClusterMaster)getMaster()).addException(hrs.getHServerInfo(),ex); |
| } |
| |
| /** |
| * Add a message to include in the responses send a regionserver when it |
| * checks back in. |
| * @param serverNumber Which server to send it to. |
| * @param msg The MESSAGE |
| * @throws IOException |
| */ |
| public void addMessageToSendRegionServer(final int serverNumber, |
| final HMsg msg) |
| throws IOException { |
| MiniHBaseClusterRegionServer hrs = |
| (MiniHBaseClusterRegionServer)getRegionServer(serverNumber); |
| addMessageToSendRegionServer(hrs, msg); |
| } |
| |
| /** |
| * Add a message to include in the responses send a regionserver when it |
| * checks back in. |
| * @param hrs Which region server. |
| * @param msg The MESSAGE |
| * @throws IOException |
| */ |
| public void addMessageToSendRegionServer(final MiniHBaseClusterRegionServer hrs, |
| final HMsg msg) |
| throws IOException { |
| ((MiniHBaseClusterMaster)getMaster()).addMessage(hrs.getHServerInfo(), msg); |
| } |
| } |