| /** |
| * Copyright 2007 The Apache Software Foundation |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.regionserver; |
| |
| import java.io.IOException; |
| import java.lang.Thread.UncaughtExceptionHandler; |
| import java.lang.management.ManagementFactory; |
| import java.lang.management.MemoryUsage; |
| import java.lang.management.RuntimeMXBean; |
| import java.lang.reflect.Constructor; |
| import java.lang.reflect.Field; |
| import java.net.InetSocketAddress; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.SortedMap; |
| import java.util.SortedSet; |
| import java.util.TreeMap; |
| import java.util.TreeSet; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.Chore; |
| import org.apache.hadoop.hbase.HBaseConfiguration; |
| import org.apache.hadoop.hbase.HColumnDescriptor; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.HMsg; |
| import org.apache.hadoop.hbase.HRegionInfo; |
| import org.apache.hadoop.hbase.HRegionLocation; |
| import org.apache.hadoop.hbase.HServerAddress; |
| import org.apache.hadoop.hbase.HServerInfo; |
| import org.apache.hadoop.hbase.HServerLoad; |
| import org.apache.hadoop.hbase.HStoreKey; |
| import org.apache.hadoop.hbase.HTableDescriptor; |
| import org.apache.hadoop.hbase.LeaseListener; |
| import org.apache.hadoop.hbase.Leases; |
| import org.apache.hadoop.hbase.LocalHBaseCluster; |
| import org.apache.hadoop.hbase.NotServingRegionException; |
| import org.apache.hadoop.hbase.RegionHistorian; |
| import org.apache.hadoop.hbase.RemoteExceptionHandler; |
| import org.apache.hadoop.hbase.UnknownRowLockException; |
| import org.apache.hadoop.hbase.UnknownScannerException; |
| import org.apache.hadoop.hbase.ValueOverMaxLengthException; |
| import org.apache.hadoop.hbase.HMsg.Type; |
| import org.apache.hadoop.hbase.Leases.LeaseStillHeldException; |
| import org.apache.hadoop.hbase.client.ServerConnection; |
| import org.apache.hadoop.hbase.client.ServerConnectionManager; |
| import org.apache.hadoop.hbase.filter.RowFilterInterface; |
| import org.apache.hadoop.hbase.io.BatchOperation; |
| import org.apache.hadoop.hbase.io.BatchUpdate; |
| import org.apache.hadoop.hbase.io.Cell; |
| import org.apache.hadoop.hbase.io.HbaseMapWritable; |
| import org.apache.hadoop.hbase.io.RowResult; |
| import org.apache.hadoop.hbase.ipc.HBaseRPC; |
| import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion; |
| import org.apache.hadoop.hbase.ipc.HBaseServer; |
| import org.apache.hadoop.hbase.ipc.HMasterRegionInterface; |
| import org.apache.hadoop.hbase.ipc.HRegionInterface; |
| import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.FSUtils; |
| import org.apache.hadoop.hbase.util.InfoServer; |
| import org.apache.hadoop.hbase.util.Sleeper; |
| import org.apache.hadoop.hbase.util.Threads; |
| import org.apache.hadoop.io.MapWritable; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.util.Progressable; |
| import org.apache.hadoop.util.StringUtils; |
| |
| /** |
| * HRegionServer makes a set of HRegions available to clients. It checks in with |
| * the HMaster. There are many HRegionServers in a single HBase deployment. |
| */ |
| public class HRegionServer implements HConstants, HRegionInterface, Runnable { |
static final Log LOG = LogFactory.getLog(HRegionServer.class);

// Set when a report to the master comes back with a message asking us to
// shutdown. Also set by call to stop when debugging or running unit tests
// of HRegionServer in isolation. We use AtomicBoolean rather than
// plain boolean so we can pass a reference to Chore threads. Otherwise,
// Chore threads need to know about the hosting class.
protected final AtomicBoolean stopRequested = new AtomicBoolean(false);

// Set once the master has asked us to quiesce; when quiesced and no more
// regions are online, the main loop starts shutdown (see run()).
protected final AtomicBoolean quiesced = new AtomicBoolean(false);

// True while in startup safe mode; cleared by SafeModeThread after the
// configured "hbase.regionserver.safemode.period" elapses.
protected final AtomicBoolean safeMode = new AtomicBoolean(true);

// Go down hard. Used if file system becomes unavailable and also in
// debugging and unit tests.
protected volatile boolean abortRequested;

// If false, the file system has become unavailable
protected volatile boolean fsOk;

// Identity (address, start code, info port) reported to the master.
protected final HServerInfo serverInfo;
protected final HBaseConfiguration conf;

// Shared cluster connection; used to publish the root region location once
// we learn it from the master.
private final ServerConnection connection;
private final AtomicBoolean haveRootRegion = new AtomicBoolean(false);
private FileSystem fs;
private Path rootDir;
private final Random rand = new Random();

// Key is Bytes.hashCode of region name byte array and the value is HRegion
// in both of the maps below. Use Bytes.mapKey(byte []) generating key for
// below maps.
protected final Map<Integer, HRegion> onlineRegions =
  new ConcurrentHashMap<Integer, HRegion>();

protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// Messages queued for the master; drained and sent on each report cycle.
private final List<HMsg> outboundMsgs =
  Collections.synchronizedList(new ArrayList<HMsg>());

final int numRetries;
protected final int threadWakeFrequency;
private final int msgInterval;
private final int serverLeaseTimeout;

// Maximum number of most-loaded regions to include in each master report.
protected final int numRegionsToReport;

// Remote HMaster
private HMasterRegionInterface hbaseMaster;

// Server to handle client requests. Default access so can be accessed by
// unit tests.
final HBaseServer server;

// Request counter
// NOTE(review): the AtomicInteger reference appears never reassigned in the
// visible code; 'volatile' could likely be 'final' -- confirm against the
// rest of the file before changing.
private volatile AtomicInteger requestCount = new AtomicInteger();

// Leases
private final Leases leases;

// Info server. Default access so can be used by unit tests. REGIONSERVER
// is name of the webapp and the attribute name used stuffing this instance
// into web context.
InfoServer infoServer;

/** region server process name */
public static final String REGIONSERVER = "regionserver";

/*
 * Space is reserved in HRS constructor and then released when aborting
 * to recover from an OOME. See HBASE-706. TODO: Make this percentage of the
 * heap or a minimum.
 */
private final LinkedList<byte[]> reservedSpace = new LinkedList<byte []>();

private RegionServerMetrics metrics;

// Compactions
final CompactSplitThread compactSplitThread;

// Cache flushing
final MemcacheFlusher cacheFlusher;

/* Check for major compactions.
 */
final Chore majorCompactionChecker;

// HLog and HLog roller. log is protected rather than private to avoid
// eclipse warning when accessed by inner classes
protected volatile HLog log;
final LogRoller logRoller;
final LogFlusher logFlusher;

// safemode processing
SafeModeThread safeModeThread;

// flag set after we're done setting up server threads (used for testing)
protected volatile boolean isOnline;

// Currently open scanners, keyed by scanner name.
final Map<String, InternalScanner> scanners =
  new ConcurrentHashMap<String, InternalScanner>();
| |
| /** |
| * Starts a HRegionServer at the default location |
| * @param conf |
| * @throws IOException |
| */ |
| public HRegionServer(HBaseConfiguration conf) throws IOException { |
| this(new HServerAddress(conf.get(REGIONSERVER_ADDRESS, |
| DEFAULT_REGIONSERVER_ADDRESS)), conf); |
| } |
| |
| /** |
| * Starts a HRegionServer at the specified location |
| * @param address |
| * @param conf |
| * @throws IOException |
| */ |
| public HRegionServer(HServerAddress address, HBaseConfiguration conf) |
| throws IOException { |
| this.abortRequested = false; |
| this.fsOk = true; |
| this.conf = conf; |
| this.connection = ServerConnectionManager.getConnection(conf); |
| |
| this.isOnline = false; |
| |
| // Config'ed params |
| this.numRetries = conf.getInt("hbase.client.retries.number", 2); |
| this.threadWakeFrequency = conf.getInt(THREAD_WAKE_FREQUENCY, 10 * 1000); |
| this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000); |
| this.serverLeaseTimeout = |
| conf.getInt("hbase.master.lease.period", 120 * 1000); |
| |
| // Cache flushing thread. |
| this.cacheFlusher = new MemcacheFlusher(conf, this); |
| |
| // Compaction thread |
| this.compactSplitThread = new CompactSplitThread(this); |
| |
| // Log rolling thread |
| this.logRoller = new LogRoller(this); |
| |
| // Log flushing thread |
| this.logFlusher = |
| new LogFlusher(this.threadWakeFrequency, this.stopRequested); |
| |
| // Background thread to check for major compactions; needed if region |
| // has not gotten updates in a while. Make it run at a lesser frequency. |
| int multiplier = this.conf.getInt(THREAD_WAKE_FREQUENCY + |
| ".multiplier", 1000); |
| this.majorCompactionChecker = new MajorCompactionChecker(this, |
| this.threadWakeFrequency * multiplier, this.stopRequested); |
| |
| // Task thread to process requests from Master |
| this.worker = new Worker(); |
| this.workerThread = new Thread(worker); |
| |
| // Server to handle client requests |
| this.server = HBaseRPC.getServer(this, address.getBindAddress(), |
| address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10), |
| false, conf); |
| // Address is givin a default IP for the moment. Will be changed after |
| // calling the master. |
| this.serverInfo = new HServerInfo(new HServerAddress( |
| new InetSocketAddress(DEFAULT_HOST, |
| this.server.getListenerAddress().getPort())), System.currentTimeMillis(), |
| this.conf.getInt("hbase.regionserver.info.port", 60030)); |
| if (this.serverInfo.getServerAddress() == null) { |
| throw new NullPointerException("Server address cannot be null; " + |
| "hbase-958 debugging"); |
| } |
| this.numRegionsToReport = |
| conf.getInt("hbase.regionserver.numregionstoreport", 10); |
| |
| this.leases = new Leases( |
| conf.getInt("hbase.regionserver.lease.period", 60 * 1000), |
| this.threadWakeFrequency); |
| |
| int nbBlocks = conf.getInt("hbase.regionserver.nbreservationblocks", 4); |
| for(int i = 0; i < nbBlocks; i++) { |
| reservedSpace.add(new byte[DEFAULT_SIZE_RESERVATION_BLOCK]); |
| } |
| } |
| |
| /** |
| * The HRegionServer sticks in this loop until closed. It repeatedly checks |
| * in with the HMaster, sending heartbeats & reports, and receiving HRegion |
| * load/unload instructions. |
| */ |
| public void run() { |
| boolean quiesceRequested = false; |
| // A sleeper that sleeps for msgInterval. |
| Sleeper sleeper = new Sleeper(this.msgInterval, this.stopRequested); |
| try { |
| init(reportForDuty(sleeper)); |
| long lastMsg = 0; |
| // Now ask master what it wants us to do and tell it what we have done |
| for (int tries = 0; !stopRequested.get() && isHealthy();) { |
| // Try to get the root region location from the master. |
| if (!haveRootRegion.get()) { |
| HServerAddress rootServer = hbaseMaster.getRootRegionLocation(); |
| if (rootServer != null) { |
| // By setting the root region location, we bypass the wait imposed on |
| // HTable for all regions being assigned. |
| this.connection.setRootRegionLocation( |
| new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, rootServer)); |
| haveRootRegion.set(true); |
| } |
| } |
| long now = System.currentTimeMillis(); |
| if (lastMsg != 0 && (now - lastMsg) >= serverLeaseTimeout) { |
| // It has been way too long since we last reported to the master. |
| LOG.warn("unable to report to master for " + (now - lastMsg) + |
| " milliseconds - retrying"); |
| } |
| if ((now - lastMsg) >= msgInterval) { |
| HMsg outboundArray[] = null; |
| synchronized(this.outboundMsgs) { |
| outboundArray = |
| this.outboundMsgs.toArray(new HMsg[outboundMsgs.size()]); |
| this.outboundMsgs.clear(); |
| } |
| try { |
| doMetrics(); |
| MemoryUsage memory = |
| ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); |
| HServerLoad hsl = new HServerLoad(requestCount.get(), |
| (int)(memory.getUsed()/1024/1024), |
| (int)(memory.getMax()/1024/1024)); |
| for (HRegion r: onlineRegions.values()) { |
| hsl.addRegionInfo(createRegionLoad(r)); |
| } |
| this.serverInfo.setLoad(hsl); |
| this.requestCount.set(0); |
| HMsg msgs[] = hbaseMaster.regionServerReport( |
| serverInfo, outboundArray, getMostLoadedRegions()); |
| lastMsg = System.currentTimeMillis(); |
| if (this.quiesced.get() && onlineRegions.size() == 0) { |
| // We've just told the master we're exiting because we aren't |
| // serving any regions. So set the stop bit and exit. |
| LOG.info("Server quiesced and not serving any regions. " + |
| "Starting shutdown"); |
| stopRequested.set(true); |
| this.outboundMsgs.clear(); |
| continue; |
| } |
| |
| // Queue up the HMaster's instruction stream for processing |
| boolean restart = false; |
| for(int i = 0; |
| !restart && !stopRequested.get() && i < msgs.length; |
| i++) { |
| LOG.info(msgs[i].toString()); |
| switch(msgs[i].getType()) { |
| case MSG_CALL_SERVER_STARTUP: |
| // We the MSG_CALL_SERVER_STARTUP on startup but we can also |
| // get it when the master is panicing because for instance |
| // the HDFS has been yanked out from under it. Be wary of |
| // this message. |
| if (checkFileSystem()) { |
| closeAllRegions(); |
| try { |
| log.closeAndDelete(); |
| } catch (Exception e) { |
| LOG.error("error closing and deleting HLog", e); |
| } |
| try { |
| serverInfo.setStartCode(System.currentTimeMillis()); |
| log = setupHLog(); |
| this.logFlusher.setHLog(log); |
| } catch (IOException e) { |
| this.abortRequested = true; |
| this.stopRequested.set(true); |
| e = RemoteExceptionHandler.checkIOException(e); |
| LOG.fatal("error restarting server", e); |
| break; |
| } |
| reportForDuty(sleeper); |
| restart = true; |
| } else { |
| LOG.fatal("file system available check failed. " + |
| "Shutting down server."); |
| } |
| break; |
| |
| case MSG_REGIONSERVER_STOP: |
| stopRequested.set(true); |
| break; |
| |
| case MSG_REGIONSERVER_QUIESCE: |
| if (!quiesceRequested) { |
| try { |
| toDo.put(new ToDoEntry(msgs[i])); |
| } catch (InterruptedException e) { |
| throw new RuntimeException("Putting into msgQueue was " + |
| "interrupted.", e); |
| } |
| quiesceRequested = true; |
| } |
| break; |
| |
| default: |
| if (fsOk) { |
| try { |
| toDo.put(new ToDoEntry(msgs[i])); |
| } catch (InterruptedException e) { |
| throw new RuntimeException("Putting into msgQueue was " + |
| "interrupted.", e); |
| } |
| } |
| } |
| } |
| // Reset tries count if we had a successful transaction. |
| tries = 0; |
| |
| if (restart || this.stopRequested.get()) { |
| toDo.clear(); |
| continue; |
| } |
| } catch (Exception e) { |
| if (e instanceof IOException) { |
| e = RemoteExceptionHandler.checkIOException((IOException) e); |
| } |
| if (tries < this.numRetries) { |
| LOG.warn("Processing message (Retry: " + tries + ")", e); |
| tries++; |
| } else { |
| LOG.error("Exceeded max retries: " + this.numRetries, e); |
| checkFileSystem(); |
| } |
| if (this.stopRequested.get()) { |
| LOG.info("Stop was requested, clearing the toDo " + |
| "despite of the exception"); |
| toDo.clear(); |
| continue; |
| } |
| } |
| } |
| // Do some housekeeping before going to sleep |
| housekeeping(); |
| sleeper.sleep(lastMsg); |
| } // for |
| } catch (Throwable t) { |
| if (!checkOOME(t)) { |
| LOG.fatal("Unhandled exception. Aborting...", t); |
| abort(); |
| } |
| } |
| RegionHistorian.getInstance().offline(); |
| this.leases.closeAfterLeasesExpire(); |
| this.worker.stop(); |
| this.server.stop(); |
| if (this.infoServer != null) { |
| LOG.info("Stopping infoServer"); |
| try { |
| this.infoServer.stop(); |
| } catch (InterruptedException ex) { |
| ex.printStackTrace(); |
| } |
| } |
| |
| // Send interrupts to wake up threads if sleeping so they notice shutdown. |
| // TODO: Should we check they are alive? If OOME could have exited already |
| cacheFlusher.interruptIfNecessary(); |
| logFlusher.interrupt(); |
| compactSplitThread.interruptIfNecessary(); |
| logRoller.interruptIfNecessary(); |
| this.majorCompactionChecker.interrupt(); |
| |
| if (abortRequested) { |
| if (this.fsOk) { |
| // Only try to clean up if the file system is available |
| try { |
| if (this.log != null) { |
| this.log.close(); |
| LOG.info("On abort, closed hlog"); |
| } |
| } catch (Throwable e) { |
| LOG.error("Unable to close log in abort", |
| RemoteExceptionHandler.checkThrowable(e)); |
| } |
| closeAllRegions(); // Don't leave any open file handles |
| } |
| LOG.info("aborting server at: " + |
| serverInfo.getServerAddress().toString()); |
| } else { |
| ArrayList<HRegion> closedRegions = closeAllRegions(); |
| try { |
| log.closeAndDelete(); |
| } catch (Throwable e) { |
| LOG.error("Close and delete failed", |
| RemoteExceptionHandler.checkThrowable(e)); |
| } |
| try { |
| HMsg[] exitMsg = new HMsg[closedRegions.size() + 1]; |
| exitMsg[0] = HMsg.REPORT_EXITING; |
| // Tell the master what regions we are/were serving |
| int i = 1; |
| for (HRegion region: closedRegions) { |
| exitMsg[i++] = new HMsg(HMsg.Type.MSG_REPORT_CLOSE, |
| region.getRegionInfo()); |
| } |
| |
| LOG.info("telling master that region server is shutting down at: " + |
| serverInfo.getServerAddress().toString()); |
| hbaseMaster.regionServerReport(serverInfo, exitMsg, (HRegionInfo[])null); |
| } catch (Throwable e) { |
| LOG.warn("Failed to send exiting message to master: ", |
| RemoteExceptionHandler.checkThrowable(e)); |
| } |
| LOG.info("stopping server at: " + |
| serverInfo.getServerAddress().toString()); |
| } |
| if (this.hbaseMaster != null) { |
| HBaseRPC.stopProxy(this.hbaseMaster); |
| this.hbaseMaster = null; |
| } |
| join(); |
| |
| runThread(this.hdfsShutdownThread, |
| this.conf.getLong("hbase.dfs.shutdown.wait", 30000)); |
| LOG.info(Thread.currentThread().getName() + " exiting"); |
| } |
| |
| /** |
| * Run and wait on passed thread in HRS context. |
| * @param t |
| * @param dfsShutdownWait |
| */ |
| public void runThread(final Thread t, final long dfsShutdownWait) { |
| if (t == null) { |
| return; |
| } |
| t.start(); |
| Threads.shutdown(t, dfsShutdownWait); |
| } |
| |
| /** |
| * Set the hdfs shutdown thread to run on exit. Pass null to disable |
| * running of the shutdown test. Needed by tests. |
| * @param t Thread to run. Pass null to disable tests. |
| * @return Previous occupant of the shutdown thread position. |
| */ |
| public Thread setHDFSShutdownThreadOnExit(final Thread t) { |
| Thread old = this.hdfsShutdownThread; |
| this.hdfsShutdownThread = t; |
| return old; |
| } |
| |
| /* |
| * Run init. Sets up hlog and starts up all server threads. |
| * @param c Extra configuration. |
| */ |
| protected void init(final MapWritable c) throws IOException { |
| try { |
| for (Map.Entry<Writable, Writable> e: c.entrySet()) { |
| String key = e.getKey().toString(); |
| String value = e.getValue().toString(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Config from master: " + key + "=" + value); |
| } |
| this.conf.set(key, value); |
| } |
| // Master may have sent us a new address with the other configs. |
| // Update our address in this case. See HBASE-719 |
| if(conf.get("hbase.regionserver.address") != null) |
| serverInfo.setServerAddress(new HServerAddress |
| (conf.get("hbase.regionserver.address"), |
| serverInfo.getServerAddress().getPort())); |
| // Master sent us hbase.rootdir to use. Should be fully qualified |
| // path with file system specification included. Set 'fs.default.name' |
| // to match the filesystem on hbase.rootdir else underlying hadoop hdfs |
| // accessors will be going against wrong filesystem (unless all is set |
| // to defaults). |
| this.conf.set("fs.default.name", this.conf.get("hbase.rootdir")); |
| this.fs = FileSystem.get(this.conf); |
| |
| // Register shutdown hook for HRegionServer, runs an orderly shutdown |
| // when a kill signal is recieved |
| Runtime.getRuntime().addShutdownHook(new ShutdownThread(this, |
| Thread.currentThread())); |
| this.hdfsShutdownThread = suppressHdfsShutdownHook(); |
| |
| this.rootDir = new Path(this.conf.get(HConstants.HBASE_DIR)); |
| this.log = setupHLog(); |
| this.logFlusher.setHLog(log); |
| // Init in here rather than in constructor after thread name has been set |
| this.metrics = new RegionServerMetrics(); |
| startServiceThreads(); |
| isOnline = true; |
| } catch (Throwable e) { |
| this.isOnline = false; |
| this.stopRequested.set(true); |
| throw convertThrowableToIOE(cleanup(e, "Failed init"), |
| "Region server startup failed"); |
| } |
| } |
| |
| /* |
| * @param r Region to get RegionLoad for. |
| * @return RegionLoad instance. |
| * @throws IOException |
| */ |
| private HServerLoad.RegionLoad createRegionLoad(final HRegion r) |
| throws IOException { |
| byte[] name = r.getRegionName(); |
| int stores = 0; |
| int storefiles = 0; |
| int memcacheSizeMB = (int)(r.memcacheSize.get()/1024/1024); |
| int storefileIndexSizeMB = 0; |
| synchronized (r.stores) { |
| stores += r.stores.size(); |
| for (HStore store: r.stores.values()) { |
| storefiles += store.getStorefilesCount(); |
| storefileIndexSizeMB += |
| (int)(store.getStorefilesIndexSize()/1024/1024); |
| } |
| } |
| return new HServerLoad.RegionLoad(name, stores, storefiles, memcacheSizeMB, |
| storefileIndexSizeMB); |
| } |
| |
| /** |
| * @param regionName |
| * @return An instance of RegionLoad. |
| * @throws IOException |
| */ |
| public HServerLoad.RegionLoad createRegionLoad(final byte [] regionName) |
| throws IOException { |
| return createRegionLoad(this.onlineRegions.get(Bytes.mapKey(regionName))); |
| } |
| |
| /* |
| * Cleanup after Throwable caught invoking method. Converts <code>t</code> |
| * to IOE if it isn't already. |
| * @param t Throwable |
| * @return Throwable converted to an IOE; methods can only let out IOEs. |
| */ |
| private Throwable cleanup(final Throwable t) { |
| return cleanup(t, null); |
| } |
| |
| /* |
| * Cleanup after Throwable caught invoking method. Converts <code>t</code> |
| * to IOE if it isn't already. |
| * @param t Throwable |
| * @param msg Message to log in error. Can be null. |
| * @return Throwable converted to an IOE; methods can only let out IOEs. |
| */ |
| private Throwable cleanup(final Throwable t, final String msg) { |
| if (msg == null) { |
| LOG.error(RemoteExceptionHandler.checkThrowable(t)); |
| } else { |
| LOG.error(msg, RemoteExceptionHandler.checkThrowable(t)); |
| } |
| if (!checkOOME(t)) { |
| checkFileSystem(); |
| } |
| return t; |
| } |
| |
| /* |
| * @param t |
| * @return Make <code>t</code> an IOE if it isn't already. |
| */ |
| private IOException convertThrowableToIOE(final Throwable t) { |
| return convertThrowableToIOE(t, null); |
| } |
| |
| /* |
| * @param t |
| * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE |
| * @return Make <code>t</code> an IOE if it isn't already. |
| */ |
| private IOException convertThrowableToIOE(final Throwable t, |
| final String msg) { |
| return (t instanceof IOException? (IOException)t: |
| msg == null || msg.length() == 0? |
| new IOException(t): new IOException(msg, t)); |
| } |
| /* |
| * Check if an OOME and if so, call abort. |
| * @param e |
| * @return True if we OOME'd and are aborting. |
| */ |
| private boolean checkOOME(final Throwable e) { |
| boolean stop = false; |
| if (e instanceof OutOfMemoryError || |
| (e.getCause() != null && e.getCause() instanceof OutOfMemoryError) || |
| (e.getMessage() != null && |
| e.getMessage().contains("java.lang.OutOfMemoryError"))) { |
| LOG.fatal("OutOfMemoryError, aborting.", e); |
| abort(); |
| stop = true; |
| } |
| return stop; |
| } |
| |
| |
| /** |
| * Checks to see if the file system is still accessible. |
| * If not, sets abortRequested and stopRequested |
| * |
| * @return false if file system is not available |
| */ |
| protected boolean checkFileSystem() { |
| if (this.fsOk && this.fs != null) { |
| try { |
| FSUtils.checkFileSystemAvailable(this.fs); |
| } catch (IOException e) { |
| LOG.fatal("Shutting down HRegionServer: file system not available", e); |
| abort(); |
| this.fsOk = false; |
| } |
| } |
| return this.fsOk; |
| } |
| |
| /** |
| * Thread for toggling safemode after some configurable interval. |
| */ |
| private class SafeModeThread extends Thread { |
| @Override |
| public void run() { |
| // first, wait the required interval before turning off safemode |
| int safemodeInterval = |
| conf.getInt("hbase.regionserver.safemode.period", 120 * 1000); |
| try { |
| Thread.sleep(safemodeInterval); |
| } catch (InterruptedException ex) { |
| // turn off safemode and limits on the way out due to some kind of |
| // abnormal condition so we do not prevent such things as memcache |
| // flushes and worsen the situation |
| safeMode.set(false); |
| compactSplitThread.setLimit(-1); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(this.getName() + " exiting on interrupt"); |
| } |
| return; |
| } |
| LOG.info("leaving safe mode"); |
| safeMode.set(false); |
| |
| // now that safemode is off, slowly increase the per-cycle compaction |
| // limit, finally setting it to unlimited (-1) |
| int compactionCheckInterval = |
| conf.getInt("hbase.regionserver.thread.splitcompactcheckfrequency", |
| 20 * 1000); |
| final int limitSteps[] = { |
| 1, 1, 1, 1, |
| 2, 2, 2, 2, 2, 2, |
| 3, 3, 3, 3, 3, 3, 3, 3, |
| 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, |
| -1 |
| }; |
| for (int i = 0; i < limitSteps.length; i++) { |
| // Just log changes. |
| if (compactSplitThread.getLimit() != limitSteps[i] && |
| LOG.isDebugEnabled()) { |
| LOG.debug("setting compaction limit to " + limitSteps[i]); |
| } |
| compactSplitThread.setLimit(limitSteps[i]); |
| try { |
| Thread.sleep(compactionCheckInterval); |
| } catch (InterruptedException ex) { |
| // unlimit compactions before exiting |
| compactSplitThread.setLimit(-1); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(this.getName() + " exiting on interrupt"); |
| } |
| return; |
| } |
| } |
| LOG.info("compactions no longer limited"); |
| } |
| } |
| |
| /* |
| * Thread to shutdown the region server in an orderly manner. This thread |
| * is registered as a shutdown hook in the HRegionServer constructor and is |
| * only called when the HRegionServer receives a kill signal. |
| */ |
| private static class ShutdownThread extends Thread { |
| private final HRegionServer instance; |
| private final Thread mainThread; |
| |
| /** |
| * @param instance |
| * @param mainThread |
| */ |
| public ShutdownThread(HRegionServer instance, Thread mainThread) { |
| this.instance = instance; |
| this.mainThread = mainThread; |
| } |
| |
| @Override |
| public void run() { |
| LOG.info("Starting shutdown thread."); |
| |
| // tell the region server to stop |
| instance.stop(); |
| |
| // Wait for main thread to exit. |
| Threads.shutdown(mainThread); |
| |
| LOG.info("Shutdown thread complete"); |
| } |
| } |
| |
// HDFS client's own shutdown hook, captured by suppressHdfsShutdownHook()
// during init; run manually at the very end of run() once our cleanup is
// complete, since we removed it from the JVM's shutdown-hook list.
private Thread hdfsShutdownThread;
| |
| /* |
| * Inner class that runs on a long period checking if regions need major |
| * compaction. |
| */ |
| private static class MajorCompactionChecker extends Chore { |
| private final HRegionServer instance; |
| |
| MajorCompactionChecker(final HRegionServer h, |
| final int sleepTime, final AtomicBoolean stopper) { |
| super(sleepTime, stopper); |
| this.instance = h; |
| LOG.info("Runs every " + sleepTime + "ms"); |
| } |
| |
| @Override |
| protected void chore() { |
| Set<Integer> keys = this.instance.onlineRegions.keySet(); |
| for (Integer i: keys) { |
| HRegion r = this.instance.onlineRegions.get(i); |
| try { |
| if (r != null && r.isMajorCompaction()) { |
| // Queue a compaction. Will recognize if major is needed. |
| this.instance.compactSplitThread. |
| compactionRequested(r, getName() + " requests major compaction"); |
| } |
| } catch (IOException e) { |
| LOG.warn("Failed major compaction check on " + r, e); |
| } |
| } |
| } |
| } |
| |
| /** |
| * So, HDFS caches FileSystems so when you call FileSystem.get it's fast. In |
| * order to make sure things are cleaned up, it also creates a shutdown hook |
| * so that all filesystems can be closed when the process is terminated. This |
| * conveniently runs concurrently with our own shutdown handler, and |
| * therefore causes all the filesystems to be closed before the server can do |
| * all its necessary cleanup. |
| * |
| * The crazy dirty reflection in this method sneaks into the FileSystem cache |
| * and grabs the shutdown hook, removes it from the list of active shutdown |
| * hooks, and hangs onto it until later. Then, after we're properly done with |
| * our graceful shutdown, we can execute the hdfs hook manually to make sure |
| * loose ends are tied up. |
| * |
| * This seems quite fragile and susceptible to breaking if Hadoop changes |
| * anything about the way this cleanup is managed. Keep an eye on things. |
| */ |
| private Thread suppressHdfsShutdownHook() { |
| try { |
| Field field = FileSystem.class.getDeclaredField ("clientFinalizer"); |
| field.setAccessible(true); |
| Thread hdfsClientFinalizer = (Thread)field.get(null); |
| if (hdfsClientFinalizer == null) { |
| throw new RuntimeException("client finalizer is null, can't suppress!"); |
| } |
| Runtime.getRuntime().removeShutdownHook(hdfsClientFinalizer); |
| return hdfsClientFinalizer; |
| |
| } catch (NoSuchFieldException nsfe) { |
| LOG.fatal("Couldn't find field 'clientFinalizer' in FileSystem!", nsfe); |
| throw new RuntimeException("Failed to suppress HDFS shutdown hook"); |
| } catch (IllegalAccessException iae) { |
| LOG.fatal("Couldn't access field 'clientFinalizer' in FileSystem!", iae); |
| throw new RuntimeException("Failed to suppress HDFS shutdown hook"); |
| } |
| } |
| |
| /** |
| * Report the status of the server. A server is online once all the startup |
| * is completed (setting up filesystem, starting service threads, etc.). This |
| * method is designed mostly to be useful in tests. |
| * @return true if online, false if not. |
| */ |
| public boolean isOnline() { |
| return isOnline; |
| } |
| |
| private HLog setupHLog() throws RegionServerRunningException, |
| IOException { |
| |
| Path logdir = new Path(rootDir, HLog.getHLogDirectoryName(serverInfo)); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Log dir " + logdir); |
| } |
| if (fs.exists(logdir)) { |
| throw new RegionServerRunningException("region server already " + |
| "running at " + this.serverInfo.getServerAddress().toString() + |
| " because logdir " + logdir.toString() + " exists"); |
| } |
| HLog newlog = new HLog(fs, logdir, conf, logRoller); |
| return newlog; |
| } |
| |
| /* |
| * @param interval Interval since last time metrics were called. |
| */ |
| protected void doMetrics() { |
| this.metrics.regions.set(this.onlineRegions.size()); |
| this.metrics.incrementRequests(this.requestCount.get()); |
| // Is this too expensive every three seconds getting a lock on onlineRegions |
| // and then per store carried? Can I make metrics be sloppier and avoid |
| // the synchronizations? |
| int stores = 0; |
| int storefiles = 0; |
| long memcacheSize = 0; |
| long storefileIndexSize = 0; |
| synchronized (this.onlineRegions) { |
| for (Map.Entry<Integer, HRegion> e: this.onlineRegions.entrySet()) { |
| HRegion r = e.getValue(); |
| memcacheSize += r.memcacheSize.get(); |
| synchronized (r.stores) { |
| stores += r.stores.size(); |
| for(Map.Entry<Integer, HStore> ee: r.stores.entrySet()) { |
| HStore store = ee.getValue(); |
| storefiles += store.getStorefilesCount(); |
| try { |
| storefileIndexSize += store.getStorefilesIndexSize(); |
| } catch (IOException ex) { |
| LOG.warn("error getting store file index size for " + store + |
| ": " + StringUtils.stringifyException(ex)); |
| } |
| } |
| } |
| } |
| } |
| this.metrics.stores.set(stores); |
| this.metrics.storefiles.set(storefiles); |
| this.metrics.memcacheSizeMB.set((int)(memcacheSize/(1024*1024))); |
| this.metrics.storefileIndexSizeMB.set((int)(storefileIndexSize/(1024*1024))); |
| } |
| |
| /** |
| * @return Region server metrics instance. |
| */ |
| public RegionServerMetrics getMetrics() { |
| return this.metrics; |
| } |
| |
| /* |
| * Start maintanence Threads, Server, Worker and lease checker threads. |
| * Install an UncaughtExceptionHandler that calls abort of RegionServer if we |
| * get an unhandled exception. We cannot set the handler on all threads. |
| * Server's internal Listener thread is off limits. For Server, if an OOME, |
| * it waits a while then retries. Meantime, a flush or a compaction that |
| * tries to run should trigger same critical condition and the shutdown will |
| * run. On its way out, this server will shut down Server. Leases are sort |
| * of inbetween. It has an internal thread that while it inherits from |
| * Chore, it keeps its own internal stop mechanism so needs to be stopped |
| * by this hosting server. Worker logs the exception and exits. |
| */ |
| private void startServiceThreads() throws IOException { |
| String n = Thread.currentThread().getName(); |
| UncaughtExceptionHandler handler = new UncaughtExceptionHandler() { |
| public void uncaughtException(Thread t, Throwable e) { |
| abort(); |
| LOG.fatal("Set stop flag in " + t.getName(), e); |
| } |
| }; |
| Threads.setDaemonThreadRunning(this.logRoller, n + ".logRoller", |
| handler); |
| Threads.setDaemonThreadRunning(this.logFlusher, n + ".logFlusher", |
| handler); |
| Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher", |
| handler); |
| Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor", |
| handler); |
| Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler); |
| Threads.setDaemonThreadRunning(this.majorCompactionChecker, |
| n + ".majorCompactionChecker", handler); |
| |
| // Leases is not a Thread. Internally it runs a daemon thread. If it gets |
| // an unhandled exception, it will just exit. |
| this.leases.setName(n + ".leaseChecker"); |
| this.leases.start(); |
| // Put up info server. |
| int port = this.conf.getInt("hbase.regionserver.info.port", 60030); |
| if (port >= 0) { |
| String a = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0"); |
| this.infoServer = new InfoServer("regionserver", a, port, false); |
| this.infoServer.setAttribute("regionserver", this); |
| this.infoServer.start(); |
| } |
| |
| // Set up the safe mode handler if safe mode has been configured. |
| if (conf.getInt("hbase.regionserver.safemode.period", 0) < 1) { |
| safeMode.set(false); |
| compactSplitThread.setLimit(-1); |
| LOG.debug("skipping safe mode"); |
| } else { |
| this.safeModeThread = new SafeModeThread(); |
| Threads.setDaemonThreadRunning(this.safeModeThread, n + ".safeMode", |
| handler); |
| } |
| |
| // Start Server. This service is like leases in that it internally runs |
| // a thread. |
| this.server.start(); |
| LOG.info("HRegionServer started at: " + |
| serverInfo.getServerAddress().toString()); |
| } |
| |
| /* |
| * Verify that server is healthy |
| */ |
| private boolean isHealthy() { |
| if (!fsOk) { |
| // File system problem |
| return false; |
| } |
| // Verify that all threads are alive |
| if (!(leases.isAlive() && compactSplitThread.isAlive() && |
| cacheFlusher.isAlive() && logRoller.isAlive() && |
| workerThread.isAlive() && this.majorCompactionChecker.isAlive())) { |
| // One or more threads are no longer alive - shut down |
| stop(); |
| return false; |
| } |
| return true; |
| } |
| /* |
| * Run some housekeeping tasks before we go into 'hibernation' sleeping at |
| * the end of the main HRegionServer run loop. |
| */ |
| private void housekeeping() { |
| // If the todo list has > 0 messages, iterate looking for open region |
| // messages. Send the master a message that we're working on its |
| // processing so it doesn't assign the region elsewhere. |
| if (this.toDo.size() <= 0) { |
| return; |
| } |
| // This iterator is 'safe'. We are guaranteed a view on state of the |
| // queue at time iterator was taken out. Apparently goes from oldest. |
| for (ToDoEntry e: this.toDo) { |
| if (e.msg.isType(HMsg.Type.MSG_REGION_OPEN)) { |
| addProcessingMessage(e.msg.getRegionInfo()); |
| } |
| } |
| } |
| |
| /** @return the HLog */ |
| HLog getLog() { |
| return this.log; |
| } |
| |
| /** |
| * Sets a flag that will cause all the HRegionServer threads to shut down |
| * in an orderly fashion. Used by unit tests. |
| */ |
| public void stop() { |
| this.stopRequested.set(true); |
| synchronized(this) { |
| notifyAll(); // Wakes run() if it is sleeping |
| } |
| } |
| |
| /** |
| * Cause the server to exit without closing the regions it is serving, the |
| * log it is using and without notifying the master. |
| * Used unit testing and on catastrophic events such as HDFS is yanked out |
| * from under hbase or we OOME. |
| */ |
| public void abort() { |
| this.abortRequested = true; |
| this.reservedSpace.clear(); |
| LOG.info("Dump of metrics: " + this.metrics.toString()); |
| stop(); |
| } |
| |
| /** |
| * Wait on all threads to finish. |
| * Presumption is that all closes and stops have already been called. |
| */ |
| void join() { |
| Threads.shutdown(this.majorCompactionChecker); |
| Threads.shutdown(this.workerThread); |
| Threads.shutdown(this.cacheFlusher); |
| Threads.shutdown(this.compactSplitThread); |
| Threads.shutdown(this.logRoller); |
| } |
| |
| /* |
| * Let the master know we're here |
| * Run initialization using parameters passed us by the master. |
| */ |
| private MapWritable reportForDuty(final Sleeper sleeper) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Telling master at " + |
| conf.get(MASTER_ADDRESS) + " that we are up"); |
| } |
| HMasterRegionInterface master = null; |
| while (!stopRequested.get() && master == null) { |
| try { |
| // Do initial RPC setup. The final argument indicates that the RPC |
| // should retry indefinitely. |
| master = (HMasterRegionInterface)HBaseRPC.waitForProxy( |
| HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID, |
| new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(), |
| this.conf, -1); |
| } catch (IOException e) { |
| LOG.warn("Unable to connect to master. Retrying. Error was:", e); |
| sleeper.sleep(); |
| } |
| } |
| this.hbaseMaster = master; |
| MapWritable result = null; |
| long lastMsg = 0; |
| while(!stopRequested.get()) { |
| try { |
| this.requestCount.set(0); |
| MemoryUsage memory = |
| ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); |
| HServerLoad hsl = new HServerLoad(0, (int)memory.getUsed()/1024/1024, |
| (int)memory.getMax()/1024/1024); |
| this.serverInfo.setLoad(hsl); |
| if (LOG.isDebugEnabled()) |
| LOG.debug("sending initial server load: " + hsl); |
| lastMsg = System.currentTimeMillis(); |
| result = this.hbaseMaster.regionServerStartup(serverInfo); |
| break; |
| } catch (Leases.LeaseStillHeldException e) { |
| LOG.info("Lease " + e.getName() + " already held on master. Check " + |
| "DNS configuration so that all region servers are" + |
| "reporting their true IPs and not 127.0.0.1. Otherwise, this" + |
| "problem should resolve itself after the lease period of " + |
| this.conf.get("hbase.master.lease.period") |
| + " seconds expires over on the master"); |
| } catch (IOException e) { |
| LOG.warn("error telling master we are up", e); |
| } |
| sleeper.sleep(lastMsg); |
| } |
| return result; |
| } |
| |
| /* Add to the outbound message buffer */ |
| private void reportOpen(HRegionInfo region) { |
| outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, region)); |
| } |
| |
| /* Add to the outbound message buffer */ |
| private void reportClose(HRegionInfo region) { |
| reportClose(region, null); |
| } |
| |
| /* Add to the outbound message buffer */ |
| private void reportClose(final HRegionInfo region, final byte[] message) { |
| outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_CLOSE, region, message)); |
| } |
| |
| /** |
| * Add to the outbound message buffer |
| * |
| * When a region splits, we need to tell the master that there are two new |
| * regions that need to be assigned. |
| * |
| * We do not need to inform the master about the old region, because we've |
| * updated the meta or root regions, and the master will pick that up on its |
| * next rescan of the root or meta tables. |
| */ |
| void reportSplit(HRegionInfo oldRegion, HRegionInfo newRegionA, |
| HRegionInfo newRegionB) { |
| outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_SPLIT, oldRegion, |
| (oldRegion.getRegionNameAsString() + " split; daughters: " + |
| newRegionA.getRegionNameAsString() + ", " + |
| newRegionB.getRegionNameAsString()).getBytes())); |
| outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionA)); |
| outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_OPEN, newRegionB)); |
| } |
| |
| ////////////////////////////////////////////////////////////////////////////// |
| // HMaster-given operations |
| ////////////////////////////////////////////////////////////////////////////// |
| |
| /* |
| * Data structure to hold a HMsg and retries count. |
| */ |
| private static class ToDoEntry { |
| private int tries; |
| private final HMsg msg; |
| ToDoEntry(HMsg msg) { |
| this.tries = 0; |
| this.msg = msg; |
| } |
| } |
| |
  // Queue of pending master instructions, consumed by the Worker thread.
  final BlockingQueue<ToDoEntry> toDo = new LinkedBlockingQueue<ToDoEntry>();
  private Worker worker;
  private Thread workerThread;
