| /** |
| * Copyright 2007 The Apache Software Foundation |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.client; |
| |
| import java.io.IOException; |
| import java.lang.reflect.UndeclaredThrowableException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.TreeSet; |
| import java.util.concurrent.ConcurrentHashMap; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.hbase.DoNotRetryIOException; |
| import org.apache.hadoop.hbase.HBaseConfiguration; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.HRegionInfo; |
| import org.apache.hadoop.hbase.HRegionLocation; |
| import org.apache.hadoop.hbase.HServerAddress; |
| import org.apache.hadoop.hbase.HStoreKey; |
| import org.apache.hadoop.hbase.HTableDescriptor; |
| import org.apache.hadoop.hbase.LocalHBaseCluster; |
| import org.apache.hadoop.hbase.MasterNotRunningException; |
| import org.apache.hadoop.hbase.RemoteExceptionHandler; |
| import org.apache.hadoop.hbase.TableNotFoundException; |
| import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; |
| import org.apache.hadoop.hbase.io.BatchUpdate; |
| import org.apache.hadoop.hbase.io.Cell; |
| import org.apache.hadoop.hbase.io.RowResult; |
| import org.apache.hadoop.hbase.ipc.HBaseRPCProtocolVersion; |
| import org.apache.hadoop.hbase.ipc.HMasterInterface; |
| import org.apache.hadoop.hbase.ipc.HRegionInterface; |
| import org.apache.hadoop.hbase.ipc.HBaseRPC; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.MetaUtils; |
| import org.apache.hadoop.hbase.util.SoftValueSortedMap; |
| import org.apache.hadoop.hbase.util.Writables; |
| import org.apache.hadoop.ipc.RemoteException; |
| |
| /** |
| * A non-instantiable class that manages connections to multiple tables in |
| * multiple HBase instances. |
| * |
| * Used by {@link HTable} and {@link HBaseAdmin} |
| */ |
| public class HConnectionManager implements HConstants { |
| /* |
| * Not instantiable. |
| */ |
| protected HConnectionManager() { |
| super(); |
| } |
| |
| // Connections that are currently open, keyed by the HBASE_DIR value of the |
| // configuration they were created from. The map is a ConcurrentHashMap, and |
| // getConnection/deleteConnectionInfo also synchronize on it while using it. |
| private static final Map<String, TableServers> HBASE_INSTANCES = |
| new ConcurrentHashMap<String, TableServers>(); |
| |
| /** |
|  * Looks up the connection registered for the instance named by the |
|  * configuration's HBASE_DIR value, creating and registering one when no |
|  * connection exists yet. |
|  * @param conf configuration identifying the HBase instance |
|  * @return HConnection object for the instance specified by the configuration |
|  */ |
| public static HConnection getConnection(HBaseConfiguration conf) { |
|   synchronized (HBASE_INSTANCES) { |
|     final String key = conf.get(HBASE_DIR); |
|     final TableServers existing = HBASE_INSTANCES.get(key); |
|     if (existing != null) { |
|       return existing; |
|     } |
|     final TableServers created = new TableServers(conf); |
|     HBASE_INSTANCES.put(key, created); |
|     return created; |
|   } |
| } |
| |
| /** |
|  * Unregisters the connection stored under the configuration's HBASE_DIR |
|  * value and closes it. Does nothing when no such connection is registered. |
|  * @param conf configuration identifying the HBase instance |
|  * @param stopProxy passed through to the connection's close method |
|  */ |
| public static void deleteConnectionInfo(HBaseConfiguration conf, |
|     boolean stopProxy) { |
|   synchronized (HBASE_INSTANCES) { |
|     final TableServers removed = HBASE_INSTANCES.remove(conf.get(HBASE_DIR)); |
|     if (removed == null) { |
|       return; |
|     } |
|     removed.close(stopProxy); |
|   } |
| } |
| |
| /* Encapsulates finding the servers for an HBase instance */ |
| private static class TableServers implements ServerConnection, HConstants { |
| private static final Log LOG = LogFactory.getLog(TableServers.class); |
| |
| // Interface class used when creating region server proxies. |
| private final Class<? extends HRegionInterface> serverInterfaceClass; |
| // Base sleep in milliseconds between retries; see getPauseTime. |
| private final long pause; |
| // Upper bound on attempts made by the retry loops in this class. |
| private final int numRetries; |
| // Passed to HBaseRPC.waitForProxy when opening a region server proxy. |
| private final int maxRPCAttempts; |
| |
| // Monitor guarding master discovery. Lock objects are plain Objects: boxed |
| // Integers are value-based and must not be used for synchronization. |
| private final Object masterLock = new Object(); |
| // Checked by getMaster before each connection attempt. |
| private volatile boolean closed; |
| // Proxy to the master, or null while no master has been found. |
| private volatile HMasterInterface master; |
| // Set once getMaster has completed a round of connection attempts. |
| private volatile boolean masterChecked; |
| |
| // One monitor per catalog level, used by the private locateRegion. |
| private final Object rootRegionLock = new Object(); |
| private final Object metaRegionLock = new Object(); |
| private final Object userRegionLock = new Object(); |
| |
| private volatile HBaseConfiguration conf; |
| |
| // Known region HServerAddress.toString() -> HRegionInterface |
| private final Map<String, HRegionInterface> servers = |
|   new ConcurrentHashMap<String, HRegionInterface>(); |
| |
| // Assigned by setRootRegionLocation, cleared by unsetRootRegionLocation. |
| private volatile HRegionLocation rootRegionLocation; |
| |
| // Bytes.mapKey(tableName) -> cached region locations sorted by start key. |
| // Accessed only while synchronized on the map itself. |
| private final Map<Integer, SoftValueSortedMap<byte [], HRegionLocation>> |
|   cachedRegionLocations = |
|     new HashMap<Integer, SoftValueSortedMap<byte [], HRegionLocation>>(); |
| |
| /** |
| * constructor |
| * @param conf Configuration object |
| */ |
| @SuppressWarnings("unchecked") |
| public TableServers(HBaseConfiguration conf) { |
| this.conf = LocalHBaseCluster.doLocal(new HBaseConfiguration(conf)); |
| |
| String serverClassName = |
| conf.get(REGION_SERVER_CLASS, DEFAULT_REGION_SERVER_CLASS); |
| |
| this.closed = false; |
| |
| try { |
| this.serverInterfaceClass = |
| (Class<? extends HRegionInterface>) Class.forName(serverClassName); |
| |
| } catch (ClassNotFoundException e) { |
| throw new UnsupportedOperationException( |
| "Unable to find region server interface " + serverClassName, e); |
| } |
| |
| this.pause = conf.getLong("hbase.client.pause", 2 * 1000); |
| this.numRetries = conf.getInt("hbase.client.retries.number", 10); |
| this.maxRPCAttempts = conf.getInt("hbase.client.rpc.maxattempts", 1); |
| |
| this.master = null; |
| this.masterChecked = false; |
| } |
| |
| private long getPauseTime(int tries) { |
| int ntries = tries; |
| if (ntries >= HConstants.RETRY_BACKOFF.length) |
| ntries = HConstants.RETRY_BACKOFF.length - 1; |
| return this.pause * HConstants.RETRY_BACKOFF[ntries]; |
| } |
| |
| /** |
|  * Forgets the remembered root region location. |
|  */ |
| public void unsetRootRegionLocation() { |
|   rootRegionLocation = null; |
| } |
| |
| /** |
|  * Remembers the given root region location. |
|  * @param rootRegion location to remember; must not be null |
|  * @throws IllegalArgumentException if <code>rootRegion</code> is null |
|  */ |
| public void setRootRegionLocation(HRegionLocation rootRegion) { |
|   if (rootRegion != null) { |
|     this.rootRegionLocation = rootRegion; |
|     return; |
|   } |
|   throw new IllegalArgumentException( |
|     "Cannot set root region location to null."); |
| } |
| |
| /** |
|  * Returns a proxy to the master, connecting on first use. |
|  * <p> |
|  * Connection attempts run while holding <code>masterLock</code> and are |
|  * made at most <code>numRetries</code> times, sleeping between attempts. |
|  * After a round of attempts <code>masterChecked</code> is set, and later |
|  * calls skip the attempts for as long as it remains set. |
|  * |
|  * @return the master proxy |
|  * @throws MasterNotRunningException if no running master is known |
|  */ |
| public HMasterInterface getMaster() throws MasterNotRunningException { |
|   HServerAddress masterLocation = null; |
|   synchronized (this.masterLock) { |
|     for (int tries = 0; |
|         !this.closed && |
|         !this.masterChecked && this.master == null && |
|         tries < numRetries; |
|         tries++) { |
|       final boolean lastAttempt = tries == numRetries - 1; |
| |
|       masterLocation = new HServerAddress(this.conf.get(MASTER_ADDRESS, |
|         DEFAULT_MASTER_ADDRESS)); |
|       try { |
|         HMasterInterface tryMaster = (HMasterInterface)HBaseRPC.getProxy( |
|           HMasterInterface.class, HBaseRPCProtocolVersion.versionID, |
|           masterLocation.getInetSocketAddress(), this.conf); |
| |
|         if (tryMaster.isMasterRunning()) { |
|           this.master = tryMaster; |
|           break; |
|         } |
| |
|       } catch (IOException e) { |
|         if (lastAttempt) { |
|           // Out of retries. Log the cause so the MasterNotRunningException |
|           // thrown below can be diagnosed, then stop without sleeping. |
|           LOG.info("getMaster attempt " + tries + " of " + this.numRetries + |
|             " failed; no more retries", e); |
|           break; |
|         } |
|         LOG.info("getMaster attempt " + tries + " of " + this.numRetries + |
|           " failed; retrying after sleep of " + |
|           getPauseTime(tries), e); |
|       } |
| |
|       if (lastAttempt) { |
|         // The master answered but is not running and no attempts remain; |
|         // sleeping would only delay the failure. |
|         break; |
|       } |
| |
|       // Cannot connect to master or it is not running. Sleep & retry |
|       try { |
|         Thread.sleep(getPauseTime(tries)); |
|       } catch (InterruptedException e) { |
|         // continue |
|       } |
|     } |
|     this.masterChecked = true; |
|   } |
|   if (this.master == null) { |
|     // masterLocation is null when the loop body never ran. |
|     if (masterLocation == null) { |
|       throw new MasterNotRunningException(); |
|     } |
|     throw new MasterNotRunningException(masterLocation.toString()); |
|   } |
|   return this.master; |
| } |
| |
| /** |
|  * @return true if a master proxy is already held or getMaster() succeeds; |
|  * false if getMaster() throws MasterNotRunningException |
|  */ |
| public boolean isMasterRunning() { |
|   if (this.master != null) { |
|     return true; |
|   } |
|   try { |
|     getMaster(); |
|     return true; |
|   } catch (MasterNotRunningException e) { |
|     return false; |
|   } |
| } |
| |
| /** |
|  * Checks whether a table with the given name is known. |
|  * Catalog table names always count as existing. An IOException raised |
|  * while listing tables is logged and the table is reported as missing. |
|  * @param tableName name of the table to look for |
|  * @return true if the table was found |
|  * @throws MasterNotRunningException if the master cannot be reached |
|  * @throws IllegalArgumentException if <code>tableName</code> is null |
|  */ |
| public boolean tableExists(final byte [] tableName) |
| throws MasterNotRunningException { |
|   getMaster(); |
|   if (tableName == null) { |
|     throw new IllegalArgumentException("Table name cannot be null"); |
|   } |
|   if (isMetaTableName(tableName)) { |
|     return true; |
|   } |
|   boolean found = false; |
|   try { |
|     for (HTableDescriptor descriptor : listTables()) { |
|       if (Bytes.equals(descriptor.getName(), tableName)) { |
|         found = true; |
|       } |
|     } |
|   } catch (IOException e) { |
|     LOG.warn("Testing for table existence threw exception", e); |
|   } |
|   return found; |
| } |
| |
| /* |
| * @param n |
| * @return Truen if passed tablename <code>n</code> is equal to the name |
| * of a catalog table. |
| */ |
| private static boolean isMetaTableName(final byte [] n) { |
| return MetaUtils.isMetaTableName(n); |
| } |
| |
| /** |
|  * Finds the location of the region of table <code>name</code> that holds |
|  * <code>row</code>. |
|  * @param name table name |
|  * @param row row key |
|  * @param reload if true the lookup goes through relocateRegion, |
|  * otherwise through locateRegion |
|  * @return the region location |
|  * @throws IOException if the lookup fails |
|  */ |
| public HRegionLocation getRegionLocation(final byte [] name, |
|     final byte [] row, boolean reload) |
| throws IOException { |
|   getMaster(); |
|   if (reload) { |
|     return relocateRegion(name, row); |
|   } |
|   return locateRegion(name, row); |
| } |
| |
| /** |
|  * Scans the catalog via MetaScanner and collects the descriptor of every |
|  * region whose start key is empty, i.e. one entry per table. |
|  * @return the collected descriptors, in TreeSet iteration order |
|  * @throws IOException if the master is unreachable or the scan fails |
|  */ |
| public HTableDescriptor[] listTables() throws IOException { |
|   getMaster(); |
|   final TreeSet<HTableDescriptor> tables = new TreeSet<HTableDescriptor>(); |
| |
|   MetaScanner.metaScan(conf, new MetaScannerVisitor() { |
|     public boolean processRow(RowResult rowResult) throws IOException { |
|       final HRegionInfo region = |
|         Writables.getHRegionInfo(rowResult.get(COL_REGIONINFO)); |
|       // A zero-length start key marks the first region of a table. |
|       if (region.getStartKey().length == 0) { |
|         tables.add(region.getTableDesc()); |
|       } |
|       // Always keep scanning. |
|       return true; |
|     } |
|   }); |
| |
|   final HTableDescriptor[] listed = new HTableDescriptor[tables.size()]; |
|   return tables.toArray(listed); |
| } |
| |
| /** |
|  * Reports whether a table is enabled by scanning its region rows in the |
|  * parent catalog table (-ROOT- when asked about .META., .META. otherwise). |
|  * The root table is always reported as enabled. |
|  * @param tableName name of the table to check |
|  * @return true if at least one region row of the table was scanned and |
|  * no offline region was encountered |
|  * @throws TableNotFoundException if tableExists reports the table missing |
|  * @throws IOException if scanning the catalog fails |
|  */ |
| public boolean isTableEnabled(byte[] tableName) throws IOException { |
|   if (!tableExists(tableName)) { |
|     throw new TableNotFoundException(Bytes.toString(tableName)); |
|   } |
|   if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) { |
|     // The root region is always enabled |
|     return true; |
|   } |
| |
|   final byte[] catalogTable = |
|     Bytes.equals(tableName, HConstants.META_TABLE_NAME) ? |
|       HConstants.ROOT_TABLE_NAME : HConstants.META_TABLE_NAME; |
|   boolean enabled = true; |
|   int matchingRows = 0; |
|   byte[] scanStart = |
|     HRegionInfo.createRegionName(tableName, null, HConstants.ZEROES); |
|   HRegionInfo scannedRegion = null; |
|   do { |
|     if (scannedRegion != null) { |
|       final byte[] regionEnd = scannedRegion.getEndKey(); |
|       if (regionEnd == null || |
|           HStoreKey.equalsTwoRowKeys(scannedRegion, regionEnd, |
|             HConstants.EMPTY_BYTE_ARRAY)) { |
|         // The catalog region just scanned was the last one: done. |
|         break; |
|       } |
|       // Continue in the catalog region that follows the one just scanned. |
|       scanStart = regionEnd; |
|     } |
|     final ScannerCallable scanner = new ScannerCallable(this, catalogTable, |
|       HConstants.COL_REGIONINFO_ARRAY, scanStart, |
|       HConstants.LATEST_TIMESTAMP, null); |
|     // The first call opens the scanner. |
|     getRegionServerWithRetries(scanner); |
|     scannedRegion = scanner.getHRegionInfo(); |
|     try { |
|       RowResult[] batch; |
|       while (enabled && |
|           (batch = getRegionServerWithRetries(scanner)) != null) { |
|         final Cell cell = batch[0].get(HConstants.COL_REGIONINFO); |
|         if (cell == null) { |
|           continue; |
|         } |
|         final byte[] raw = cell.getValue(); |
|         if (raw == null) { |
|           continue; |
|         } |
|         final HRegionInfo info = Writables.getHRegionInfoOrNull(raw); |
|         if (info == null || |
|             !Bytes.equals(info.getTableDesc().getName(), tableName)) { |
|           continue; |
|         } |
|         matchingRows += 1; |
|         enabled = !info.isOffline(); |
|       } |
|     } finally { |
|       // Flag the scanner for closing and issue the closing call. |
|       scanner.setClose(); |
|       getRegionServerWithRetries(scanner); |
|     } |
|   } while (enabled); |
|   return matchingRows > 0 && enabled; |
| } |
| |
| /* |
|  * Catalog visitor that remembers the descriptor of the first region row |
|  * belonging to the wanted table and then stops the scan. |
|  */ |
| private class HTableDescriptorFinder |
| implements MetaScanner.MetaScannerVisitor { |
|   byte[] tableName; |
|   HTableDescriptor result; |
| |
|   protected HTableDescriptorFinder(byte[] tableName) { |
|     this.tableName = tableName; |
|   } |
| |
|   /* |
|    * @return false once the wanted table has been seen, true otherwise |
|    */ |
|   public boolean processRow(RowResult rowResult) throws IOException { |
|     final HTableDescriptor candidate = Writables.getHRegionInfo( |
|       rowResult.get(HConstants.COL_REGIONINFO)).getTableDesc(); |
|     if (Bytes.compareTo(candidate.getName(), tableName) != 0) { |
|       return true; |
|     } |
|     result = candidate; |
|     return false; |
|   } |
| |
|   /* |
|    * @return the descriptor found, or null if none was seen |
|    */ |
|   HTableDescriptor getResult() { |
|     return result; |
|   } |
| } |
| |
| /** |
|  * Returns the descriptor of the named table. The two catalog tables are |
|  * answered from their built-in descriptors; any other table is looked up |
|  * with a catalog scan. |
|  * @param tableName name of the table |
|  * @return the table's descriptor |
|  * @throws TableNotFoundException if the scan finds no such table |
|  * @throws IOException if the scan fails |
|  */ |
| public HTableDescriptor getHTableDescriptor(final byte[] tableName) |
| throws IOException { |
|   if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) { |
|     return new UnmodifyableHTableDescriptor(HTableDescriptor.ROOT_TABLEDESC); |
|   } |
|   if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) { |
|     return new UnmodifyableHTableDescriptor(HTableDescriptor.META_TABLEDESC); |
|   } |
|   final HTableDescriptorFinder visitor = |
|     new HTableDescriptorFinder(tableName); |
|   MetaScanner.metaScan(conf, visitor); |
|   final HTableDescriptor found = visitor.getResult(); |
|   if (found != null) { |
|     return found; |
|   } |
|   throw new TableNotFoundException(Bytes.toString(tableName)); |
| } |
| |
| /** |
|  * Finds the region of <code>tableName</code> holding <code>row</code>, |
|  * allowing the answer to come from the location cache. |
|  * @param tableName table name |
|  * @param row row key |
|  * @return the region location |
|  * @throws IOException if the lookup fails |
|  */ |
| public HRegionLocation locateRegion(final byte [] tableName, |
|     final byte [] row) |
| throws IOException { |
|   getMaster(); |
|   final boolean useCache = true; |
|   return locateRegion(tableName, row, useCache); |
| } |
| |
| /** |
|  * Finds the region of <code>tableName</code> holding <code>row</code> |
|  * without using the location cache. |
|  * @param tableName table name |
|  * @param row row key |
|  * @return the region location |
|  * @throws IOException if the lookup fails |
|  */ |
| public HRegionLocation relocateRegion(final byte [] tableName, |
|     final byte [] row) |
| throws IOException { |
|   getMaster(); |
|   final boolean useCache = false; |
|   return locateRegion(tableName, row, useCache); |
| } |
| |
| private HRegionLocation locateRegion(final byte [] tableName, |
| final byte [] row, boolean useCache) |
| throws IOException{ |
| if (tableName == null || tableName.length == 0) { |
| throw new IllegalArgumentException( |
| "table name cannot be null or zero length"); |
| } |
| |
| if (Bytes.equals(tableName, ROOT_TABLE_NAME)) { |
| synchronized (rootRegionLock) { |
| // This block guards against two threads trying to find the root |
| // region at the same time. One will go do the find while the |
| // second waits. The second thread will not do find. |
| |
| if (!useCache || rootRegionLocation == null) { |
| return locateRootRegion(); |
| } |
| return rootRegionLocation; |
| } |
| } else if (Bytes.equals(tableName, META_TABLE_NAME)) { |
| synchronized (metaRegionLock) { |
| // This block guards against two threads trying to load the meta |
| // region at the same time. The first will load the meta region and |
| // the second will use the value that the first one found. |
| return locateRegionInMeta(ROOT_TABLE_NAME, tableName, row, useCache); |
| } |
| } else { |
| synchronized(userRegionLock){ |
| return locateRegionInMeta(META_TABLE_NAME, tableName, row, useCache); |
| } |
| } |
| } |
| |
| /* |
|  * Search one of the meta tables (-ROOT- or .META.) for the HRegionLocation |
|  * info that contains the table and row we're seeking. |
|  * |
|  * @param parentTable catalog table to search |
|  * @param tableName table whose region is wanted |
|  * @param row row the region must hold |
|  * @param useCache if true a cached location is returned when available; |
|  * if false deleteCachedLocation is called before the lookup |
|  * @return the location found, which is also put into the cache |
|  * @throws TableNotFoundException if the catalog has no row for the table; |
|  * thrown immediately, without retrying |
|  * @throws IOException if the lookup still fails on the last retry |
|  */ |
| private HRegionLocation locateRegionInMeta(final byte [] parentTable, |
|     final byte [] tableName, final byte [] row, boolean useCache) |
| throws IOException { |
|   HRegionLocation location = null; |
|   // If supposed to be using the cache, then check it for a possible hit. |
|   // Otherwise, delete any existing cached location so it won't interfere. |
|   if (useCache) { |
|     location = getCachedLocation(tableName, row); |
|     if (location != null) { |
|       return location; |
|     } |
|   } else { |
|     deleteCachedLocation(tableName, row); |
|   } |
| |
|   // build the key of the meta region we should be looking for. |
|   // the extra 9's on the end are necessary to allow "exact" matches |
|   // without knowing the precise region names. |
|   byte [] metaKey = HRegionInfo.createRegionName(tableName, row, |
|     HConstants.NINES); |
|   for (int tries = 0; true; tries++) { |
|     if (tries >= numRetries) { |
|       throw new NoServerForRegionException("Unable to find region for " |
|         + Bytes.toString(row) + " after " + numRetries + " tries."); |
|     } |
| |
|     try { |
|       // locate the region of the parent catalog table that covers metaKey |
|       HRegionLocation metaLocation = locateRegion(parentTable, metaKey); |
|       HRegionInterface server = |
|         getHRegionConnection(metaLocation.getServerAddress()); |
| |
|       // Ask that catalog region for the closest row at or before metaKey |
|       RowResult regionInfoRow = server.getClosestRowBefore( |
|         metaLocation.getRegionInfo().getRegionName(), metaKey, |
|         HConstants.COLUMN_FAMILY); |
|       if (regionInfoRow == null) { |
|         throw new TableNotFoundException(Bytes.toString(tableName)); |
|       } |
| |
|       Cell value = regionInfoRow.get(COL_REGIONINFO); |
|       // A cell holding a null value is reported like a missing or empty |
|       // one instead of failing with a NullPointerException. |
|       byte [] regionInfoBytes = value == null ? null : value.getValue(); |
|       if (regionInfoBytes == null || regionInfoBytes.length == 0) { |
|         throw new IOException("HRegionInfo was null or empty in " + |
|           Bytes.toString(parentTable)); |
|       } |
|       // convert the row result into the HRegionLocation we need! |
|       HRegionInfo regionInfo = (HRegionInfo) Writables.getWritable( |
|         regionInfoBytes, new HRegionInfo()); |
|       // possible we got a region of a different table... |
|       if (!Bytes.equals(regionInfo.getTableDesc().getName(), tableName)) { |
|         throw new TableNotFoundException( |
|           "Table '" + Bytes.toString(tableName) + "' was not found."); |
|       } |
|       if (regionInfo.isOffline()) { |
|         throw new RegionOfflineException("region offline: " + |
|           regionInfo.getRegionNameAsString()); |
|       } |
| |
|       String serverAddress = |
|         Writables.cellToString(regionInfoRow.get(COL_SERVER)); |
|       if (serverAddress.equals("")) { |
|         throw new NoServerForRegionException("No server address listed " + |
|           "in " + Bytes.toString(parentTable) + " for region " + |
|           regionInfo.getRegionNameAsString()); |
|       } |
| |
|       // instantiate the location |
|       location = new HRegionLocation(regionInfo, |
|         new HServerAddress(serverAddress)); |
|       cacheLocation(tableName, location); |
|       return location; |
|     } catch (TableNotFoundException e) { |
|       // if we got this error, probably means the table just plain doesn't |
|       // exist. rethrow the error immediately. this should always be coming |
|       // from the HTable constructor. |
|       throw e; |
|     } catch (IOException e) { |
|       if (e instanceof RemoteException) { |
|         e = RemoteExceptionHandler.decodeRemoteException( |
|           (RemoteException) e); |
|       } |
|       if (tries < numRetries - 1) { |
|         if (LOG.isDebugEnabled()) { |
|           LOG.debug("locateRegionInMeta attempt " + tries + " of " + |
|             this.numRetries + " failed; retrying after sleep of " + |
|             getPauseTime(tries), e); |
|         } |
|         // Refresh the parent catalog region location before retrying. |
|         relocateRegion(parentTable, metaKey); |
|       } else { |
|         throw e; |
|       } |
|     } |
| |
|     try { |
|       Thread.sleep(getPauseTime(tries)); |
|     } catch (InterruptedException e) { |
|       // continue |
|     } |
|   } |
| } |
| |
| /* |
| * Search the cache for a location that fits our table and row key. |
| * Return null if no suitable region is located. TODO: synchronization note |
| * |
| * <p>TODO: This method during writing consumes 15% of CPU doing lookup |
| * into the Soft Reference SortedMap. Improve. |
| * |
| * @param tableName |
| * @param row |
| * @return Null or region location found in cache. |
| */ |
| private HRegionLocation getCachedLocation(final byte [] tableName, |
| final byte [] row) { |
| SoftValueSortedMap<byte [], HRegionLocation> tableLocations = |
| getTableLocations(tableName); |
| |
| // start to examine the cache. we can only do cache actions |
| // if there's something in the cache for this table. |
| if (tableLocations.isEmpty()) { |
| return null; |
| } |
| |
| HRegionLocation rl = tableLocations.get(row); |
| if (rl != null) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Cache hit for row <" + |
| Bytes.toString(row) + |
| "> in tableName " + Bytes.toString(tableName) + |
| ": location server " + rl.getServerAddress() + |
| ", location region name " + |
| rl.getRegionInfo().getRegionNameAsString()); |
| } |
| return rl; |
| } |
| |
| // Cut the cache so that we only get the part that could contain |
| // regions that match our key |
| SoftValueSortedMap<byte[], HRegionLocation> matchingRegions = |
| tableLocations.headMap(row); |
| |
| // if that portion of the map is empty, then we're done. otherwise, |
| // we need to examine the cached location to verify that it is |
| // a match by end key as well. |
| if (!matchingRegions.isEmpty()) { |
| HRegionLocation possibleRegion = |
| matchingRegions.get(matchingRegions.lastKey()); |
| |
| // there is a possibility that the reference was garbage collected |
| // in the instant since we checked isEmpty(). |
| if (possibleRegion != null) { |
| byte[] endKey = possibleRegion.getRegionInfo().getEndKey(); |
| |
| // make sure that the end key is greater than the row we're looking |
| // for, otherwise the row actually belongs in the next region, not |
| // this one. the exception case is when the endkey is EMPTY_START_ROW, |
| // signifying that the region we're checking is actually the last |
| // region in the table. |
| if (HStoreKey.equalsTwoRowKeys(possibleRegion.getRegionInfo(), |
| endKey, HConstants.EMPTY_END_ROW) || |
| HStoreKey.compareTwoRowKeys(possibleRegion.getRegionInfo(), |
| endKey, row) > 0) { |
| return possibleRegion; |
| } |
| } |
| } |
| |
| // Passed all the way through, so we got nothin - complete cache miss |
| return null; |
| } |
| |
| /* |
|  * Delete a cached location, if it satisfies the table name and row |
|  * requirements. |
|  * |
|  * The candidate is the cached entry with the greatest start key below |
|  * <code>row</code>; it is removed when its end key sorts at or before the |
|  * row. |
|  * NOTE(review): getCachedLocation accepts an entry when its end key sorts |
|  * after the row, so this test looks inverted relative to it - confirm the |
|  * intended behavior before changing it. |
|  */ |
| private void deleteCachedLocation(final byte [] tableName, |
|     final byte [] row) { |
|   SoftValueSortedMap<byte [], HRegionLocation> tableLocations = |
|     getTableLocations(tableName); |
| |
|   // start to examine the cache. we can only do cache actions |
|   // if there's something in the cache for this table. |
|   if (tableLocations.isEmpty()) { |
|     return; |
|   } |
| |
|   // cut the cache so that we only get the part that could contain |
|   // regions that match our key |
|   SoftValueSortedMap<byte [], HRegionLocation> matchingRegions = |
|     tableLocations.headMap(row); |
|   if (matchingRegions.isEmpty()) { |
|     return; |
|   } |
| |
|   HRegionLocation possibleRegion = |
|     matchingRegions.get(matchingRegions.lastKey()); |
|   // The soft reference may have been garbage collected since the |
|   // isEmpty() check, as getCachedLocation also allows for; in that case |
|   // there is nothing left to delete. |
|   if (possibleRegion == null) { |
|     return; |
|   } |
|   byte [] endKey = possibleRegion.getRegionInfo().getEndKey(); |
| |
|   // by nature of the map, we know that the start key has to be < |
|   // otherwise it wouldn't be in the headMap. |
|   if (HStoreKey.compareTwoRowKeys(possibleRegion.getRegionInfo(), |
|       endKey, row) <= 0) { |
|     // delete any matching entry |
|     HRegionLocation rl = |
|       tableLocations.remove(matchingRegions.lastKey()); |
|     if (rl != null && LOG.isDebugEnabled()) { |
|       LOG.debug("Removed " + rl.getRegionInfo().getRegionNameAsString() + |
|         " for tableName=" + Bytes.toString(tableName) + " from cache " + |
|         "because of " + Bytes.toString(row)); |
|     } |
|   } |
| } |
| |
| /* |
| * @param tableName |
| * @return Map of cached locations for passed <code>tableName</code> |
| */ |
| private SoftValueSortedMap<byte [], HRegionLocation> getTableLocations( |
| final byte [] tableName) { |
| // find the map of cached locations for this table |
| Integer key = Bytes.mapKey(tableName); |
| SoftValueSortedMap<byte [], HRegionLocation> result = null; |
| synchronized (this.cachedRegionLocations) { |
| result = this.cachedRegionLocations.get(key); |
| // if tableLocations for this table isn't built yet, make one |
| if (result == null) { |
| result = new SoftValueSortedMap<byte [], HRegionLocation>( |
| Bytes.BYTES_COMPARATOR); |
| this.cachedRegionLocations.put(key, result); |
| } |
| } |
| return result; |
| } |
| |
| /* |
| * Put a newly discovered HRegionLocation into the cache. |
| */ |
| private void cacheLocation(final byte [] tableName, |
| final HRegionLocation location) { |
| byte [] startKey = location.getRegionInfo().getStartKey(); |
| SoftValueSortedMap<byte [], HRegionLocation> tableLocations = |
| getTableLocations(tableName); |
| tableLocations.put(startKey, location); |
| } |
| |
| /** |
|  * Returns the proxy for a region server, opening and remembering a new |
|  * one when no proxy is held for that address yet. |
|  * @param regionServer address of the region server |
|  * @return proxy to the region server |
|  * @throws IOException if the master is unreachable or the proxy cannot be |
|  * obtained |
|  */ |
| public HRegionInterface getHRegionConnection(HServerAddress regionServer) |
| throws IOException { |
|   getMaster(); |
|   final String key = regionServer.toString(); |
|   synchronized (this.servers) { |
|     final HRegionInterface known = this.servers.get(key); |
|     if (known != null) { |
|       return known; |
|     } |
|     HRegionInterface opened; |
|     try { |
|       opened = (HRegionInterface)HBaseRPC.waitForProxy( |
|         serverInterfaceClass, HBaseRPCProtocolVersion.versionID, |
|         regionServer.getInetSocketAddress(), this.conf, |
|         this.maxRPCAttempts); |
|     } catch (RemoteException e) { |
|       throw RemoteExceptionHandler.decodeRemoteException(e); |
|     } |
|     this.servers.put(key, opened); |
|     return opened; |
|   } |
| } |
| |
| /* |
| * Repeatedly try to find the root region by asking the master for where it is |
| * @return HRegionLocation for root region if found |
| * @throws NoServerForRegionException - if the root region can not be |
| * located after retrying |
| * @throws IOException |
| */ |
| private HRegionLocation locateRootRegion() |
| throws IOException { |
| getMaster(); |
| HServerAddress rootRegionAddress = null; |
| for (int tries = 0; tries < numRetries; tries++) { |
| int localTimeouts = 0; |
| // ask the master which server has the root region |
| while (rootRegionAddress == null && localTimeouts < numRetries) { |
| rootRegionAddress = master.findRootRegion(); |
| if (rootRegionAddress == null) { |
| try { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Sleeping " + getPauseTime(tries) + |
| "ms, waiting for root region."); |
| } |
| Thread.sleep(getPauseTime(tries)); |
| } catch (InterruptedException iex) { |
| // continue |
| } |
| localTimeouts++; |
| } |
| } |
| |
| if (rootRegionAddress == null) { |
| throw new NoServerForRegionException( |
| "Timed out trying to locate root region"); |
| } |
| |
| // get a connection to the region server |
| HRegionInterface server = getHRegionConnection(rootRegionAddress); |
| try { |
| // if this works, then we're good, and we have an acceptable address, |
| // so we can stop doing retries and return the result. |
| server.getRegionInfo(HRegionInfo.ROOT_REGIONINFO.getRegionName()); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Found ROOT at " + rootRegionAddress); |
| } |
| break; |
| } catch (IOException e) { |
| if (tries == numRetries - 1) { |
| // Don't bother sleeping. We've run out of retries. |
| if (e instanceof RemoteException) { |
| e = RemoteExceptionHandler.decodeRemoteException( |
| (RemoteException) e); |
| } |
| throw e; |
| } |
| |
| // Sleep and retry finding root region. |
| try { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Root region location changed. Sleeping."); |
| } |
| Thread.sleep(getPauseTime(tries)); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Wake. Retry finding root region."); |
| } |
| } catch (InterruptedException iex) { |
| // continue |
| } |
| } |
| |
| rootRegionAddress = null; |
| } |
| |
| // if the address is null by this point, then the retries have failed, |
| // and we're sort of sunk |
| if (rootRegionAddress == null) { |
| throw new NoServerForRegionException( |
| "unable to locate root region server"); |
| } |
| |
| // return the region location |
| return new HRegionLocation( |
| HRegionInfo.ROOT_REGIONINFO, rootRegionAddress); |
| } |
| |
| public <T> T getRegionServerWithRetries(ServerCallable<T> callable) |
| throws IOException, RuntimeException { |
| getMaster(); |
| List<Throwable> exceptions = new ArrayList<Throwable>(); |
| for(int tries = 0; tries < numRetries; tries++) { |
| try { |
| callable.instantiateServer(tries != 0); |
| return callable.call(); |
| } catch (Throwable t) { |
| if (t instanceof UndeclaredThrowableException) { |
| t = t.getCause(); |
| } |
| if (t instanceof RemoteException) { |
| t = RemoteExceptionHandler.decodeRemoteException((RemoteException)t); |
| } |
| if (t instanceof DoNotRetryIOException) { |
| throw (DoNotRetryIOException)t; |
| } |
| exceptions.add(t); |
| if (tries == numRetries - 1) { |
| throw new RetriesExhaustedException(callable.getServerName(), |
| callable.getRegionName(), callable.getRow(), tries, exceptions); |
| } |
| } |
| try { |
| Thread.sleep(getPauseTime(tries)); |
| } catch (InterruptedException e) { |
| // continue |
| } |
| } |
| return null; |
| } |
| |
| public <T> T getRegionServerForWithoutRetries(ServerCallable<T> callable) |
| throws IOException, RuntimeException { |
| getMaster(); |
| try { |
| callable.instantiateServer(false); |
| return callable.call(); |
| } catch (Throwable t) { |
| if (t instanceof UndeclaredThrowableException) { |
| t = t.getCause(); |
| } |
| if (t instanceof RemoteException) { |
| t = RemoteExceptionHandler.decodeRemoteException((RemoteException) t); |
| } |
| if (t instanceof DoNotRetryIOException) { |
| throw (DoNotRetryIOException) t; |
| } |
| } |
| return null; |
| } |
| |
| private HRegionLocation |
| getRegionLocationForRowWithRetries(byte[] tableName, byte[] rowKey, |
| boolean reload) |
| throws IOException { |
| getMaster(); |
| List<Throwable> exceptions = new ArrayList<Throwable>(); |
| HRegionLocation location = null; |
| int tries = 0; |
| while (tries < numRetries) { |
| try { |
| location = getRegionLocation(tableName, rowKey, reload); |
| } catch (Throwable t) { |
| exceptions.add(t); |
| } |
| if (location != null) { |
| break; |
| } |
| reload = true; |
| tries++; |
| try { |
| Thread.sleep(getPauseTime(tries)); |
| } catch (InterruptedException e) { |
| // continue |
| } |
| } |
| if (location == null) { |
| throw new RetriesExhaustedException("Some server", |
| HConstants.EMPTY_BYTE_ARRAY, rowKey, tries, exceptions); |
| } |
| return location; |
| } |
| |
| /** |
|  * Sends a list of updates to the region servers hosting their rows. |
|  * <p> |
|  * The list is sorted in place, then walked once. Consecutive updates are |
|  * collected until the next row maps to a different region (or the list |
|  * ends) and are then sent with one <code>batchUpdates</code> call. A |
|  * return value of -1 is treated as success. Any other value is used as |
|  * the position, within the batch, from which to resume: the method |
|  * sleeps, reloads the region location and then sends updates one at a |
|  * time until one is accepted. |
|  * @param list updates to apply; sorted by this method |
|  * @param tableName table the updates belong to |
|  * @throws RetriesExhaustedException if a batch is still rejected on the |
|  * last permitted retry |
|  * @throws IOException if locating a region or sending a batch fails |
|  */ |
| public void processBatchOfRows(ArrayList<BatchUpdate> list, byte[] tableName) |
| throws IOException { |
|   if (list.isEmpty()) { |
|     return; |
|   } |
|   boolean retryOnlyOne = false; |
|   int tries = 0; |
|   Collections.sort(list); |
|   List<BatchUpdate> tempUpdates = new ArrayList<BatchUpdate>(); |
|   HRegionLocation location = |
|     getRegionLocationForRowWithRetries(tableName, list.get(0).getRow(), |
|       false); |
|   byte [] currentRegion = location.getRegionInfo().getRegionName(); |
|   byte [] region = currentRegion; |
|   boolean isLastRow = false; |
|   for (int i = 0; i < list.size() && tries < numRetries; i++) { |
|     BatchUpdate batchUpdate = list.get(i); |
|     tempUpdates.add(batchUpdate); |
|     isLastRow = (i + 1) == list.size(); |
|     if (!isLastRow) { |
|       // Peek at the next row to learn whether it lives in another region. |
|       location = getRegionLocationForRowWithRetries(tableName, |
|         list.get(i+1).getRow(), false); |
|       region = location.getRegionInfo().getRegionName(); |
|     } |
|     if (!Bytes.equals(currentRegion, region) || isLastRow || retryOnlyOne) { |
|       final BatchUpdate[] updates = tempUpdates.toArray(new BatchUpdate[0]); |
|       int index = getRegionServerWithRetries(new ServerCallable<Integer>( |
|           this, tableName, batchUpdate.getRow()) { |
|         public Integer call() throws IOException { |
|           return server.batchUpdates(location.getRegionInfo() |
|             .getRegionName(), updates); |
|         } |
|       }); |
|       if (index != -1) { |
|         if (tries == numRetries - 1) { |
|           throw new RetriesExhaustedException("Some server", |
|             currentRegion, batchUpdate.getRow(), |
|             tries, new ArrayList<Throwable>()); |
|         } |
|         long sleepTime = getPauseTime(tries); |
|         if (LOG.isDebugEnabled()) { |
|           LOG.debug("Reloading region " + Bytes.toString(currentRegion) + |
|             " location because regionserver didn't accept updates; " + |
|             "tries=" + tries + |
|             " of max=" + this.numRetries + ", waiting=" + sleepTime + "ms"); |
|         } |
|         try { |
|           Thread.sleep(sleepTime); |
|         } catch (InterruptedException e) { |
|           // continue |
|         } |
|         // Count the retry even when the sleep was interrupted; otherwise |
|         // an interrupt would hand out an uncounted extra attempt. |
|         tries++; |
|         // Rewind so the loop's i++ lands on the update to resume from. |
|         i = i - updates.length + index; |
|         retryOnlyOne = true; |
|         location = getRegionLocationForRowWithRetries(tableName, |
|           list.get(i + 1).getRow(), true); |
|         region = location.getRegionInfo().getRegionName(); |
|       } |
|       else { |
|         retryOnlyOne = false; |
|       } |
|       currentRegion = region; |
|       tempUpdates.clear(); |
|     } |
|   } |
| } |
| |
| /* |
|  * Drops the master proxy and, when asked to, stops every proxy held. |
|  * Stopped region server proxies are also removed from the server map so |
|  * that getHRegionConnection opens fresh ones instead of returning proxies |
|  * that have been shut down. |
|  * @param stopProxy if true the master and region server proxies are |
|  * stopped through HBaseRPC.stopProxy |
|  */ |
| void close(boolean stopProxy) { |
|   if (master != null) { |
|     if (stopProxy) { |
|       HBaseRPC.stopProxy(master); |
|     } |
|     master = null; |
|     // Let a later getMaster() call look for the master again. |
|     masterChecked = false; |
|   } |
|   if (stopProxy) { |
|     synchronized (servers) { |
|       for (HRegionInterface i: servers.values()) { |
|         HBaseRPC.stopProxy(i); |
|       } |
|       servers.clear(); |
|     } |
|   } |
| } |
| } |
| } |