| |
| /** Thread that performs long running requests from the master */ |
| class Worker implements Runnable { |
| void stop() { |
| synchronized(toDo) { |
| toDo.notifyAll(); |
| } |
| } |
| |
| public void run() { |
| try { |
| while(!stopRequested.get()) { |
| ToDoEntry e = null; |
| try { |
| e = toDo.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); |
| if(e == null || stopRequested.get()) { |
| continue; |
| } |
| LOG.info("Worker: " + e.msg); |
| HRegion region = null; |
| HRegionInfo info = e.msg.getRegionInfo(); |
| switch(e.msg.getType()) { |
| |
| case MSG_REGIONSERVER_QUIESCE: |
| closeUserRegions(); |
| break; |
| |
| case MSG_REGION_OPEN: |
| // Open a region |
| if (!haveRootRegion.get() && !info.isRootRegion()) { |
| // root region is not online yet. requeue this task |
| LOG.info("putting region open request back into queue because" + |
| " root region is not yet available"); |
| try { |
| toDo.put(e); |
| } catch (InterruptedException ex) { |
| LOG.warn("insertion into toDo queue was interrupted", ex); |
| break; |
| } |
| } |
| openRegion(info); |
| break; |
| |
| case MSG_REGION_CLOSE: |
| // Close a region |
| closeRegion(e.msg.getRegionInfo(), true); |
| break; |
| |
| case MSG_REGION_CLOSE_WITHOUT_REPORT: |
| // Close a region, don't reply |
| closeRegion(e.msg.getRegionInfo(), false); |
| break; |
| |
| case MSG_REGION_SPLIT: |
| region = getRegion(info.getRegionName()); |
| region.flushcache(); |
| region.regionInfo.shouldSplit(true); |
| // force a compaction; split will be side-effect. |
| compactSplitThread.compactionRequested(region, |
| e.msg.getType().name()); |
| break; |
| |
| case MSG_REGION_MAJOR_COMPACT: |
| case MSG_REGION_COMPACT: |
| // Compact a region |
| region = getRegion(info.getRegionName()); |
| compactSplitThread.compactionRequested(region, |
| e.msg.isType(Type.MSG_REGION_MAJOR_COMPACT), |
| e.msg.getType().name()); |
| break; |
| |
| case MSG_REGION_FLUSH: |
| region = getRegion(info.getRegionName()); |
| region.flushcache(); |
| break; |
| |
| default: |
| throw new AssertionError( |
| "Impossible state during msg processing. Instruction: " |
| + e.msg.toString()); |
| } |
| } catch (InterruptedException ex) { |
| // continue |
| } catch (Exception ex) { |
| if (ex instanceof IOException) { |
| ex = RemoteExceptionHandler.checkIOException((IOException) ex); |
| } |
| if(e != null && e.tries < numRetries) { |
| LOG.warn(ex); |
| e.tries++; |
| try { |
| toDo.put(e); |
| } catch (InterruptedException ie) { |
| throw new RuntimeException("Putting into msgQueue was " + |
| "interrupted.", ex); |
| } |
| } else { |
| LOG.error("unable to process message" + |
| (e != null ? (": " + e.msg.toString()) : ""), ex); |
| if (!checkFileSystem()) { |
| break; |
| } |
| } |
| } |
| } |
| } catch(Throwable t) { |
| if (!checkOOME(t)) { |
| LOG.fatal("Unhandled exception", t); |
| } |
| } finally { |
| LOG.info("worker thread exiting"); |
| } |
| } |
| } |
| |
  /*
   * Open the given region on this server (no-op if it is already online) and
   * queue a report-open message for the master. On any failure during
   * instantiation, a report-close carrying the stringified error is sent
   * instead.
   */
  void openRegion(final HRegionInfo regionInfo) {
    // If historian is not online and this is not a meta region, online it.
    if (!regionInfo.isMetaRegion() &&
        !RegionHistorian.getInstance().isOnline()) {
      RegionHistorian.getInstance().online(this.conf);
    }
    Integer mapKey = Bytes.mapKey(regionInfo.getRegionName());
    HRegion region = this.onlineRegions.get(mapKey);
    if (region == null) {
      try {
        region = instantiateRegion(regionInfo);
        // Startup a compaction early if one is needed.
        this.compactSplitThread.
          compactionRequested(region, "Region open check");
      } catch (Throwable e) {
        Throwable t = cleanup(e,
          "Error opening " + regionInfo.getRegionNameAsString());
        // TODO: add an extra field in HRegionInfo to indicate that there is
        // an error. We can't do that now because that would be an incompatible
        // change that would require a migration
        reportClose(regionInfo, StringUtils.stringifyException(t).getBytes());
        return;
      }
      // Register the region under the write lock; the log sequence number is
      // advanced to at least the region's minimum before it goes online.
      this.lock.writeLock().lock();
      try {
        this.log.setSequenceNumber(region.getMinSequenceId());
        this.onlineRegions.put(mapKey, region);
      } finally {
        this.lock.writeLock().unlock();
      }
    }
    reportOpen(regionInfo);
  }
| |
| protected HRegion instantiateRegion(final HRegionInfo regionInfo) |
| throws IOException { |
| HRegion r = new HRegion(HTableDescriptor.getTableDir(rootDir, regionInfo |
| .getTableDesc().getName()), this.log, this.fs, conf, regionInfo, |
| this.cacheFlusher); |
| r.initialize(null, new Progressable() { |
| public void progress() { |
| addProcessingMessage(regionInfo); |
| } |
| }); |
| return r; |
| } |
| |
| /* |
| * Add a MSG_REPORT_PROCESS_OPEN to the outbound queue. |
| * This method is called while region is in the queue of regions to process |
| * and then while the region is being opened, it is called from the Worker |
| * thread that is running the region open. |
| * @param hri Region to add the message for |
| */ |
| protected void addProcessingMessage(final HRegionInfo hri) { |
| getOutboundMsgs().add(new HMsg(HMsg.Type.MSG_REPORT_PROCESS_OPEN, hri)); |
| } |
| |
| void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted) |
| throws IOException { |
| HRegion region = this.removeFromOnlineRegions(hri); |
| if (region != null) { |
| region.close(); |
| if(reportWhenCompleted) { |
| reportClose(hri); |
| } |
| } |
| } |
| |
| /** Called either when the master tells us to restart or from stop() */ |
| ArrayList<HRegion> closeAllRegions() { |
| ArrayList<HRegion> regionsToClose = new ArrayList<HRegion>(); |
| this.lock.writeLock().lock(); |
| try { |
| regionsToClose.addAll(onlineRegions.values()); |
| onlineRegions.clear(); |
| } finally { |
| this.lock.writeLock().unlock(); |
| } |
| for(HRegion region: regionsToClose) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("closing region " + Bytes.toString(region.getRegionName())); |
| } |
| try { |
| region.close(abortRequested); |
| } catch (Throwable e) { |
| cleanup(e, "Error closing " + Bytes.toString(region.getRegionName())); |
| } |
| } |
| return regionsToClose; |
| } |
| |
| /* |
| * Thread to run close of a region. |
| */ |
| private static class RegionCloserThread extends Thread { |
| private final HRegion r; |
| |
| protected RegionCloserThread(final HRegion r) { |
| super(Thread.currentThread().getName() + ".regionCloser." + r.toString()); |
| this.r = r; |
| } |
| |
| @Override |
| public void run() { |
| try { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Closing region " + r.toString()); |
| } |
| r.close(); |
| } catch (Throwable e) { |
| LOG.error("Error closing region " + r.toString(), |
| RemoteExceptionHandler.checkThrowable(e)); |
| } |
| } |
| } |
| |
| /** Called as the first stage of cluster shutdown. */ |
| void closeUserRegions() { |
| ArrayList<HRegion> regionsToClose = new ArrayList<HRegion>(); |
| this.lock.writeLock().lock(); |
| try { |
| synchronized (onlineRegions) { |
| for (Iterator<Map.Entry<Integer, HRegion>> i = |
| onlineRegions.entrySet().iterator(); i.hasNext();) { |
| Map.Entry<Integer, HRegion> e = i.next(); |
| HRegion r = e.getValue(); |
| if (!r.getRegionInfo().isMetaRegion()) { |
| regionsToClose.add(r); |
| i.remove(); |
| } |
| } |
| } |
| } finally { |
| this.lock.writeLock().unlock(); |
| } |
| // Run region closes in parallel. |
| Set<Thread> threads = new HashSet<Thread>(); |
| try { |
| for (final HRegion r : regionsToClose) { |
| RegionCloserThread t = new RegionCloserThread(r); |
| t.start(); |
| threads.add(t); |
| } |
| } finally { |
| for (Thread t : threads) { |
| while (t.isAlive()) { |
| try { |
| t.join(); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| } |
| } |
| } |
| this.quiesced.set(true); |
| if (onlineRegions.size() == 0) { |
| outboundMsgs.add(HMsg.REPORT_EXITING); |
| } else { |
| outboundMsgs.add(HMsg.REPORT_QUIESCED); |
| } |
| } |
| |
| // |
| // HRegionInterface |
| // |
| |
| public HRegionInfo getRegionInfo(final byte [] regionName) |
| throws NotServingRegionException { |
| requestCount.incrementAndGet(); |
| return getRegion(regionName).getRegionInfo(); |
| } |
| |
| public Cell[] get(final byte [] regionName, final byte [] row, |
| final byte [] column, final long timestamp, final int numVersions) |
| throws IOException { |
| checkOpen(); |
| requestCount.incrementAndGet(); |
| try { |
| return getRegion(regionName).get(row, column, timestamp, numVersions); |
| } catch (Throwable t) { |
| throw convertThrowableToIOE(cleanup(t)); |
| } |
| } |
| |
| public RowResult getRow(final byte [] regionName, final byte [] row, |
| final byte [][] columns, final long ts, |
| final int numVersions, final long lockId) |
| throws IOException { |
| checkOpen(); |
| requestCount.incrementAndGet(); |
| try { |
| // convert the columns array into a set so it's easy to check later. |
| Set<byte []> columnSet = null; |
| if (columns != null) { |
| columnSet = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR); |
| columnSet.addAll(Arrays.asList(columns)); |
| } |
| |
| HRegion region = getRegion(regionName); |
| HbaseMapWritable<byte [], Cell> result = |
| region.getFull(row, columnSet, |
| ts, numVersions, getLockFromId(lockId)); |
| if (result == null || result.isEmpty()) |
| return null; |
| return new RowResult(row, result); |
| } catch (Throwable t) { |
| throw convertThrowableToIOE(cleanup(t)); |
| } |
| } |
| |
| public RowResult getClosestRowBefore(final byte [] regionName, |
| final byte [] row, final byte [] columnFamily) |
| throws IOException { |
| checkOpen(); |
| requestCount.incrementAndGet(); |
| try { |
| // locate the region we're operating on |
| HRegion region = getRegion(regionName); |
| // ask the region for all the data |
| RowResult rr = region.getClosestRowBefore(row, columnFamily); |
| return rr; |
| } catch (Throwable t) { |
| throw convertThrowableToIOE(cleanup(t)); |
| } |
| } |
| |
| public RowResult next(final long scannerId) throws IOException { |
| RowResult[] rrs = next(scannerId, 1); |
| return rrs.length == 0 ? null : rrs[0]; |
| } |
| |
  /**
   * Remote API: advance a scanner by up to nbRows rows.
   * Renews the scanner's lease on every call. Rows for which the scanner
   * yields no values are skipped, so fewer than nbRows results may be
   * returned even before the scan is exhausted.
   * @throws UnknownScannerException if no scanner is registered under the id
   */
  public RowResult[] next(final long scannerId, int nbRows) throws IOException {
    checkOpen();
    requestCount.incrementAndGet();
    ArrayList<RowResult> resultSets = new ArrayList<RowResult>();
    try {
      String scannerName = String.valueOf(scannerId);
      InternalScanner s = scanners.get(scannerName);
      if (s == null) {
        throw new UnknownScannerException("Name: " + scannerName);
      }
      this.leases.renewLease(scannerName);
      for(int i = 0; i < nbRows; i++) {
        // Collect values to be returned here
        HbaseMapWritable<byte [], Cell> values
          = new HbaseMapWritable<byte [], Cell>();
        HStoreKey key = new HStoreKey();
        // Keep calling next() until we hit a row with values or the scan ends.
        while (s.next(key, values)) {
          if (values.size() > 0) {
            // Row has something in it. Return the value.
            resultSets.add(new RowResult(key.getRow(), values));
            break;
          }
        }
      }
      return resultSets.toArray(new RowResult[resultSets.size()]);
    } catch (Throwable t) {
      throw convertThrowableToIOE(cleanup(t));
    }
  }
| |
| public void batchUpdate(final byte [] regionName, BatchUpdate b, |
| @SuppressWarnings("unused") long lockId) |
| throws IOException { |
| if (b.getRow() == null) |
| throw new IllegalArgumentException("update has null row"); |
| |
| checkOpen(); |
| this.requestCount.incrementAndGet(); |
| HRegion region = getRegion(regionName); |
| validateValuesLength(b, region); |
| try { |
| cacheFlusher.reclaimMemcacheMemory(); |
| region.batchUpdate(b, getLockFromId(b.getRowLock())); |
| } catch (Throwable t) { |
| throw convertThrowableToIOE(cleanup(t)); |
| } |
| } |
| |
  /**
   * Remote API: apply a series of batch updates to one region.
   * Return contract (from the code below): -1 when every update was applied;
   * on a WrongRegionException or NotServingRegionException, the index of the
   * first update that could not be applied. Other failures are converted to
   * IOException.
   */
  public int batchUpdates(final byte[] regionName, final BatchUpdate[] b)
  throws IOException {
    int i = 0;
    checkOpen();
    try {
      HRegion region = getRegion(regionName);
      this.cacheFlusher.reclaimMemcacheMemory();
      Integer[] locks = new Integer[b.length];
      for (i = 0; i < b.length; i++) {
        this.requestCount.incrementAndGet();
        validateValuesLength(b[i], region);
        locks[i] = getLockFromId(b[i].getRowLock());
        region.batchUpdate(b[i], locks[i]);
      }
    } catch(WrongRegionException ex) {
      // Report how far we got; updates before index i were applied.
      return i;
    } catch (NotServingRegionException ex) {
      return i;
    } catch (Throwable t) {
      throw convertThrowableToIOE(cleanup(t));
    }
    return -1;
  }
| |
| public boolean checkAndSave(final byte [] regionName, final BatchUpdate b, |
| final HbaseMapWritable<byte[],byte[]> expectedValues) |
| throws IOException { |
| if (b.getRow() == null) |
| throw new IllegalArgumentException("update has null row"); |
| checkOpen(); |
| this.requestCount.incrementAndGet(); |
| HRegion region = getRegion(regionName); |
| validateValuesLength(b, region); |
| try { |
| cacheFlusher.reclaimMemcacheMemory(); |
| boolean result = region.checkAndSave(b, |
| expectedValues,getLockFromId(b.getRowLock()), false); |
| return result; |
| } catch (Throwable t) { |
| throw convertThrowableToIOE(cleanup(t)); |
| } |
| } |
| |
| |
| /** |
| * Utility method to verify values length |
| * @param batchUpdate The update to verify |
| * @throws IOException Thrown if a value is too long |
| */ |
| private void validateValuesLength(BatchUpdate batchUpdate, |
| HRegion region) throws IOException { |
| HTableDescriptor desc = region.getTableDesc(); |
| for (Iterator<BatchOperation> iter = |
| batchUpdate.iterator(); iter.hasNext();) { |
| BatchOperation operation = iter.next(); |
| if (operation.getValue() != null) { |
| HColumnDescriptor fam = |
| desc.getFamily(HStoreKey.getFamily(operation.getColumn())); |
| if (fam != null) { |
| int maxLength = fam.getMaxValueLength(); |
| if (operation.getValue().length > maxLength) { |
| throw new ValueOverMaxLengthException("Value in column " |
| + Bytes.toString(operation.getColumn()) + " is too long. " |
| + operation.getValue().length + " instead of " + maxLength); |
| } |
| } |
| } |
| } |
| } |
| |
| // |
| // remote scanner interface |
| // |
| |
| public long openScanner(byte [] regionName, byte [][] cols, byte [] firstRow, |
| final long timestamp, final RowFilterInterface filter) |
| throws IOException { |
| checkOpen(); |
| NullPointerException npe = null; |
| if (regionName == null) { |
| npe = new NullPointerException("regionName is null"); |
| } else if (cols == null) { |
| npe = new NullPointerException("columns to scan is null"); |
| } else if (firstRow == null) { |
| npe = new NullPointerException("firstRow for scanner is null"); |
| } |
| if (npe != null) { |
| throw new IOException("Invalid arguments to openScanner", npe); |
| } |
| requestCount.incrementAndGet(); |
| try { |
| HRegion r = getRegion(regionName); |
| InternalScanner s = |
| r.getScanner(cols, firstRow, timestamp, filter); |
| long scannerId = addScanner(s); |
| return scannerId; |
| } catch (Throwable t) { |
| throw convertThrowableToIOE(cleanup(t, "Failed openScanner")); |
| } |
| } |
| |
| protected long addScanner(InternalScanner s) throws LeaseStillHeldException { |
| long scannerId = -1L; |
| scannerId = rand.nextLong(); |
| String scannerName = String.valueOf(scannerId); |
| synchronized(scanners) { |
| scanners.put(scannerName, s); |
| } |
| this.leases. |
| createLease(scannerName, new ScannerListener(scannerName)); |
| return scannerId; |
| } |
| |
| public void close(final long scannerId) throws IOException { |
| try { |
| checkOpen(); |
| requestCount.incrementAndGet(); |
| String scannerName = String.valueOf(scannerId); |
| InternalScanner s = null; |
| synchronized(scanners) { |
| s = scanners.remove(scannerName); |
| } |
| if(s == null) { |
| throw new UnknownScannerException(scannerName); |
| } |
| s.close(); |
| this.leases.cancelLease(scannerName); |
| } catch (Throwable t) { |
| throw convertThrowableToIOE(cleanup(t)); |
| } |
| } |
| |
| /** |
| * Instantiated as a scanner lease. |
| * If the lease times out, the scanner is closed |
| */ |
| private class ScannerListener implements LeaseListener { |
| private final String scannerName; |
| |
| ScannerListener(final String n) { |
| this.scannerName = n; |
| } |
| |
| public void leaseExpired() { |
| LOG.info("Scanner " + this.scannerName + " lease expired"); |
| InternalScanner s = null; |
| synchronized(scanners) { |
| s = scanners.remove(this.scannerName); |
| } |
| if (s != null) { |
| try { |
| s.close(); |
| } catch (IOException e) { |
| LOG.error("Closing scanner", e); |
| } |
| } |
| } |
| } |
| |
| // |
| // Methods that do the actual work for the remote API |
| // |
| |
  /** Remote API: delete all cells for a row/column up to the timestamp. */
  public void deleteAll(final byte [] regionName, final byte [] row,
      final byte [] column, final long timestamp, final long lockId)
  throws IOException {
    // NOTE(review): unlike get/getRow, this path neither calls checkOpen()
    // nor bumps requestCount — confirm whether that is intentional.
    HRegion region = getRegion(regionName);
    region.deleteAll(row, column, timestamp, getLockFromId(lockId));
  }
| |
  /** Remote API: delete all cells for an entire row up to the timestamp. */
  public void deleteAll(final byte [] regionName, final byte [] row,
      final long timestamp, final long lockId)
  throws IOException {
    // NOTE(review): no checkOpen()/requestCount here, unlike the read paths.
    HRegion region = getRegion(regionName);
    region.deleteAll(row, timestamp, getLockFromId(lockId));
  }
| |
  /** Remote API: delete row cells whose column matches the given regex. */
  public void deleteAllByRegex(byte[] regionName, byte[] row, String colRegex,
      long timestamp, long lockId) throws IOException {
    // NOTE(review): no checkOpen()/requestCount here, unlike the read paths.
    getRegion(regionName).deleteAllByRegex(row, colRegex, timestamp,
        getLockFromId(lockId));
  }
| |
  /** Remote API: delete a row's cells in one column family up to a timestamp. */
  public void deleteFamily(byte [] regionName, byte [] row, byte [] family,
    long timestamp, final long lockId)
  throws IOException{
    // NOTE(review): no checkOpen()/requestCount here, unlike the read paths.
    getRegion(regionName).deleteFamily(row, family, timestamp,
        getLockFromId(lockId));
  }
| |
  /** Remote API: delete a row's cells in families matching the given regex. */
  public void deleteFamilyByRegex(byte[] regionName, byte[] row, String familyRegex,
      long timestamp, long lockId) throws IOException {
    // NOTE(review): no checkOpen()/requestCount here, unlike the read paths.
    getRegion(regionName).deleteFamilyByRegex(row, familyRegex, timestamp,
        getLockFromId(lockId));
  }
| |
  /** Remote API: test whether a row/column cell exists at the timestamp. */
  public boolean exists(byte[] regionName, byte[] row, byte[] column,
      long timestamp, long lockId)
  throws IOException {
    // NOTE(review): no checkOpen()/requestCount here, unlike the read paths.
    return getRegion(regionName).exists(row, column, timestamp,
      getLockFromId(lockId));
  }
| |
| public long lockRow(byte [] regionName, byte [] row) |
| throws IOException { |
| checkOpen(); |
| NullPointerException npe = null; |
| if(regionName == null) { |
| npe = new NullPointerException("regionName is null"); |
| } else if(row == null) { |
| npe = new NullPointerException("row to lock is null"); |
| } |
| if(npe != null) { |
| IOException io = new IOException("Invalid arguments to lockRow"); |
| io.initCause(npe); |
| throw io; |
| } |
| requestCount.incrementAndGet(); |
| try { |
| HRegion region = getRegion(regionName); |
| Integer r = region.obtainRowLock(row); |
| long lockId = addRowLock(r,region); |
| LOG.debug("Row lock " + lockId + " explicitly acquired by client"); |
| return lockId; |
| } catch (Throwable t) { |
| throw convertThrowableToIOE(cleanup(t, |
| "Error obtaining row lock (fsOk: " + this.fsOk + ")")); |
| } |
| } |
| |
| protected long addRowLock(Integer r, HRegion region) throws LeaseStillHeldException { |
| long lockId = -1L; |
| lockId = rand.nextLong(); |
| String lockName = String.valueOf(lockId); |
| synchronized(rowlocks) { |
| rowlocks.put(lockName, r); |
| } |
| this.leases. |
| createLease(lockName, new RowLockListener(lockName, region)); |
| return lockId; |
| } |
| |
| /** |
| * Method to get the Integer lock identifier used internally |
| * from the long lock identifier used by the client. |
| * @param lockId long row lock identifier from client |
| * @return intId Integer row lock used internally in HRegion |
| * @throws IOException Thrown if this is not a valid client lock id. |
| */ |
| private Integer getLockFromId(long lockId) |
| throws IOException { |
| if(lockId == -1L) { |
| return null; |
| } |
| String lockName = String.valueOf(lockId); |
| Integer rl = null; |
| synchronized(rowlocks) { |
| rl = rowlocks.get(lockName); |
| } |
| if(rl == null) { |
| throw new IOException("Invalid row lock"); |
| } |
| this.leases.renewLease(lockName); |
| return rl; |
| } |
| |
| public void unlockRow(byte [] regionName, long lockId) |
| throws IOException { |
| checkOpen(); |
| NullPointerException npe = null; |
| if(regionName == null) { |
| npe = new NullPointerException("regionName is null"); |
| } else if(lockId == -1L) { |
| npe = new NullPointerException("lockId is null"); |
| } |
| if(npe != null) { |
| IOException io = new IOException("Invalid arguments to unlockRow"); |
| io.initCause(npe); |
| throw io; |
| } |
| requestCount.incrementAndGet(); |
| try { |
| HRegion region = getRegion(regionName); |
| String lockName = String.valueOf(lockId); |
| Integer r = null; |
| synchronized(rowlocks) { |
| r = rowlocks.remove(lockName); |
| } |
| if(r == null) { |
| throw new UnknownRowLockException(lockName); |
| } |
| region.releaseRowLock(r); |
| this.leases.cancelLease(lockName); |
| LOG.debug("Row lock " + lockId + " has been explicitly released by client"); |
| } catch (Throwable t) { |
| throw convertThrowableToIOE(cleanup(t)); |
| } |
| } |
| |
  // Maps the string form of a client lock id to the internal row lock Integer.
  Map<String, Integer> rowlocks =
    new ConcurrentHashMap<String, Integer>();
| |
| /** |
| * Instantiated as a row lock lease. |
| * If the lease times out, the row lock is released |
| */ |
| private class RowLockListener implements LeaseListener { |
| private final String lockName; |
| private final HRegion region; |
| |
| RowLockListener(final String lockName, final HRegion region) { |
| this.lockName = lockName; |
| this.region = region; |
| } |
| |
| public void leaseExpired() { |
| LOG.info("Row Lock " + this.lockName + " lease expired"); |
| Integer r = null; |
| synchronized(rowlocks) { |
| r = rowlocks.remove(this.lockName); |
| } |
| if(r != null) { |
| region.releaseRowLock(r); |
| } |
| } |
| } |
| |
| /** |
| * @return Info on this server. |
| */ |
| public HServerInfo getServerInfo() { |
| return this.serverInfo; |
| } |
| |
| /** @return the info server */ |
| public InfoServer getInfoServer() { |
| return infoServer; |
| } |
| |
| /** |
| * @return true if a stop has been requested. |
| */ |
| public boolean isStopRequested() { |
| return stopRequested.get(); |
| } |
| |
| /** |
| * @return true if the region server is in safe mode |
| */ |
| public boolean isInSafeMode() { |
| return safeMode.get(); |
| } |
| |
| /** |
| * |
| * @return the configuration |
| */ |
| public HBaseConfiguration getConfiguration() { |
| return conf; |
| } |
| |
| /** @return the write lock for the server */ |
| ReentrantReadWriteLock.WriteLock getWriteLock() { |
| return lock.writeLock(); |
| } |
| |
| /** |
| * @return Immutable list of this servers regions. |
| */ |
| public Collection<HRegion> getOnlineRegions() { |
| return Collections.unmodifiableCollection(onlineRegions.values()); |
| } |
| |
| /** |
| * @return The HRegionInfos from online regions sorted |
| */ |
| public SortedSet<HRegionInfo> getSortedOnlineRegionInfos() { |
| SortedSet<HRegionInfo> result = new TreeSet<HRegionInfo>(); |
| synchronized(this.onlineRegions) { |
| for (HRegion r: this.onlineRegions.values()) { |
| result.add(r.getRegionInfo()); |
| } |
| } |
| return result; |
| } |
| |
| /** |
| * This method removes HRegion corresponding to hri from the Map of onlineRegions. |
| * |
| * @param hri the HRegionInfo corresponding to the HRegion to-be-removed. |
| * @return the removed HRegion, or null if the HRegion was not in onlineRegions. |
| */ |
| HRegion removeFromOnlineRegions(HRegionInfo hri) { |
| this.lock.writeLock().lock(); |
| HRegion toReturn = null; |
| try { |
| toReturn = onlineRegions.remove(Bytes.mapKey(hri.getRegionName())); |
| } finally { |
| this.lock.writeLock().unlock(); |
| } |
| return toReturn; |
| } |
| |
| /** |
| * @return A new Map of online regions sorted by region size with the first |
| * entry being the biggest. |
| */ |
| public SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedBySize() { |
| // we'll sort the regions in reverse |
| SortedMap<Long, HRegion> sortedRegions = new TreeMap<Long, HRegion>( |
| new Comparator<Long>() { |
| public int compare(Long a, Long b) { |
| return -1 * a.compareTo(b); |
| } |
| }); |
| // Copy over all regions. Regions are sorted by size with biggest first. |
| synchronized (this.onlineRegions) { |
| for (HRegion region : this.onlineRegions.values()) { |
| sortedRegions.put(Long.valueOf(region.memcacheSize.get()), region); |
| } |
| } |
| return sortedRegions; |
| } |
| |
| /** |
| * @param regionName |
| * @return HRegion for the passed <code>regionName</code> or null if named |
| * region is not member of the online regions. |
| */ |
| public HRegion getOnlineRegion(final byte [] regionName) { |
| return onlineRegions.get(Bytes.mapKey(regionName)); |
| } |
| |
| /** @return the request count */ |
| public AtomicInteger getRequestCount() { |
| return this.requestCount; |
| } |
| |
| /** @return reference to FlushRequester */ |
| public FlushRequester getFlushRequester() { |
| return this.cacheFlusher; |
| } |
| |
| /** |
| * Protected utility method for safely obtaining an HRegion handle. |
| * @param regionName Name of online {@link HRegion} to return |
| * @return {@link HRegion} for <code>regionName</code> |
| * @throws NotServingRegionException |
| */ |
| protected HRegion getRegion(final byte [] regionName) |
| throws NotServingRegionException { |
| HRegion region = null; |
| this.lock.readLock().lock(); |
| try { |
| Integer key = Integer.valueOf(Bytes.hashCode(regionName)); |
| region = onlineRegions.get(key); |
| if (region == null) { |
| throw new NotServingRegionException(regionName); |
| } |
| return region; |
| } finally { |
| this.lock.readLock().unlock(); |
| } |
| } |
| |
| /** |
| * Get the top N most loaded regions this server is serving so we can |
| * tell the master which regions it can reallocate if we're overloaded. |
| * TODO: actually calculate which regions are most loaded. (Right now, we're |
| * just grabbing the first N regions being served regardless of load.) |
| */ |
| protected HRegionInfo[] getMostLoadedRegions() { |
| ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>(); |
| synchronized (onlineRegions) { |
| for (HRegion r : onlineRegions.values()) { |
| if (r.isClosed() || r.isClosing()) { |
| continue; |
| } |
| if (regions.size() < numRegionsToReport) { |
| regions.add(r.getRegionInfo()); |
| } else { |
| break; |
| } |
| } |
| } |
| return regions.toArray(new HRegionInfo[regions.size()]); |
| } |
| |
| /** |
| * Called to verify that this server is up and running. |
| * |
| * @throws IOException |
| */ |
| protected void checkOpen() throws IOException { |
| if (this.stopRequested.get() || this.abortRequested) { |
| throw new IOException("Server not running" + |
| (this.abortRequested? ", aborting": "")); |
| } |
| if (!fsOk) { |
| throw new IOException("File system not available"); |
| } |
| } |
| |
| /** |
| * @return Returns list of non-closed regions hosted on this server. If no |
| * regions to check, returns an empty list. |
| */ |
| protected Set<HRegion> getRegionsToCheck() { |
| HashSet<HRegion> regionsToCheck = new HashSet<HRegion>(); |
| //TODO: is this locking necessary? |
| lock.readLock().lock(); |
| try { |
| regionsToCheck.addAll(this.onlineRegions.values()); |
| } finally { |
| lock.readLock().unlock(); |
| } |
| // Purge closed regions. |
| for (final Iterator<HRegion> i = regionsToCheck.iterator(); i.hasNext();) { |
| HRegion r = i.next(); |
| if (r.isClosed()) { |
| i.remove(); |
| } |
| } |
| return regionsToCheck; |
| } |
| |
| public long getProtocolVersion(final String protocol, |
| @SuppressWarnings("unused") final long clientVersion) |
| throws IOException { |
| if (protocol.equals(HRegionInterface.class.getName())) { |
| return HBaseRPCProtocolVersion.versionID; |
| } |
| throw new IOException("Unknown protocol to name node: " + protocol); |
| } |
| |
| /** |
| * @return Queue to which you can add outbound messages. |
| */ |
| protected List<HMsg> getOutboundMsgs() { |
| return this.outboundMsgs; |
| } |
| |
| /** |
| * Return the total size of all memcaches in every region. |
| * @return memcache size in bytes |
| */ |
| public long getGlobalMemcacheSize() { |
| long total = 0; |
| synchronized (onlineRegions) { |
| for (HRegion region : onlineRegions.values()) { |
| total += region.memcacheSize.get(); |
| } |
| } |
| return total; |
| } |
| |
| /** |
| * @return Return the leases. |
| */ |
| protected Leases getLeases() { |
| return leases; |
| } |
| |
| /** |
| * @return Return the rootDir. |
| */ |
| protected Path getRootDir() { |
| return rootDir; |
| } |
| |
| /** |
| * @return Return the fs. |
| */ |
| protected FileSystem getFileSystem() { |
| return fs; |
| } |
| |
| // |
| // Main program and support routines |
| // |
| |
| private static void printUsageAndExit() { |
| printUsageAndExit(null); |
| } |
| |
| private static void printUsageAndExit(final String message) { |
| if (message != null) { |
| System.err.println(message); |
| } |
| System.err.println("Usage: java " + |
| "org.apache.hbase.HRegionServer [--bind=hostname:port] start"); |
| System.exit(0); |
| } |
| |
| /** |
| * Do class main. |
| * @param args |
| * @param regionServerClass HRegionServer to instantiate. |
| */ |
| protected static void doMain(final String [] args, |
| final Class<? extends HRegionServer> regionServerClass) { |
| if (args.length < 1) { |
| printUsageAndExit(); |
| } |
| Configuration conf = new HBaseConfiguration(); |
| |
| // Process command-line args. TODO: Better cmd-line processing |
| // (but hopefully something not as painful as cli options). |
| final String addressArgKey = "--bind="; |
| for (String cmd: args) { |
| if (cmd.startsWith(addressArgKey)) { |
| conf.set(REGIONSERVER_ADDRESS, cmd.substring(addressArgKey.length())); |
| continue; |
| } |
| |
| if (cmd.equals("start")) { |
| try { |
| // If 'local', don't start a region server here. Defer to |
| // LocalHBaseCluster. It manages 'local' clusters. |
| if (LocalHBaseCluster.isLocal(conf)) { |
| LOG.warn("Not starting a distinct region server because " + |
| "hbase.master is set to 'local' mode"); |
| } else { |
| RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean(); |
| if (runtime != null) { |
| LOG.info("vmInputArguments=" + runtime.getInputArguments()); |
| } |
| Constructor<? extends HRegionServer> c = |
| regionServerClass.getConstructor(HBaseConfiguration.class); |
| HRegionServer hrs = c.newInstance(conf); |
| Thread t = new Thread(hrs); |
| t.setName("regionserver" + hrs.server.getListenerAddress()); |
| t.start(); |
| } |
| } catch (Throwable t) { |
| LOG.error( "Can not start region server because "+ |
| StringUtils.stringifyException(t) ); |
| System.exit(-1); |
| } |
| break; |
| } |
| |
| if (cmd.equals("stop")) { |
| printUsageAndExit("To shutdown the regionserver run " + |
| "bin/hbase-daemon.sh stop regionserver or send a kill signal to" + |
| "the regionserver pid"); |
| } |
| |
| // Print out usage if we get to here. |
| printUsageAndExit(); |
| } |
| } |
| |
| /** |
| * @param args |
| */ |
| public static void main(String [] args) { |
| Configuration conf = new HBaseConfiguration(); |
| @SuppressWarnings("unchecked") |
| Class<? extends HRegionServer> regionServerClass = (Class<? extends HRegionServer>) conf |
| .getClass(HConstants.REGION_SERVER_IMPL, HRegionServer.class); |
| doMain(args, regionServerClass); |
| } |
| } |