| /* |
| * Copyright (C) 2010-2012 The Async HBase Authors. All rights reserved. |
| * This file is part of Async HBase. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions are met: |
| * - Redistributions of source code must retain the above copyright notice, |
| * this list of conditions and the following disclaimer. |
| * - Redistributions in binary form must reproduce the above copyright notice, |
| * this list of conditions and the following disclaimer in the documentation |
| * and/or other materials provided with the distribution. |
| * - Neither the name of the StumbleUpon nor the names of its contributors |
| * may be used to endorse or promote products derived from this software |
| * without specific prior written permission. |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
| * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
| * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
| * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE |
| * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
| * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
| * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
| * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
| * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
| * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
| * POSSIBILITY OF SUCH DAMAGE. |
| */ |
| package org.hbase.async; |
| |
| import java.io.IOException; |
| import java.net.InetAddress; |
| import java.net.InetSocketAddress; |
| import java.net.SocketAddress; |
| import java.net.UnknownHostException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentSkipListMap; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.Executors; |
| import static java.util.concurrent.TimeUnit.MILLISECONDS; |
| |
| import com.google.common.cache.LoadingCache; |
| import com.google.protobuf.InvalidProtocolBufferException; |
| |
| import org.apache.zookeeper.AsyncCallback; |
| import org.apache.zookeeper.KeeperException.Code; |
| import org.apache.zookeeper.WatchedEvent; |
| import org.apache.zookeeper.Watcher; |
| import org.apache.zookeeper.ZooKeeper; |
| import org.apache.zookeeper.data.Stat; |
| |
| import org.jboss.netty.channel.ChannelEvent; |
| import org.jboss.netty.channel.ChannelPipeline; |
| import org.jboss.netty.channel.ChannelStateEvent; |
| import org.jboss.netty.channel.DefaultChannelPipeline; |
| import org.jboss.netty.channel.socket.ClientSocketChannelFactory; |
| import org.jboss.netty.channel.socket.SocketChannel; |
| import org.jboss.netty.channel.socket.SocketChannelConfig; |
| import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; |
| import org.jboss.netty.util.HashedWheelTimer; |
| import org.jboss.netty.util.Timeout; |
| import org.jboss.netty.util.Timer; |
| import org.jboss.netty.util.TimerTask; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.stumbleupon.async.Callback; |
| import com.stumbleupon.async.Deferred; |
| |
| import org.hbase.async.generated.ZooKeeperPB; |
| |
| /** |
| * A fully asynchronous, thread-safe, modern HBase client. |
| * <p> |
| * Unlike the traditional HBase client ({@code HTable}), this client should be |
| * instantiated only once. You can use it with any number of tables at the same |
| * time. The only case where you should have multiple instances is when you want |
| * to use multiple different clusters at the same time. |
| * <p> |
| * If you play by the rules, this client is (in theory {@code :D}) completely |
| * thread-safe. Read the documentation carefully to know what the requirements |
| * are for this guarantee to apply. |
| * <p> |
| * This client is fully non-blocking, any blocking operation will return a |
| * {@link Deferred} instance to which you can attach a {@link Callback} chain |
| * that will execute when the asynchronous operation completes. |
| * |
| * <h1>Note regarding {@code HBaseRpc} instances passed to this class</h1> |
| * Every {@link HBaseRpc} passed to a method of this class should not be changed |
| * or re-used until the {@code Deferred} returned by that method calls you back. |
| * <strong>Changing or re-using any {@link HBaseRpc} for an RPC in flight will |
| * lead to <em>unpredictable</em> results and voids your warranty</strong>. |
| * |
 * <a name="durability"></a>
| * <h1>Data Durability</h1> |
| * Some methods or RPC types take a {@code durable} argument. When an edit |
| * requests to be durable, the success of the RPC guarantees that the edit is |
| * safely and durably stored by HBase and won't be lost. In case of server |
| * failures, the edit won't be lost although it may become momentarily |
| * unavailable. Setting the {@code durable} argument to {@code false} makes the |
| * operation complete faster (and puts a lot less strain on HBase), but removes |
| * this durability guarantee. In case of a server failure, the edit may (or may |
| * not) be lost forever. When in doubt, leave it to {@code true} (or use the |
| * corresponding method that doesn't accept a {@code durable} argument as it |
| * will default to {@code true}). Setting it to {@code false} is useful in cases |
| * where data-loss is acceptable, e.g. during batch imports (where you can |
| * re-run the whole import in case of a failure), or when you intend to do |
| * statistical analysis on the data (in which case some missing data won't |
| * affect the results as long as the data loss caused by machine failures |
| * preserves the distribution of your data, which depends on how you're building |
| * your row keys and how you're using HBase, so be careful). |
| * <p> |
| * Bear in mind that this durability guarantee holds only once the RPC has |
| * completed successfully. Any edit temporarily buffered on the client side or |
| * in-flight will be lost if the client itself crashes. You can control how much |
| * buffering is done by the client by using {@link #setFlushInterval} and you |
| * can force-flush the buffered edits by calling {@link #flush}. When you're |
| * done using HBase, you <strong>must not</strong> just give up your reference |
| * to your {@code HBaseClient}, you must shut it down gracefully by calling |
| * {@link #shutdown}. If you fail to do this, then all edits still buffered by |
| * the client will be lost. |
| * <p> |
| * <b>NOTE</b>: This entire section assumes that you use a distributed file |
| * system that provides HBase with the required durability semantics. If you use |
| * HDFS, make sure you have a version of HDFS that provides HBase the necessary |
 * API and semantics to durably store its data.
| * |
| * <h1>{@code throws} clauses</h1> |
| * None of the asynchronous methods in this API are expected to throw an |
| * exception. But the {@link Deferred} object they return to you can carry an |
| * exception that you should handle (using "errbacks", see the javadoc of |
| * {@link Deferred}). In order to be able to do proper asynchronous error |
| * handling, you need to know what types of exceptions you're expected to face |
| * in your errbacks. In order to document that, the methods of this API use |
| * javadoc's {@code @throws} to spell out the exception types you should handle |
| * in your errback. Asynchronous exceptions will be indicated as such in the |
| * javadoc with "(deferred)". |
| * <p> |
| * For instance, if a method {@code foo} pretends to throw an |
| * {@link UnknownScannerException} and returns a {@code Deferred<Whatever>}, |
| * then you should use the method like so: |
| * |
| * <pre> |
| * HBaseClient client = ...; |
| * {@link Deferred}{@code <Whatever>} d = client.foo(); |
| * d.addCallbacks(new {@link Callback}{@code <Whatever, SomethingElse>}() { |
| * SomethingElse call(Whatever arg) { |
| * LOG.info("Yay, RPC completed successfully!"); |
| * return new SomethingElse(arg.getWhateverResult()); |
| * } |
| * String toString() { |
| * return "handle foo response"; |
| * } |
| * }, |
| * new {@link Callback}{@code <Exception, Object>}() { |
| * Object call(Exception arg) { |
| * if (arg instanceof {@link UnknownScannerException}) { |
| * LOG.error("Oops, we used the wrong scanner?", arg); |
| * return otherAsyncOperation(); // returns a {@code Deferred<Blah>} |
| * } |
| * LOG.error("Sigh, the RPC failed and we don't know what to do", arg); |
| * return arg; // Pass on the error to the next errback (if any); |
| * } |
| * String toString() { |
| * return "foo errback"; |
| * } |
| * }); |
| * </pre> |
| * |
| * This code calls {@code foo}, and upon successful completion transforms the |
| * result from a {@code Whatever} to a {@code SomethingElse} (which will then be |
| * given to the next callback in the chain, if any). When there's a failure, the |
| * errback is called instead and it attempts to handle a particular type of |
| * exception by retrying the operation differently. |
| */ |
| public final class HBaseClient { |
| /* |
| * TODO(tsuna): Address the following. - Properly handle disconnects. - |
| * Attempt to reconnect a couple of times, see if it was a transient network |
| * blip. - If the -ROOT- region is unavailable when we start, we should put a |
| * watch in ZK instead of polling it every second. - Handling RPC timeouts. - |
| * Stats: - QPS per RPC type. - Latency histogram per RPC type (requires |
| * open-sourcing the SU Java stats classes that I wrote in a separate |
| * package). - Cache hit rate in the local META cache. - RPC errors and |
| * retries. - Typical batch size when flushing edits (is that useful?). - |
| * Write unit tests and benchmarks! |
| */ |
| |
  private static final Logger LOG = LoggerFactory.getLogger(HBaseClient.class);

  /**
   * An empty byte array you can use. This can be useful for instance with
   * {@link Scanner#setStartKey} and {@link Scanner#setStopKey}.
   */
  public static final byte[] EMPTY_ARRAY = new byte[0];

  /** A byte array containing a single zero byte. */
  private static final byte[] ZERO_ARRAY = new byte[] { 0 };

  /** The literal bytes of the {@code -ROOT-} catalog table name (pre-0.95). */
  private static final byte[] ROOT = new byte[] { '-', 'R', 'O', 'O', 'T', '-' };
  /** The literal bytes of the region name {@code -ROOT-,,0}. */
  private static final byte[] ROOT_REGION = new byte[] { '-', 'R', 'O', 'O',
      'T', '-', ',', ',', '0' };
  /** The literal bytes of the {@code .META.} catalog table name (pre-0.95). */
  private static final byte[] META = new byte[] { '.', 'M', 'E', 'T', 'A', '.' };
  /** The literal bytes of the {@code info} family used in the catalog tables. */
  static final byte[] INFO = new byte[] { 'i', 'n', 'f', 'o' };
  /** The literal bytes of the {@code regioninfo} column qualifier. */
  private static final byte[] REGIONINFO = new byte[] { 'r', 'e', 'g', 'i',
      'o', 'n', 'i', 'n', 'f', 'o' };
  /** The literal bytes of the {@code server} column qualifier. */
  private static final byte[] SERVER = new byte[] { 's', 'e', 'r', 'v', 'e',
      'r' };
  /** HBase 0.95 and up: .META. is now hbase:meta */
  private static final byte[] HBASE96_META = new byte[] { 'h', 'b', 'a', 's',
      'e', ':', 'm', 'e', 't', 'a' };
  /** New for HBase 0.95 and up: the name of META is fixed. */
  private static final byte[] META_REGION_NAME = new byte[] { 'h', 'b', 'a',
      's', 'e', ':', 'm', 'e', 't', 'a', ',', ',', '1' };
  /** New for HBase 0.95 and up: the region info for META is fixed. */
  private static final RegionInfo META_REGION = new RegionInfo(HBASE96_META,
      META_REGION_NAME, EMPTY_ARRAY);

  /**
   * In HBase 0.95 and up, this magic number is found in a couple places. It's
   * used in the znode that points to the .META. region, to indicate that the
   * contents of the znode is a protocol buffer. It's also used in the value of
   * the KeyValue found in the .META. table that contain a {@link RegionInfo},
   * to indicate that the value contents is a protocol buffer.
   */
  static final int PBUF_MAGIC = 1346524486; // 4 bytes: "PBUF"

  /**
   * Timer we use to handle all our timeouts.
   * <p>
   * The 20ms tick duration bounds the precision of every timeout scheduled
   * through {@link #newTimeout} and {@link #getTimer}.
   * TODO(tsuna): Get it through the ctor to share it with others.
   * TODO(tsuna): Make the tick duration configurable?
   */
  private final HashedWheelTimer timer = new HashedWheelTimer(20, MILLISECONDS);

  /**
   * Up to how many milliseconds can we buffer an edit on the client side.
   * Volatile: read by timer threads, written by {@link #setFlushInterval}.
   */
  private volatile short flush_interval = 1000; // ms

  /**
   * How many different counters do we want to keep in memory for buffering.
   * Each entry requires storing the table name, row key, family name and column
   * qualifier, plus 4 small objects.
   *
   * Assuming an average table name of 10 bytes, average key of 20 bytes,
   * average family name of 10 bytes and average qualifier of 8 bytes, this
   * would require 65535 * (10 + 20 + 10 + 8 + 4 * 32) / 1024 / 1024 = 11MB of
   * RAM, which isn't too excessive for a default value. Of course this might
   * bite people with large keys or qualifiers, but then it's normal to expect
   * they'd tune this value to cater to their unusual requirements.
   */
  private volatile int increment_buffer_size = 65535;

  /**
   * Factory through which we will create all its channels / sockets.
   */
  private final ClientSocketChannelFactory channel_factory;

  /** Watcher to keep track of the -ROOT- region in ZooKeeper. */
  private final ZKClient zkclient;
| |
| /** |
| * The client currently connected to the -ROOT- region. If this is |
| * {@code null} then we currently don't know where the -ROOT- region is and |
| * we're waiting for a notification from ZooKeeper to tell us where it is. |
| * Note that with HBase 0.95, {@link #has_root} would be false, and this would |
| * instead point to the .META. region. |
| */ |
| private volatile RegionClient rootregion; |
| |
| /** |
| * Whether or not there is a -ROOT- region. When connecting to HBase 0.95 and |
| * up, this would be set to false, so we would go straight to .META. instead. |
| */ |
| volatile boolean has_root = true; |
| |
| /** |
| * Maps {@code (table, start_key)} pairs to the {@link RegionInfo} that serves |
| * this key range for this table. |
| * <p> |
| * The keys in this map are region names. |
| * |
| * @see #createRegionSearchKey Because it's a sorted map, we can efficiently |
| * find a region given an arbitrary key. |
| * @see #getRegion <p> |
| * This map and the next 2 maps contain the same data, but indexed |
| * differently. There is no consistency guarantee across the maps. They |
| * are not updated all at the same time atomically. This map is always |
| * the first to be updated, because that's the map from which all the |
| * lookups are done in the fast-path of the requests that need to locate |
| * a region. The second map to be updated is {@link region2client}, |
| * because it comes second in the fast-path of every requests that need |
| * to locate a region. The third map is only used to handle RegionServer |
| * disconnections gracefully. |
| * <p> |
| * Note: before using the {@link RegionInfo} you pull out of this map, |
| * you <b>must</b> ensure that {@link RegionInfo#table} doesn't return |
| * {@link #EMPTY_ARRAY}. If it does, it means you got a special entry |
| * used to indicate that this region is known to be unavailable right now |
| * due to an NSRE. You must not use this {@link RegionInfo} as if it was |
| * a normal entry. |
| * @see #handleNSRE |
| */ |
| private final ConcurrentSkipListMap<byte[], RegionInfo> regions_cache = new ConcurrentSkipListMap<byte[], RegionInfo>( |
| RegionInfo.REGION_NAME_CMP); |
| |
| /** |
| * Maps a {@link RegionInfo} to the client currently connected to the |
| * RegionServer that serves this region. |
| * <p> |
| * The opposite mapping is stored in {@link #client2regions}. There's no |
| * consistency guarantee with that other map. See the javadoc for |
| * {@link #regions_cache} regarding consistency. |
| */ |
| private final ConcurrentHashMap<RegionInfo, RegionClient> region2client = new ConcurrentHashMap<RegionInfo, RegionClient>(); |
| |
| /** |
| * Maps a client connected to a RegionServer to the list of regions we know |
| * it's serving so far. |
| * <p> |
| * The opposite mapping is stored in {@link #region2client}. There's no |
| * consistency guarantee with that other map. See the javadoc for |
| * {@link #regions_cache} regarding consistency. |
| * <p> |
| * Each list in the map is protected by its own monitor lock. |
| */ |
| private final ConcurrentHashMap<RegionClient, ArrayList<RegionInfo>> client2regions = new ConcurrentHashMap<RegionClient, ArrayList<RegionInfo>>(); |
| |
| /** |
| * Cache that maps a RegionServer address ("ip:port") to the client connected |
| * to it. |
| * <p> |
| * Access to this map must be synchronized by locking its monitor. Lock |
| * ordering: when locking both this map and a RegionClient, the RegionClient |
| * must always be locked first to avoid deadlocks. Logging the contents of |
| * this map (or calling toString) requires copying it first. |
| * <p> |
| * This isn't a {@link ConcurrentHashMap} because we don't use it frequently |
| * (just when connecting to / disconnecting from RegionServers) and when we |
| * add something to it, we want to do an atomic get-and-put, but |
| * {@code putIfAbsent} isn't a good fit for us since it requires to create an |
| * object that may be "wasted" in case another thread wins the insertion race, |
| * and we don't want to create unnecessary connections. |
| * <p> |
| * Upon disconnection, clients are automatically removed from this map. We |
| * don't use a {@code ChannelGroup} because a {@code ChannelGroup} does the |
| * clean-up on the {@code channelClosed} event, which is actually the 3rd and |
| * last event to be fired when a channel gets disconnected. The first one to |
| * get fired is, {@code channelDisconnected}. This matters to us because we |
| * want to purge disconnected clients from the cache as quickly as possible |
| * after the disconnection, to avoid handing out clients that are going to |
| * cause unnecessary errors. |
| * |
| * @see RegionClientPipeline#handleDisconnect |
| */ |
| private final HashMap<String, RegionClient> ip2client = new HashMap<String, RegionClient>(); |
| |
| /** |
| * Map of region name to list of pending RPCs for this region. |
| * <p> |
| * The array-list isn't expected to be empty, except during rare race |
| * conditions. When the list is non-empty, the first element in the list |
| * should be a special "probe" RPC we build to detect when the region NSRE'd |
| * is back online. |
| * <p> |
| * For more details on how this map is used, please refer to the documentation |
| * of {@link #handleNSRE}. |
| * <p> |
| * Each list in the map is protected by its own monitor lock. |
| */ |
| private final ConcurrentSkipListMap<byte[], ArrayList<HBaseRpc>> got_nsre = new ConcurrentSkipListMap<byte[], ArrayList<HBaseRpc>>( |
| RegionInfo.REGION_NAME_CMP); |
| |
| /** |
| * Buffer for atomic increment coalescing. This buffer starts out null, and |
| * remains so until the first time we need to buffer an increment. Once lazily |
| * initialized, this buffer will never become null again. |
| * <p> |
| * We do this so that we can lazily schedule the flush timer only if we ever |
| * have buffered increments. Applications without buffered increments don't |
| * need to pay any memory for the buffer or any CPU time for a useless timer. |
| * |
| * @see #setupIncrementCoalescing |
| */ |
| private volatile LoadingCache<BufferedIncrement, BufferedIncrement.Amount> increment_buffer; |
| |
| // ------------------------ // |
| // Client usage statistics. // |
| // ------------------------ // |
| |
| /** Number of connections created by {@link #newClient}. */ |
| private final Counter num_connections_created = new Counter(); |
| |
| /** How many {@code -ROOT-} lookups were made. */ |
| private final Counter root_lookups = new Counter(); |
| |
| /** How many {@code .META.} lookups were made (with a permit). */ |
| private final Counter meta_lookups_with_permit = new Counter(); |
| |
| /** How many {@code .META.} lookups were made (without a permit). */ |
| private final Counter meta_lookups_wo_permit = new Counter(); |
| |
| /** Number of calls to {@link #flush}. */ |
| private final Counter num_flushes = new Counter(); |
| |
| /** Number of NSREs handled by {@link #handleNSRE}. */ |
| private final Counter num_nsres = new Counter(); |
| |
| /** Number of RPCs delayed by {@link #handleNSRE}. */ |
| private final Counter num_nsre_rpcs = new Counter(); |
| |
| /** Number of {@link MultiAction} sent to the network. */ |
| final Counter num_multi_rpcs = new Counter(); |
| |
| /** Number of calls to {@link #get}. */ |
| private final Counter num_gets = new Counter(); |
| |
| /** Number of calls to {@link #openScanner}. */ |
| private final Counter num_scanners_opened = new Counter(); |
| |
| /** Number of calls to {@link #scanNextRows}. */ |
| private final Counter num_scans = new Counter(); |
| |
| /** Number calls to {@link #put}. */ |
| private final Counter num_puts = new Counter(); |
| |
| /** Number calls to {@link #lockRow}. */ |
| private final Counter num_row_locks = new Counter(); |
| |
| /** Number calls to {@link #delete}. */ |
| private final Counter num_deletes = new Counter(); |
| |
| /** Number of {@link AtomicIncrementRequest} sent. */ |
| private final Counter num_atomic_increments = new Counter(); |
| |
| /** |
| * Constructor. |
| * |
| * @param quorum_spec The specification of the quorum, e.g. |
| * {@code "host1,host2,host3"}. |
| */ |
| public HBaseClient(final String quorum_spec) { |
| this(quorum_spec, "/hbase"); |
| } |
| |
| /** |
| * Constructor. |
| * |
| * @param quorum_spec The specification of the quorum, e.g. |
| * {@code "host1,host2,host3"}. |
| * @param base_path The base path under which is the znode for the -ROOT- |
| * region. |
| */ |
| public HBaseClient(final String quorum_spec, final String base_path) { |
| this(quorum_spec, base_path, defaultChannelFactory()); |
| } |
| |
| /** Creates a default channel factory in case we haven't been given one. */ |
| private static NioClientSocketChannelFactory defaultChannelFactory() { |
| final Executor executor = Executors.newCachedThreadPool(); |
| return new NioClientSocketChannelFactory(executor, executor); |
| } |
| |
| /** |
| * Constructor for advanced users with special needs. |
| * <p> |
| * <strong>NOTE:</strong> Only advanced users who really know what they're |
| * doing should use this constructor. Passing an inappropriate thread pool, or |
| * blocking its threads will prevent this {@code HBaseClient} from working |
| * properly or lead to poor performance. |
| * |
| * @param quorum_spec The specification of the quorum, e.g. |
| * {@code "host1,host2,host3"}. |
| * @param base_path The base path under which is the znode for the -ROOT- |
| * region. |
| * @param executor The executor from which to obtain threads for NIO |
| * operations. It is <strong>strongly</strong> encouraged to use a |
| * {@link Executors#newCachedThreadPool} or something equivalent |
| * unless you're sure to understand how Netty creates and uses |
| * threads. Using a fixed-size thread pool will not work the way you |
| * expect. |
| * <p> |
| * Note that calling {@link #shutdown} on this client will <b>NOT</b> |
| * shut down the executor. |
| * @see NioClientSocketChannelFactory |
| * @since 1.2 |
| */ |
| public HBaseClient(final String quorum_spec, final String base_path, |
| final Executor executor) { |
| this(quorum_spec, base_path, new CustomChannelFactory(executor)); |
| } |
| |
  /**
   * A custom channel factory that doesn't shutdown its executor.
   * <p>
   * Used when the application supplied its own {@link Executor}: the
   * application owns that executor's lifecycle, so {@link #shutdown} must
   * not tear it down.
   */
  private static final class CustomChannelFactory extends
      NioClientSocketChannelFactory {
    CustomChannelFactory(final Executor executor) {
      // Same executor for both the boss and worker slots.
      super(executor, executor);
    }

    @Override
    public void releaseExternalResources() {
      // Do nothing, we don't want to shut down the executor.
    }
  }
| |
| /** |
| * Constructor for advanced users with special needs. |
| * <p> |
| * Most users don't need to use this constructor. |
| * |
| * @param quorum_spec The specification of the quorum, e.g. |
| * {@code "host1,host2,host3"}. |
| * @param base_path The base path under which is the znode for the -ROOT- |
| * region. |
| * @param channel_factory A custom factory to use to create sockets. |
| * <p> |
| * Note that calling {@link #shutdown} on this client will also cause |
| * the shutdown and release of the factory and its underlying thread |
| * pool. |
| * @since 1.2 |
| */ |
| public HBaseClient(final String quorum_spec, final String base_path, |
| final ClientSocketChannelFactory channel_factory) { |
| this.channel_factory = channel_factory; |
| zkclient = new ZKClient(quorum_spec, base_path); |
| } |
| |
| /** |
| * Returns a snapshot of usage statistics for this client. |
| * |
| * @since 1.3 |
| */ |
| public ClientStats stats() { |
| final LoadingCache<BufferedIncrement, BufferedIncrement.Amount> cache = increment_buffer; |
| return new ClientStats(num_connections_created.get(), root_lookups.get(), |
| meta_lookups_with_permit.get(), meta_lookups_wo_permit.get(), |
| num_flushes.get(), num_nsres.get(), num_nsre_rpcs.get(), |
| num_multi_rpcs.get(), num_gets.get(), num_scanners_opened.get(), |
| num_scans.get(), num_puts.get(), num_row_locks.get(), |
| num_deletes.get(), num_atomic_increments.get(), |
| cache != null ? cache.stats() : BufferedIncrement.ZERO_STATS); |
| } |
| |
| /** |
| * Flushes to HBase any buffered client-side write operation. |
| * <p> |
| * |
| * @return A {@link Deferred}, whose callback chain will be invoked when |
| * everything that was buffered at the time of the call has been |
| * flushed. |
| * <p> |
| * Note that this doesn't guarantee that <b>ALL</b> outstanding RPCs |
| * have completed. This doesn't introduce any sort of global sync |
| * point. All it does really is it sends any buffered RPCs to HBase. |
| */ |
| public Deferred<Object> flush() { |
| { |
| // If some RPCs are waiting for -ROOT- to be discovered, we too must wait |
| // because some of those RPCs could be edits that we must wait on. |
| final Deferred<Object> d = zkclient.getDeferredRootIfBeingLookedUp(); |
| if (d != null) { |
| LOG.debug("Flush needs to wait on {} to come back", has_root ? "-ROOT-" |
| : ".META."); |
| final class RetryFlush implements Callback<Object, Object> { |
| public Object call(final Object arg) { |
| LOG.debug("Flush retrying after {} came back", has_root ? "-ROOT-" |
| : ".META."); |
| return flush(); |
| } |
| |
| public String toString() { |
| return "retry flush"; |
| } |
| } |
| return d.addBoth(new RetryFlush()); |
| } |
| } |
| |
| num_flushes.increment(); |
| final boolean need_sync; |
| { |
| final LoadingCache<BufferedIncrement, BufferedIncrement.Amount> buf = increment_buffer; // Single |
| // volatile-read. |
| if (buf != null && !buf.asMap().isEmpty()) { |
| flushBufferedIncrements(buf); |
| need_sync = true; |
| } else { |
| need_sync = false; |
| } |
| } |
| final ArrayList<Deferred<Object>> d = new ArrayList<Deferred<Object>>( |
| client2regions.size() + got_nsre.size() * 8); |
| // Bear in mind that we're traversing a ConcurrentHashMap, so we may get |
| // clients that have been removed from the map since we started iterating. |
| for (final RegionClient client : client2regions.keySet()) { |
| d.add(need_sync ? client.sync() : client.flush()); |
| } |
| for (final ArrayList<HBaseRpc> nsred : got_nsre.values()) { |
| synchronized (nsred) { |
| for (final HBaseRpc rpc : nsred) { |
| if (rpc instanceof HBaseRpc.IsEdit) { |
| d.add(rpc.getDeferred()); |
| } |
| } |
| } |
| } |
| @SuppressWarnings("unchecked") |
| final Deferred<Object> flushed = (Deferred) Deferred.group(d); |
| return flushed; |
| } |
| |
| /** |
| * Sets the maximum time (in milliseconds) for which edits can be buffered. |
| * <p> |
| * This interval will be honored on a "best-effort" basis. Edits can be |
| * buffered for longer than that due to GC pauses, the resolution of the |
| * underlying timer, thread scheduling at the OS level (particularly if the OS |
| * is overloaded with concurrent requests for CPU time), any low-level |
| * buffering in the TCP/IP stack of the OS, etc. |
| * <p> |
| * Setting a longer interval allows the code to batch requests more |
| * efficiently but puts you at risk of greater data loss if the JVM or machine |
| * was to fail. It also entails that some edits will not reach HBase until a |
| * longer period of time, which can be troublesome if you have other |
| * applications that need to read the "latest" changes. |
| * <p> |
| * Setting this interval to 0 disables this feature. |
| * <p> |
| * The change is guaranteed to take effect at most after a full interval has |
| * elapsed, <i>using the previous interval</i> (which is returned). |
| * |
| * @param flush_interval A positive time interval in milliseconds. |
| * @return The previous flush interval. |
| * @throws IllegalArgumentException if {@code flush_interval < 0}. |
| */ |
| public short setFlushInterval(final short flush_interval) { |
| // Note: if we have buffered increments, they'll pick up the new flush |
| // interval next time the current timer fires. |
| if (flush_interval < 0) { |
| throw new IllegalArgumentException("Negative: " + flush_interval); |
| } |
| final short prev = this.flush_interval; |
| this.flush_interval = flush_interval; |
| return prev; |
| } |
| |
| /** |
| * Changes the size of the increment buffer. |
| * <p> |
| * <b>NOTE:</b> because there is no way to resize the existing buffer, this |
| * method will flush the existing buffer and create a new one. This side |
| * effect might be unexpected but is unfortunately required. |
| * <p> |
| * This determines the maximum number of counters this client will keep |
| * in-memory to allow increment coalescing through |
| * {@link #bufferAtomicIncrement}. |
| * <p> |
| * The greater this number, the more memory will be used to buffer increments, |
| * and the more efficient increment coalescing can be if you have a |
| * high-throughput application with a large working set of counters. |
| * <p> |
| * If your application has excessively large keys or qualifiers, you might |
| * consider using a lower number in order to reduce memory usage. |
| * |
| * @param increment_buffer_size The new size of the buffer. |
| * @return The previous size of the buffer. |
| * @throws IllegalArgumentException if {@code increment_buffer_size < 0}. |
| * @since 1.3 |
| */ |
| public int setIncrementBufferSize(final int increment_buffer_size) { |
| if (increment_buffer_size < 0) { |
| throw new IllegalArgumentException("Negative: " + increment_buffer_size); |
| } |
| final int current = this.increment_buffer_size; |
| if (current == increment_buffer_size) { |
| return current; |
| } |
| this.increment_buffer_size = increment_buffer_size; |
| final LoadingCache<BufferedIncrement, BufferedIncrement.Amount> prev = increment_buffer; // Volatile-read. |
| if (prev != null) { // Need to resize. |
| makeIncrementBuffer(); // Volatile-write. |
| flushBufferedIncrements(prev); |
| } |
| return current; |
| } |
| |
| /** |
| * Returns the timer used by this client. |
| * <p> |
| * All timeouts, retries and other things that need to "sleep asynchronously" |
| * use this timer. This method is provided so that you can also schedule your |
| * own timeouts using this timer, if you wish to share this client's timer |
| * instead of creating your own. |
| * <p> |
| * The precision of this timer is implementation-defined but is guaranteed to |
| * be no greater than 20ms. |
| * |
| * @since 1.2 |
| */ |
| public Timer getTimer() { |
| return timer; |
| } |
| |
| /** |
| * Schedules a new timeout. |
| * |
| * @param task The task to execute when the timer times out. |
| * @param timeout_ms The timeout, in milliseconds (strictly positive). |
| */ |
| void newTimeout(final TimerTask task, final long timeout_ms) { |
| try { |
| timer.newTimeout(task, timeout_ms, MILLISECONDS); |
| } catch (IllegalStateException e) { |
| // This can happen if the timer fires just before shutdown() |
| // is called from another thread, and due to how threads get |
| // scheduled we tried to call newTimeout() after timer.stop(). |
| LOG.warn("Failed to schedule timer." |
| + " Ignore this if we're shutting down.", e); |
| } |
| } |
| |
| /** |
| * Returns the maximum time (in milliseconds) for which edits can be buffered. |
| * <p> |
| * The default value is an unspecified and implementation dependent, but is |
| * guaranteed to be non-zero. |
| * <p> |
| * A return value of 0 indicates that edits are sent directly to HBase without |
| * being buffered. |
| * |
| * @see #setFlushInterval |
| */ |
| public short getFlushInterval() { |
| return flush_interval; |
| } |
| |
| /** |
| * Returns the capacity of the increment buffer. |
| * <p> |
| * Note this returns the <em>capacity</em> of the buffer, not the number of |
| * items currently in it. There is currently no API to get the current number |
| * of items in it. |
| * |
| * @since 1.3 |
| */ |
| public int getIncrementBufferSize() { |
| return increment_buffer_size; |
| } |
| |
| /** |
| * Performs a graceful shutdown of this instance. |
| * <p> |
| * <ul> |
| * <li>{@link #flush Flushes} all buffered edits.</li> |
| * <li>Completes all outstanding requests.</li> |
| * <li>Terminates all connections.</li> |
| * <li>Releases all other resources.</li> |
| * </ul> |
| * <strong>Not calling this method before losing the last reference to this |
| * instance may result in data loss and other unwanted side effects</strong> |
| * |
| * @return A {@link Deferred}, whose callback chain will be invoked once all |
| * of the above have been done. If this callback chain doesn't fail, |
| * then the clean shutdown will be successful, and all the data will |
| * be safe on the HBase side (provided that you use <a |
| * href="#durability">durable</a> edits). In case of a failure (the |
| * "errback" is invoked) you may want to retry the shutdown to avoid |
| * losing data, depending on the nature of the failure. TODO(tsuna): |
| * Document possible / common failure scenarios. |
| */ |
| public Deferred<Object> shutdown() { |
| // This is part of step 3. We need to execute this in its own thread |
| // because Netty gets stuck in an infinite loop if you try to shut it |
| // down from within a thread of its own thread pool. They don't want |
| // to fix this so as a workaround we always shut Netty's thread pool |
| // down from another thread. |
| final class ShutdownThread extends Thread { |
| ShutdownThread() { |
| super("HBaseClient@" + HBaseClient.super.hashCode() + " shutdown"); |
| } |
| |
| public void run() { |
| // This terminates the Executor. |
| channel_factory.releaseExternalResources(); |
| } |
| } |
| ; |
| |
| // 3. Release all other resources. |
| final class ReleaseResourcesCB implements Callback<Object, Object> { |
| public Object call(final Object arg) { |
| LOG.debug("Releasing all remaining resources"); |
| timer.stop(); |
| new ShutdownThread().start(); |
| return arg; |
| } |
| |
| public String toString() { |
| return "release resources callback"; |
| } |
| } |
| |
| // 2. Terminate all connections. |
| final class DisconnectCB implements Callback<Object, Object> { |
| public Object call(final Object arg) { |
| return disconnectEverything().addCallback(new ReleaseResourcesCB()); |
| } |
| |
| public String toString() { |
| return "disconnect callback"; |
| } |
| } |
| |
| // If some RPCs are waiting for -ROOT- to be discovered, we too must wait |
| // because some of those RPCs could be edits that we must not lose. |
| final Deferred<Object> d = zkclient.getDeferredRootIfBeingLookedUp(); |
| if (d != null) { |
| LOG.debug("Shutdown needs to wait on {} to come back", |
| has_root ? "-ROOT-" : ".META."); |
| final class RetryShutdown implements Callback<Object, Object> { |
| public Object call(final Object arg) { |
| LOG.debug("Shutdown retrying after {} came back", has_root ? "-ROOT-" |
| : ".META."); |
| return shutdown(); |
| } |
| |
| public String toString() { |
| return "retry shutdown"; |
| } |
| } |
| return d.addBoth(new RetryShutdown()); |
| } |
| |
| // 1. Flush everything. |
| return flush().addCallback(new DisconnectCB()); |
| } |
| |
| /** |
| * Closes every socket, which will also flush all internal region caches. |
| */ |
| private Deferred<Object> disconnectEverything() { |
| HashMap<String, RegionClient> ip2client_copy; |
| |
| synchronized (ip2client) { |
| // Make a local copy so we can shutdown every Region Server client |
| // without hold the lock while we iterate over the data structure. |
| ip2client_copy = new HashMap<String, RegionClient>(ip2client); |
| } |
| |
| final ArrayList<Deferred<Object>> d = new ArrayList<Deferred<Object>>( |
| ip2client_copy.values().size() + 1); |
| // Shut down all client connections, clear cache. |
| for (final RegionClient client : ip2client_copy.values()) { |
| d.add(client.shutdown()); |
| } |
| if (rootregion != null && rootregion.isAlive()) { |
| // It's OK if we already did that in the loop above. |
| d.add(rootregion.shutdown()); |
| } |
| ip2client_copy = null; |
| |
| final int size = d.size(); |
| return Deferred.group(d).addCallback( |
| new Callback<Object, ArrayList<Object>>() { |
| public Object call(final ArrayList<Object> arg) { |
| // Normally, now that we've shutdown() every client, all our caches |
| // should |
| // be empty since each shutdown() generates a DISCONNECTED event, |
| // which |
| // causes RegionClientPipeline to call removeClientFromCache(). |
| HashMap<String, RegionClient> logme = null; |
| synchronized (ip2client) { |
| if (!ip2client.isEmpty()) { |
| logme = new HashMap<String, RegionClient>(ip2client); |
| } |
| } |
| if (logme != null) { |
| // Putting this logging statement inside the synchronized block |
| // can lead to a deadlock, since HashMap.toString() is going to |
| // call RegionClient.toString() on each entry, and this locks the |
| // client briefly. Other parts of the code lock clients first and |
| // the ip2client HashMap second, so this can easily deadlock. |
| LOG.error("Some clients are left in the client cache and haven't" |
| + " been cleaned up: " + logme); |
| logme = null; |
| return disconnectEverything(); // Try again. |
| } |
| zkclient.disconnectZK(); |
| return arg; |
| } |
| |
| public String toString() { |
| return "wait " + size + " RegionClient.shutdown()"; |
| } |
| }); |
| } |
| |
| /** |
| * Ensures that a given table/family pair really exists. |
| * <p> |
| * It's recommended to call this method in the startup code of your |
| * application if you know ahead of time which tables / families you're going |
| * to need, because it'll allow you to "fail fast" if they're missing. |
| * <p> |
| * Both strings are assumed to use the platform's default charset. |
| * |
| * @param table The name of the table you intend to use. |
| * @param family The column family you intend to use in that table. |
| * @return A deferred object that indicates the completion of the request. The |
| * {@link Object} has not special meaning and can be {@code null} |
| * (think of it as {@code Deferred<Void>}). But you probably want to |
| * attach at least an errback to this {@code Deferred} to handle |
| * failures. |
| * @throws TableNotFoundException (deferred) if the table doesn't exist. |
| * @throws NoSuchColumnFamilyException (deferred) if the family doesn't exist. |
| */ |
| public Deferred<Object> ensureTableFamilyExists(final String table, |
| final String family) { |
| return ensureTableFamilyExists(table.getBytes(), family.getBytes()); |
| } |
| |
| /** |
| * Ensures that a given table/family pair really exists. |
| * <p> |
| * It's recommended to call this method in the startup code of your |
| * application if you know ahead of time which tables / families you're going |
| * to need, because it'll allow you to "fail fast" if they're missing. |
| * <p> |
| * |
| * @param table The name of the table you intend to use. |
| * @param family The column family you intend to use in that table. |
| * @return A deferred object that indicates the completion of the request. The |
| * {@link Object} has not special meaning and can be {@code null} |
| * (think of it as {@code Deferred<Void>}). But you probably want to |
| * attach at least an errback to this {@code Deferred} to handle |
| * failures. |
| * @throws TableNotFoundException (deferred) if the table doesn't exist. |
| * @throws NoSuchColumnFamilyException (deferred) if the family doesn't exist. |
| */ |
| public Deferred<Object> ensureTableFamilyExists(final byte[] table, |
| final byte[] family) { |
| // Just "fault in" the first region of the table. Not the most optimal or |
| // useful thing to do but gets the job done for now. TODO(tsuna): Improve. |
| final HBaseRpc dummy; |
| if (family == EMPTY_ARRAY) { |
| dummy = GetRequest.exists(table, probeKey(ZERO_ARRAY)); |
| } else { |
| dummy = GetRequest.exists(table, probeKey(ZERO_ARRAY), family); |
| } |
| @SuppressWarnings("unchecked") |
| final Deferred<Object> d = (Deferred) sendRpcToRegion(dummy); |
| return d; |
| } |
| |
| /** |
| * Ensures that a given table really exists. |
| * <p> |
| * It's recommended to call this method in the startup code of your |
| * application if you know ahead of time which tables / families you're going |
| * to need, because it'll allow you to "fail fast" if they're missing. |
| * <p> |
| * |
| * @param table The name of the table you intend to use. The string is assumed |
| * to use the platform's default charset. |
| * @return A deferred object that indicates the completion of the request. The |
| * {@link Object} has not special meaning and can be {@code null} |
| * (think of it as {@code Deferred<Void>}). But you probably want to |
| * attach at least an errback to this {@code Deferred} to handle |
| * failures. |
| * @throws TableNotFoundException (deferred) if the table doesn't exist. |
| */ |
| public Deferred<Object> ensureTableExists(final String table) { |
| return ensureTableFamilyExists(table.getBytes(), EMPTY_ARRAY); |
| } |
| |
| /** |
| * Ensures that a given table really exists. |
| * <p> |
| * It's recommended to call this method in the startup code of your |
| * application if you know ahead of time which tables / families you're going |
| * to need, because it'll allow you to "fail fast" if they're missing. |
| * <p> |
| * |
| * @param table The name of the table you intend to use. |
| * @return A deferred object that indicates the completion of the request. The |
| * {@link Object} has not special meaning and can be {@code null} |
| * (think of it as {@code Deferred<Void>}). But you probably want to |
| * attach at least an errback to this {@code Deferred} to handle |
| * failures. |
| * @throws TableNotFoundException (deferred) if the table doesn't exist. |
| */ |
| public Deferred<Object> ensureTableExists(final byte[] table) { |
| return ensureTableFamilyExists(table, EMPTY_ARRAY); |
| } |
| |
| /** |
| * Retrieves data from HBase. |
| * |
| * @param request The {@code get} request. |
| * @return A deferred list of key-values that matched the get request. |
| */ |
| public Deferred<ArrayList<KeyValue>> get(final GetRequest request) { |
| num_gets.increment(); |
| return sendRpcToRegion(request).addCallbacks(got, Callback.PASSTHROUGH); |
| } |
| |
| /** Singleton callback to handle responses of "get" RPCs. */ |
| private static final Callback<ArrayList<KeyValue>, Object> got = new Callback<ArrayList<KeyValue>, Object>() { |
| public ArrayList<KeyValue> call(final Object response) { |
| if (response instanceof ArrayList) { |
| @SuppressWarnings("unchecked") |
| final ArrayList<KeyValue> row = (ArrayList<KeyValue>) response; |
| return row; |
| } else { |
| throw new InvalidResponseException(ArrayList.class, response); |
| } |
| } |
| |
| public String toString() { |
| return "type get response"; |
| } |
| }; |
| |
| /** |
| * Creates a new {@link Scanner} for a particular table. |
| * |
| * @param table The name of the table you intend to scan. |
| * @return A new scanner for this table. |
| */ |
| public Scanner newScanner(final byte[] table) { |
| return new Scanner(this, table); |
| } |
| |
| /** |
| * Creates a new {@link Scanner} for a particular table. |
| * |
| * @param table The name of the table you intend to scan. The string is |
| * assumed to use the platform's default charset. |
| * @return A new scanner for this table. |
| */ |
| public Scanner newScanner(final String table) { |
| return new Scanner(this, table.getBytes()); |
| } |
| |
| /** |
| * Package-private access point for {@link Scanner}s to open themselves. |
| * |
| * @param scanner The scanner to open. |
| * @return A deferred scanner ID (long) if HBase 0.94 and before, or a |
| * deferred {@link Scanner.Response} if HBase 0.95 and up. |
| */ |
| Deferred<Object> openScanner(final Scanner scanner) { |
| num_scanners_opened.increment(); |
| return sendRpcToRegion(scanner.getOpenRequest()).addCallbacks( |
| scanner_opened, new Callback<Object, Object>() { |
| public Object call(final Object error) { |
| // Don't let the scanner think it's opened on this region. |
| scanner.invalidate(); |
| return error; // Let the error propagate. |
| } |
| |
| public String toString() { |
| return "openScanner errback"; |
| } |
| }); |
| } |
| |
| /** Singleton callback to handle responses of "openScanner" RPCs. */ |
| private static final Callback<Object, Object> scanner_opened = new Callback<Object, Object>() { |
| public Object call(final Object response) { |
| if (response instanceof Scanner.Response) { // HBase 0.95 and up |
| return (Scanner.Response) response; |
| } else if (response instanceof Long) { |
| // HBase 0.94 and before: we expect just a long (the scanner ID). |
| return (Long) response; |
| } else { |
| throw new InvalidResponseException(Long.class, response); |
| } |
| } |
| |
| public String toString() { |
| return "type openScanner response"; |
| } |
| }; |
| |
| /** |
| * Returns the client currently known to hose the given region, or NULL. |
| */ |
| private RegionClient clientFor(final RegionInfo region) { |
| if (region == null) { |
| return null; |
| } else if (region == META_REGION || Bytes.equals(region.table(), ROOT)) { |
| // HBase 0.95+: META_REGION (which is 0.95 specific) is our root. |
| // HBase 0.94 and earlier: if we're looking for -ROOT-, stop here. |
| return rootregion; |
| } |
| return region2client.get(region); |
| } |
| |
| /** |
| * Package-private access point for {@link Scanner}s to scan more rows. |
| * |
| * @param scanner The scanner to use. |
| * @param nrows The maximum number of rows to retrieve. |
| * @return A deferred row. |
| */ |
| Deferred<Object> scanNextRows(final Scanner scanner) { |
| final RegionInfo region = scanner.currentRegion(); |
| final RegionClient client = clientFor(region); |
| if (client == null) { |
| // Oops, we no longer know anything about this client or region. Our |
| // cache was probably invalidated while the client was scanning. This |
| // means that we lost the connection to that RegionServer, so we have to |
| // re-open this scanner if we wanna keep scanning. |
| scanner.invalidate(); // Invalidate the scanner so that ... |
| @SuppressWarnings("unchecked") |
| final Deferred<Object> d = (Deferred) scanner.nextRows(); |
| return d; // ... this will re-open it ______.^ |
| } |
| num_scans.increment(); |
| final HBaseRpc next_request = scanner.getNextRowsRequest(); |
| final Deferred<Object> d = next_request.getDeferred(); |
| client.sendRpc(next_request); |
| return d; |
| } |
| |
| /** |
| * Package-private access point for {@link Scanner}s to close themselves. |
| * |
| * @param scanner The scanner to close. |
| * @return A deferred object that indicates the completion of the request. The |
| * {@link Object} has not special meaning and can be {@code null}. |
| */ |
| Deferred<Object> closeScanner(final Scanner scanner) { |
| final RegionInfo region = scanner.currentRegion(); |
| final RegionClient client = clientFor(region); |
| if (client == null) { |
| // Oops, we no longer know anything about this client or region. Our |
| // cache was probably invalidated while the client was scanning. So |
| // we can't close this scanner properly. |
| LOG.warn("Cannot close " + scanner + " properly, no connection open for " |
| + Bytes.pretty(region == null ? null : region.name())); |
| return Deferred.fromResult(null); |
| } |
| final HBaseRpc close_request = scanner.getCloseRequest(); |
| final Deferred<Object> d = close_request.getDeferred(); |
| client.sendRpc(close_request); |
| return d; |
| } |
| |
| /** |
| * Atomically and durably increments a value in HBase. |
| * <p> |
| * This is equivalent to |
| * {@link #atomicIncrement(AtomicIncrementRequest, boolean) atomicIncrement} |
| * {@code (request, true)} |
| * |
| * @param request The increment request. |
| * @return The deferred {@code long} value that results from the increment. |
| */ |
| public Deferred<Long> atomicIncrement(final AtomicIncrementRequest request) { |
| num_atomic_increments.increment(); |
| return sendRpcToRegion(request) |
| .addCallbacks(icv_done, Callback.PASSTHROUGH); |
| } |
| |
| /** |
| * Buffers a durable atomic increment for coalescing. |
| * <p> |
| * This increment will be held in memory up to the amount of time allowed by |
| * {@link #getFlushInterval} in order to allow the client to coalesce |
| * increments. |
| * <p> |
| * Increment coalescing can dramatically reduce the number of RPCs and write |
| * load on HBase if you tend to increment multiple times the same working set |
| * of counters. This is very common in user-facing serving systems that use |
| * HBase counters to keep track of user actions. |
| * <p> |
| * If client-side buffering is disabled ({@link #getFlushInterval} returns 0) |
| * then this function has the same effect as calling |
| * {@link #atomicIncrement(AtomicIncrementRequest)} directly. |
| * |
| * @param request The increment request. |
| * @return The deferred {@code long} value that results from the increment. |
| * @since 1.3 |
| * @since 1.4 This method works with negative increment values. |
| */ |
| public Deferred<Long> bufferAtomicIncrement( |
| final AtomicIncrementRequest request) { |
| final long value = request.getAmount(); |
| if (!BufferedIncrement.Amount.checkOverflow(value) // Value too large. |
| || flush_interval == 0) { // Client-side buffer disabled. |
| return atomicIncrement(request); |
| } |
| |
| final BufferedIncrement incr = new BufferedIncrement(request.table(), |
| request.key(), request.family(), request.qualifier()); |
| |
| do { |
| BufferedIncrement.Amount amount; |
| // Semi-evil: the very first time we get here, `increment_buffer' will |
| // still be null (we don't initialize it in our constructor) so we catch |
| // the NPE that ensues to allocate the buffer and kick off a timer to |
| // regularly flush it. |
| try { |
| amount = increment_buffer.getUnchecked(incr); |
| } catch (NullPointerException e) { |
| setupIncrementCoalescing(); |
| amount = increment_buffer.getUnchecked(incr); |
| } |
| if (amount.update(value)) { |
| final Deferred<Long> deferred = new Deferred<Long>(); |
| amount.deferred.chain(deferred); |
| return deferred; |
| } |
| // else: Loop again to retry. |
| increment_buffer.refresh(incr); |
| } while (true); |
| } |
| |
| /** |
| * Called the first time we get a buffered increment. Lazily creates the |
| * increment buffer and sets up a timer to regularly flush buffered |
| * increments. |
| */ |
| private synchronized void setupIncrementCoalescing() { |
| // If multiple threads attempt to setup coalescing at the same time, the |
| // first one to get here will make `increment_buffer' non-null, and thus |
| // subsequent ones will return immediately. This is important to avoid |
| // creating more than one FlushBufferedIncrementsTimer below. |
| if (increment_buffer != null) { |
| return; |
| } |
| makeIncrementBuffer(); // Volatile-write. |
| |
| // Start periodic buffered increment flushes. |
| final class FlushBufferedIncrementsTimer implements TimerTask { |
| public void run(final Timeout timeout) { |
| try { |
| flushBufferedIncrements(increment_buffer); |
| } finally { |
| final short interval = flush_interval; // Volatile-read. |
| // Even if we paused or disabled the client side buffer by calling |
| // setFlushInterval(0), we will continue to schedule this timer |
| // forever instead of pausing it. Pausing it is troublesome because |
| // we don't keep a reference to this timer, so we can't cancel it or |
| // tell if it's running or not. So let's just KISS and assume that |
| // if we need the timer once, we'll need it forever. If it's truly |
| // not needed anymore, we'll just cause a bit of extra work to the |
| // timer thread every 100ms, no big deal. |
| newTimeout(this, interval > 0 ? interval : 100); |
| } |
| } |
| } |
| final short interval = flush_interval; // Volatile-read. |
| // Handle the extremely unlikely yet possible racy case where: |
| // flush_interval was > 0 |
| // A buffered increment came in |
| // It was the first one ever so we landed here |
| // Meanwhile setFlushInterval(0) to disable buffering |
| // In which case we just flush whatever we have in 1ms. |
| timer.newTimeout(new FlushBufferedIncrementsTimer(), |
| interval > 0 ? interval : 1, MILLISECONDS); |
| } |
| |
| /** |
| * Flushes all buffered increments. |
| * |
| * @param increment_buffer The buffer to flush. |
| */ |
| private static void flushBufferedIncrements(// JAVA Y U NO HAVE TYPEDEF? F U! |
| final LoadingCache<BufferedIncrement, BufferedIncrement.Amount> increment_buffer) { |
| // Calling this method to clean up before shutting down works solely |
| // because `invalidateAll()' will *synchronously* remove everything. |
| // The Guava documentation says "Discards all entries in the cache, |
| // possibly asynchronously" but in practice the code in `LocalCache' |
| // works as follows: |
| // |
| // for each segment: |
| // segment.clear |
| // |
| // Where clearing a segment consists in: |
| // |
| // lock the segment |
| // for each active entry: |
| // add entry to removal queue |
| // null out the hash table |
| // unlock the segment |
| // for each entry in removal queue: |
| // call the removal listener on that entry |
| // |
| // So by the time the call to `invalidateAll()' returns, every single |
| // buffered increment will have been dealt with, and it is thus safe |
| // to shutdown the rest of the client to let it complete all outstanding |
| // operations. |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Flushing " + increment_buffer.size() + " buffered increments"); |
| } |
| synchronized (increment_buffer) { |
| increment_buffer.invalidateAll(); |
| } |
| } |
| |
| /** |
| * Creates the increment buffer according to current configuration. |
| */ |
| private void makeIncrementBuffer() { |
| final int size = increment_buffer_size; |
| increment_buffer = BufferedIncrement.newCache(this, size); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Created increment buffer of " + size + " entries"); |
| } |
| } |
| |
| /** Singleton callback to handle responses of incrementColumnValue RPCs. */ |
| private static final Callback<Long, Object> icv_done = new Callback<Long, Object>() { |
| public Long call(final Object response) { |
| if (response instanceof Long) { |
| return (Long) response; |
| } else { |
| throw new InvalidResponseException(Long.class, response); |
| } |
| } |
| |
| public String toString() { |
| return "type incrementColumnValue response"; |
| } |
| }; |
| |
| /** |
| * Atomically increments a value in HBase. |
| * |
| * @param request The increment request. |
| * @param durable If {@code true}, the success of this RPC guarantees that |
| * HBase has stored the edit in a <a href="#durability">durable</a> |
| * fashion. When in doubt, use |
| * {@link #atomicIncrement(AtomicIncrementRequest)}. |
| * @return The deferred {@code long} value that results from the increment. |
| */ |
| public Deferred<Long> atomicIncrement(final AtomicIncrementRequest request, |
| final boolean durable) { |
| request.setDurable(durable); |
| return atomicIncrement(request); |
| } |
| |
| /** |
| * Stores data in HBase. |
| * <p> |
| * Note that this provides no guarantee as to the order in which subsequent |
| * {@code put} requests are going to be applied to the backend. If you need |
| * ordering, you must enforce it manually yourself by starting the next |
| * {@code put} once the {@link Deferred} of this one completes successfully. |
| * |
| * @param request The {@code put} request. |
| * @return A deferred object that indicates the completion of the request. The |
| * {@link Object} has not special meaning and can be {@code null} |
| * (think of it as {@code Deferred<Void>}). But you probably want to |
| * attach at least an errback to this {@code Deferred} to handle |
| * failures. TODO(tsuna): Document failures clients are expected to |
| * handle themselves. |
| */ |
| public Deferred<Object> put(final PutRequest request) { |
| num_puts.increment(); |
| return sendRpcToRegion(request); |
| } |
| |
| /** |
| * Atomic Compare-And-Set (CAS) on a single cell. |
| * <p> |
| * Note that edits sent through this method <b>cannot be batched</b>, and |
| * won't be subject to the {@link #setFlushInterval flush interval}. This |
| * entails that write throughput will be lower with this method as edits have |
| * to be sent out to the wire one by one. |
| * <p> |
| * This request enables you to atomically update the value of an existing cell |
| * in HBase using a CAS operation. It's like a {@link PutRequest} except that |
| * you also pass an expected value. If the last version of the cell identified |
| * by your {@code PutRequest} matches the expected value, HBase will |
| * atomically update it to the new value. |
| * <p> |
| * If the expected value is the empty byte array, HBase will atomically create |
| * the cell provided that it doesn't exist already. This can be used to ensure |
| * that your RPC doesn't overwrite an existing value. Note however that this |
| * trick cannot be used the other way around to delete an expected value |
| * atomically. |
| * |
| * @param edit The new value to write. |
| * @param expected The expected value of the cell to compare against. |
| * <strong>This byte array will NOT be copied.</strong> |
| * @return A deferred boolean, if {@code true} the CAS succeeded, otherwise |
| * the CAS failed because the value in HBase didn't match the expected |
| * value of the CAS request. |
| * @since 1.3 |
| */ |
| public Deferred<Boolean> compareAndSet(final PutRequest edit, |
| final byte[] expected) { |
| return sendRpcToRegion(new CompareAndSetRequest(edit, expected)) |
| .addCallback(CAS_CB); |
| } |
| |
| /** |
| * Atomic Compare-And-Set (CAS) on a single cell. |
| * <p> |
| * Note that edits sent through this method <b>cannot be batched</b>. |
| * |
| * @see #compareAndSet(PutRequest, byte[]) |
| * @param edit The new value to write. |
| * @param expected The expected value of the cell to compare against. This |
| * string is assumed to use the platform's default charset. |
| * @return A deferred boolean, if {@code true} the CAS succeeded, otherwise |
| * the CAS failed because the value in HBase didn't match the expected |
| * value of the CAS request. |
| * @since 1.3 |
| */ |
| public Deferred<Boolean> compareAndSet(final PutRequest edit, |
| final String expected) { |
| return compareAndSet(edit, expected.getBytes()); |
| } |
| |
| /** |
| * Atomically insert a new cell in HBase. |
| * <p> |
| * Note that edits sent through this method <b>cannot be batched</b>. |
| * <p> |
| * This is equivalent to calling {@link #compareAndSet(PutRequest, byte[]) |
| * compareAndSet}{@code (edit, |
| * EMPTY_ARRAY)} |
| * |
| * @see #compareAndSet(PutRequest, byte[]) |
| * @param edit The new value to insert. |
| * @return A deferred boolean, {@code true} if the edit got atomically |
| * inserted in HBase, {@code false} if there was already a value in |
| * the given cell. |
| * @since 1.3 |
| */ |
| public Deferred<Boolean> atomicCreate(final PutRequest edit) { |
| return compareAndSet(edit, EMPTY_ARRAY); |
| } |
| |
| /** Callback to type-check responses of {@link CompareAndSetRequest}. */ |
| private static final class CompareAndSetCB implements |
| Callback<Boolean, Object> { |
| |
| public Boolean call(final Object response) { |
| if (response instanceof Boolean) { |
| return (Boolean) response; |
| } else { |
| throw new InvalidResponseException(Boolean.class, response); |
| } |
| } |
| |
| public String toString() { |
| return "type compareAndSet response"; |
| } |
| |
| } |
| |
| /** Singleton callback for responses of {@link CompareAndSetRequest}. */ |
| private static final CompareAndSetCB CAS_CB = new CompareAndSetCB(); |
| |
| /** |
| * Acquires an explicit row lock. |
| * <p> |
| * For a description of what row locks are, see {@link RowLock}. |
| * |
| * @param request The request specify which row to lock. |
| * @return a deferred {@link RowLock}. |
| * @see #unlockRow |
| */ |
| public Deferred<RowLock> lockRow(final RowLockRequest request) { |
| num_row_locks.increment(); |
| return sendRpcToRegion(request).addCallbacks( |
| new Callback<RowLock, Object>() { |
| public RowLock call(final Object response) { |
| if (response instanceof Long) { |
| return new RowLock(request.getRegion().name(), (Long) response); |
| } else { |
| throw new InvalidResponseException(Long.class, response); |
| } |
| } |
| |
| public String toString() { |
| return "type lockRow response"; |
| } |
| }, Callback.PASSTHROUGH); |
| } |
| |
| /** |
| * Releases an explicit row lock. |
| * <p> |
| * For a description of what row locks are, see {@link RowLock}. |
| * |
| * @param lock The lock to release. |
| * @return A deferred object that indicates the completion of the request. The |
| * {@link Object} has not special meaning and can be {@code null} |
| * (think of it as {@code Deferred<Void>}). |
| */ |
| public Deferred<Object> unlockRow(final RowLock lock) { |
| final byte[] region_name = lock.region(); |
| final RegionInfo region = regions_cache.get(region_name); |
| if (knownToBeNSREd(region)) { |
| // If this region has been NSRE'd, we can't possibly still hold a lock |
| // on one of its rows, as this would have prevented it from splitting. |
| // So let's just pretend the row has been unlocked. |
| return Deferred.fromResult(null); |
| } |
| final RegionClient client = clientFor(region); |
| if (client == null) { |
| // Oops, we no longer know anything about this client or region. Our |
| // cache was probably invalidated while the client was holding the lock. |
| LOG.warn("Cannot release " + lock + ", no connection open for " |
| + Bytes.pretty(region_name)); |
| return Deferred.fromResult(null); |
| } |
| final HBaseRpc release = new RowLockRequest.ReleaseRequest(lock, region); |
| release.setRegion(region); |
| final Deferred<Object> d = release.getDeferred(); |
| client.sendRpc(release); |
| return d; |
| } |
| |
| /** |
| * Deletes data from HBase. |
| * |
| * @param request The {@code delete} request. |
| * @return A deferred object that indicates the completion of the request. The |
| * {@link Object} has not special meaning and can be {@code null} |
| * (think of it as {@code Deferred<Void>}). But you probably want to |
| * attach at least an errback to this {@code Deferred} to handle |
| * failures. |
| */ |
| public Deferred<Object> delete(final DeleteRequest request) { |
| num_deletes.increment(); |
| return sendRpcToRegion(request); |
| } |
| |
| /** |
| * Eagerly prefetches and caches a table's region metadata from HBase. |
| * |
| * @param table The name of the table whose metadata you intend to prefetch. |
| * @return A deferred object that indicates the completion of the request. The |
| * {@link Object} has no special meaning and can be {@code null} |
| * (think of it as {@code Deferred<Void>}). But you probably want to |
| * attach at least an errback to this {@code Deferred} to handle |
| * failures. |
| * @since 1.5 |
| */ |
| public Deferred<Object> prefetchMeta(final String table) { |
| return prefetchMeta(table.getBytes(), EMPTY_ARRAY, EMPTY_ARRAY); |
| } |
| |
| /** |
| * Eagerly prefetches and caches part of a table's region metadata from HBase. |
| * <p> |
| * The part to prefetch is identified by a row key range, given by |
| * {@code start} and {@code stop}. |
| * |
| * @param table The name of the table whose metadata you intend to prefetch. |
| * @param start The start of the row key range to prefetch metadata for. |
| * @param stop The end of the row key range to prefetch metadata for. |
| * @return A deferred object that indicates the completion of the request. The |
| * {@link Object} has no special meaning and can be {@code null} |
| * (think of it as {@code Deferred<Void>}). But you probably want to |
| * attach at least an errback to this {@code Deferred} to handle |
| * failures. |
| * @since 1.5 |
| */ |
| public Deferred<Object> prefetchMeta(final String table, final String start, |
| final String stop) { |
| return prefetchMeta(table.getBytes(), start.getBytes(), stop.getBytes()); |
| } |
| |
| /** |
| * Eagerly prefetches and caches a table's region metadata from HBase. |
| * |
| * @param table The name of the table whose metadata you intend to prefetch. |
| * @return A deferred object that indicates the completion of the request. The |
| * {@link Object} has no special meaning and can be {@code null} |
| * (think of it as {@code Deferred<Void>}). But you probably want to |
| * attach at least an errback to this {@code Deferred} to handle |
| * failures. |
| * @since 1.5 |
| */ |
| public Deferred<Object> prefetchMeta(final byte[] table) { |
| return prefetchMeta(table, EMPTY_ARRAY, EMPTY_ARRAY); |
| } |
| |
  /**
   * Eagerly prefetches and caches part of a table's region metadata from HBase.
   * <p>
   * The part to prefetch is identified by a row key range, given by
   * {@code start} and {@code stop}.
   *
   * @param table The name of the table whose metadata you intend to prefetch.
   * @param start The start of the row key range to prefetch metadata for.
   * @param stop The end of the row key range to prefetch metadata for.
   * @return A deferred object that indicates the completion of the request.
   * The {@link Object} has no special meaning and can be {@code null}
   * (think of it as {@code Deferred<Void>}).  But you probably want to
   * attach at least an errback to this {@code Deferred} to handle
   * failures.
   * @since 1.5
   */
  public Deferred<Object> prefetchMeta(final byte[] table, final byte[] start,
                                       final byte[] stop) {
    // Strategy: scan .META. for this table between the two row keys, filter
    // out all but the latest entries on the client side, and insert whatever
    // remains into the region cache (via discoverRegion below).

    // But we don't want to do this for .META. or -ROOT- themselves.
    if (Bytes.equals(table, META) || Bytes.equals(table, ROOT)) {
      return Deferred.fromResult(null);
    }

    // Create the scan bounds.
    final byte[] meta_start = createRegionSearchKey(table, start);
    // createRegionSearchKey produces a key designed to find the *last*
    // matching entry (it ends with ':', the first byte greater than '9').
    // Here we want the scan to start immediately at the first entry, so
    // zero out that trailing byte.
    meta_start[meta_start.length - 1] = 0;

    // The stop bound is trickier.  If the user wants the whole table,
    // expressed by passing EMPTY_ARRAY, then we need to append a null
    // byte to the table name (thus catching all rows in the desired
    // table, but excluding those from others.)  If the user specifies
    // an explicit stop key, we must leave the table name alone.
    final byte[] meta_stop;
    if (stop.length == 0) {
      meta_stop = createRegionSearchKey(table, stop);  // will return "table,,:"
      meta_stop[table.length] = 0;  // now have "table\0,:"
      meta_stop[meta_stop.length - 1] = ',';  // now have "table\0,,"
    } else {
      meta_stop = createRegionSearchKey(table, stop);
    }

    if (rootregion == null) {
      // If we don't know where the root region is, we don't yet know whether
      // there is even a -ROOT- region at all (pre HBase 0.95).  So we can't
      // start scanning meta right away, because we don't yet know whether
      // meta is named ".META." or "hbase:meta".  So instead we first check
      // whether the table exists, which will force us to do a first meta
      // lookup (and therefore figure out what the name of meta is).
      class RetryPrefetch implements Callback<Object, Object> {
        public Object call(final Object unused) {
          // By now the meta lookup has completed, so re-enter prefetchMeta
          // with rootregion (and the meta table name) known.
          return prefetchMeta(table, start, stop);
        }

        public String toString() {
          return "retry prefetchMeta(" + Bytes.pretty(table) + ", "
              + Bytes.pretty(start) + ", " + Bytes.pretty(stop) + ")";
        }
      }
      return ensureTableExists(table).addCallback(new RetryPrefetch());
    }

    final Scanner meta_scanner = newScanner(has_root ? META : HBASE96_META);
    meta_scanner.setStartKey(meta_start);
    meta_scanner.setStopKey(meta_stop);

    // Drains the scanner: each META row found is fed to discoverRegion,
    // which populates the region cache as a side effect.
    class PrefetchMeta implements
        Callback<Object, ArrayList<ArrayList<KeyValue>>> {
      public Object call(final ArrayList<ArrayList<KeyValue>> results) {
        if (results != null && !results.isEmpty()) {
          for (final ArrayList<KeyValue> row : results) {
            discoverRegion(row);
          }
          return meta_scanner.nextRows().addCallback(this);  // Keep scanning.
        }
        return null;  // Scanner exhausted: prefetch complete.
      }

      public String toString() {
        return "prefetchMeta scanner=" + meta_scanner;
      }
    }

    return meta_scanner.nextRows().addCallback(new PrefetchMeta());
  }
| |
  /**
   * Sends an RPC targeted at a particular region to the right RegionServer.
   * <p>
   * This method is package-private so that the low-level {@link RegionClient}
   * can retry RPCs when handling a {@link NotServingRegionException}.
   *
   * @param request The RPC to send.  This RPC <b>must</b> specify a single
   * specific table and row key.
   * @return The deferred result of the RPC (whatever object or exception was
   * de-serialized back from the network).
   */
  Deferred<Object> sendRpcToRegion(final HBaseRpc request) {
    if (cannotRetryRequest(request)) {
      return tooManyAttempts(request, null);
    }
    request.attempt++;
    final byte[] table = request.table;
    final byte[] key = request.key;
    final RegionInfo region = getRegion(table, key);

    // Invoked once the region lookup at the bottom of this method completes
    // (successfully or not): a non-recoverable error fails the RPC,
    // anything else just re-enters sendRpcToRegion in the hope that the
    // lookup refreshed our cache.
    final class RetryRpc implements Callback<Deferred<Object>, Object> {
      public Deferred<Object> call(final Object arg) {
        if (arg instanceof NonRecoverableException) {
          // No point in retrying here, so fail the RPC.
          HBaseException e = (NonRecoverableException) arg;
          if (e instanceof HasFailedRpcException
              && ((HasFailedRpcException) e).getFailedRpc() != request) {
            // If we get here it's because a dependent RPC (such as a META
            // lookup) has failed.  Therefore the exception we're getting
            // indicates that the META lookup failed, but we need to return
            // to our caller here that it's their RPC that failed.  Here we
            // re-create the exception but with the correct RPC in argument.
            e = e.make(e, request);  // e is likely a PleaseThrottleException.
          }
          request.callback(e);
          return Deferred.fromError(e);
        }
        return sendRpcToRegion(request);  // Retry the RPC.
      }

      public String toString() {
        return "retry RPC";
      }
    }

    if (region != null) {
      if (knownToBeNSREd(region)) {
        // Fail fast: this region is a known-NSRE'd marker, so hand the RPC
        // to the NSRE machinery (which buffers it) instead of sending it.
        final NotServingRegionException nsre = new NotServingRegionException(
            "Region known to be unavailable", request);
        final Deferred<Object> d = request.getDeferred();
        handleNSRE(request, region.name(), nsre);
        return d;
      }
      final RegionClient client = clientFor(region);
      if (client != null && client.isAlive()) {
        request.setRegion(region);
        final Deferred<Object> d = request.getDeferred();
        if (request instanceof HBaseRpc.SupportsRpcTimeout
            && ((HBaseRpc.SupportsRpcTimeout) request).rpctimeout() != 0) {
          // Schedule a timer that fails the RPC with an RpcTimeoutException
          // if no response came back before its requested timeout.
          final class RpcTimer implements TimerTask {
            public void run(final Timeout timeout) {
              request.callback(new RpcTimeoutException(request));
            }

            public String toString() {
              return "RPC timer for " + request;
            }
          }
          ;
          request.rpctimeout = timer.newTimeout(new RpcTimer(),
              ((HBaseRpc.SupportsRpcTimeout) request).rpctimeout(),
              MILLISECONDS);
        }
        client.sendRpc(request);
        return d;
      }
    }
    // Cache miss (or dead connection): locate the region, then retry.
    return locateRegion(table, key).addBothDeferring(new RetryRpc());
  }
| |
| /** |
| * Returns how many lookups in {@code -ROOT-} were performed. |
| * <p> |
| * This number should remain low. It will be 1 after the first access to |
| * HBase, and will increase by 1 each time the {@code .META.} region moves to |
| * another server, which should seldom happen. |
| * <p> |
| * This isn't to be confused with the number of times we looked up where the |
| * {@code -ROOT-} region itself is located. This happens even more rarely and |
| * a message is logged at the INFO whenever it does. |
| * |
| * @since 1.1 |
| * @deprecated This method will be removed in release 2.0. Use {@link #stats} |
| * {@code .}{@link ClientStats#rootLookups rootLookups()} instead. |
| */ |
| @Deprecated |
| public long rootLookupCount() { |
| return root_lookups.get(); |
| } |
| |
| /** |
| * Returns how many lookups in {@code .META.} were performed (uncontended). |
| * <p> |
| * This number indicates how many times we had to lookup in {@code .META.} |
| * where a key was located. This only counts "uncontended" lookups, where the |
| * thread was able to acquire a "permit" to do a {@code .META.} lookup. The |
| * majority of the {@code .META.} lookups should fall in this category. |
| * |
| * @since 1.1 |
| * @deprecated This method will be removed in release 2.0. Use {@link #stats} |
| * {@code .}{@link ClientStats#uncontendedMetaLookups |
| * uncontendedMetaLookups()} instead. |
| */ |
| @Deprecated |
| public long uncontendedMetaLookupCount() { |
| return meta_lookups_with_permit.get(); |
| } |
| |
| /** |
| * Returns how many lookups in {@code .META.} were performed (contended). |
| * <p> |
| * This number indicates how many times we had to lookup in {@code .META.} |
| * where a key was located. This only counts "contended" lookups, where the |
| * thread was unable to acquire a "permit" to do a {@code .META.} lookup, |
| * because there were already too many {@code .META.} lookups in flight. In |
| * this case, the thread was delayed a bit in order to apply a bit of |
| * back-pressure on the caller, to avoid creating {@code .META.} storms. The |
| * minority of the {@code .META.} lookups should fall in this category. |
| * |
| * @since 1.1 |
| * @deprecated This method will be removed in release 2.0. Use {@link #stats} |
| * {@code .}{@link ClientStats#contendedMetaLookups |
| * contendedMetaLookups()} instead. |
| */ |
| @Deprecated |
| public long contendedMetaLookupCount() { |
| return meta_lookups_wo_permit.get(); |
| } |
| |
  /**
   * Checks whether or not an RPC can be retried once more.
   * <p>
   * Note: this method only reports the condition, it does not itself throw;
   * callers such as {@link #sendRpcToRegion} fail the RPC via
   * {@link #tooManyAttempts} when this returns {@code true}.
   *
   * @param rpc The RPC we're going to attempt to execute.
   * @return {@code true} if this RPC already had too many attempts,
   * {@code false} otherwise (in which case it's OK to retry once more).
   */
  static boolean cannotRetryRequest(final HBaseRpc rpc) {
    return rpc.attempt > rpc.maxAttempt;  // Limit is per-RPC (maxAttempt).
  }
| |
| /** |
| * Returns a {@link Deferred} containing an exception when an RPC couldn't |
| * succeed after too many attempts. |
| * |
| * @param request The RPC that was retried too many times. |
| * @param cause What was cause of the last failed attempt, if known. You can |
| * pass {@code null} if the cause is unknown. |
| */ |
| static Deferred<Object> tooManyAttempts(final HBaseRpc request, |
| final HBaseException cause) { |
| // TODO(tsuna): At this point, it's possible that we have to deal with |
| // a broken META table where there's a hole. For the sake of good error |
| // reporting, at this point we should try to getClosestRowBefore + scan |
| // META in order to verify whether there's indeed a hole, and if there's |
| // one, throw a BrokenMetaException explaining where the hole is. |
| final Exception e = new NonRecoverableException("Too many attempts: " |
| + request, cause); |
| request.callback(e); |
| return Deferred.fromError(e); |
| } |
| |
| // --------------------------------------------------- // |
| // Code that find regions (in our cache or using RPCs) // |
| // --------------------------------------------------- // |
| |
  /**
   * Locates the region in which the given row key for the given table is.
   * <p>
   * This does a lookup in the .META. / -ROOT- table(s), no cache is used.
   * If you want to use a cache, call {@link #getRegion} instead.
   *
   * @param table The table to which the row belongs.
   * @param key The row key for which we want to locate the region.
   * @return A deferred called back when the lookup completes.  The deferred
   * carries an unspecified result.
   * @see #discoverRegion
   */
  private Deferred<Object> locateRegion(final byte[] table, final byte[] key) {
    final boolean is_meta = Bytes.equals(table, META);
    final boolean is_root = !is_meta && Bytes.equals(table, ROOT);
    // We don't know in which region this row key is.  Let's look it up.
    // First, see if we already know where to look in .META.
    // Except, obviously, we don't wanna search in META for META or ROOT.
    final byte[] meta_key = is_root ? null : createRegionSearchKey(table, key);
    final byte[] meta_name;
    final RegionInfo meta_region;
    if (has_root) {  // HBase 0.94 and before: META itself lives in regions.
      meta_region = is_meta || is_root ? null : getRegion(META, meta_key);
      meta_name = META;
    } else {  // HBase 0.95 and up: single static META region.
      meta_region = META_REGION;
      meta_name = HBASE96_META;
    }

    if (meta_region != null) {  // Always true with HBase 0.95 and up.
      // Lookup in .META. which region server has the region we want.
      final RegionClient client = (has_root
          ? region2client.get(meta_region)  // Pre 0.95
          : rootregion);  // Post 0.95
      if (client != null && client.isAlive()) {
        final boolean has_permit = client.acquireMetaLookupPermit();
        if (!has_permit) {
          // If we failed to acquire a permit, it's worth checking if someone
          // looked up the region we're interested in.  Every once in a while
          // this will save us a META lookup.
          if (getRegion(table, key) != null) {
            return Deferred.fromResult(null);  // Looks like no lookup needed.
          }
        }
        final Deferred<Object> d = client.getClosestRowBefore(meta_region,
            meta_name, meta_key, INFO).addCallback(meta_lookup_done);
        if (has_permit) {
          // Give the permit back once the lookup completes, whether it
          // succeeded or failed.
          final class ReleaseMetaLookupPermit implements
              Callback<Object, Object> {
            public Object call(final Object arg) {
              client.releaseMetaLookupPermit();
              return arg;
            }

            public String toString() {
              return "release .META. lookup permit";
            }
          };
          d.addBoth(new ReleaseMetaLookupPermit());
          meta_lookups_with_permit.increment();
        } else {
          meta_lookups_wo_permit.increment();
        }
        // This errback needs to run *after* the callback above.
        return d.addErrback(newLocateRegionErrback(table, key));
      }
    }

    // Make a local copy to avoid race conditions where we test the reference
    // to be non-null but then it becomes null before the next statement.
    final RegionClient rootregion = this.rootregion;
    if (rootregion == null || !rootregion.isAlive()) {
      // We don't know where -ROOT- (or hbase:meta) is: ask ZooKeeper.
      return zkclient.getDeferredRoot();
    } else if (is_root) {  // Don't search ROOT in ROOT.
      return Deferred.fromResult(null);  // We already got ROOT (w00t).
    }
    // The rest of this function is only executed with HBase 0.94 and before.

    // Alright so we don't even know where to look in .META.
    // Let's lookup the right .META. entry in -ROOT-.
    final byte[] root_key = createRegionSearchKey(META, meta_key);
    final RegionInfo root_region = new RegionInfo(ROOT, ROOT_REGION,
        EMPTY_ARRAY);
    root_lookups.increment();
    return rootregion.getClosestRowBefore(root_region, ROOT, root_key, INFO)
        .addCallback(root_lookup_done)
        // This errback needs to run *after* the callback above.
        .addErrback(newLocateRegionErrback(table, key));
  }
| |
| /** Callback executed when a lookup in META completes. */ |
| private final class MetaCB implements Callback<Object, ArrayList<KeyValue>> { |
| public Object call(final ArrayList<KeyValue> arg) { |
| return discoverRegion(arg); |
| } |
| |
| public String toString() { |
| return "locateRegion in META"; |
| } |
| }; |
| |
| private final MetaCB meta_lookup_done = new MetaCB(); |
| |
| /** Callback executed when a lookup in -ROOT- completes. */ |
| private final class RootCB implements Callback<Object, ArrayList<KeyValue>> { |
| public Object call(final ArrayList<KeyValue> arg) { |
| return discoverRegion(arg); |
| } |
| |
| public String toString() { |
| return "locateRegion in ROOT"; |
| } |
| }; |
| |
| private final RootCB root_lookup_done = new RootCB(); |
| |
| /** |
| * Creates a new callback that handles errors during META lookups. |
| * <p> |
| * This errback should be added *after* adding the callback that invokes |
| * {@link #discoverRegion} so it can properly fill in the table name when a |
| * {@link TableNotFoundException} is thrown (because the low-level code |
| * doesn't know about tables, it only knows about regions, but for proper |
| * error reporting users need the name of the table that wasn't found). |
| * |
| * @param table The table to which the row belongs. |
| * @param key The row key for which we want to locate the region. |
| */ |
| private Callback<Object, Exception> newLocateRegionErrback( |
| final byte[] table, final byte[] key) { |
| return new Callback<Object, Exception>() { |
| public Object call(final Exception e) { |
| if (e instanceof TableNotFoundException) { |
| return new TableNotFoundException(table); // Populate the name. |
| } else if (e instanceof RecoverableException) { |
| // Retry to locate the region. TODO(tsuna): exponential backoff? |
| // XXX this can cause an endless retry loop (particularly if the |
| // address of -ROOT- in ZK is stale when we start, this code is |
| // going to retry in an almost-tight loop until the znode is |
| // updated). |
| return locateRegion(table, key); |
| } |
| return e; |
| } |
| |
| public String toString() { |
| return "locateRegion errback"; |
| } |
| }; |
| } |
| |
| /** |
| * Creates the META key to search for in order to locate the given key. |
| * |
| * @param table The table the row belongs to. |
| * @param key The key to search for in META. |
| * @return A row key to search for in the META table, that will help us locate |
| * the region serving the given {@code (table, key)}. |
| */ |
| private static byte[] createRegionSearchKey(final byte[] table, |
| final byte[] key) { |
| // Rows in .META. look like this: |
| // tablename,startkey,timestamp |
| final byte[] meta_key = new byte[table.length + key.length + 3]; |
| System.arraycopy(table, 0, meta_key, 0, table.length); |
| meta_key[table.length] = ','; |
| System.arraycopy(key, 0, meta_key, table.length + 1, key.length); |
| meta_key[meta_key.length - 2] = ','; |
| // ':' is the first byte greater than '9'. We always want to find the |
| // entry with the greatest timestamp, so by looking right before ':' |
| // we'll find it. |
| meta_key[meta_key.length - 1] = ':'; |
| return meta_key; |
| } |
| |
  /**
   * Searches in the regions cache for the region hosting the given row.
   *
   * @param table The table to which the row belongs.
   * @param key The row key for which we want to find the region.
   * @return {@code null} if our cache doesn't know which region is currently
   * serving that key, in which case you'd have to look that information
   * up using {@link #locateRegion}.  Otherwise returns the cached region
   * information in which we currently believe that the given row ought
   * to be.
   */
  private RegionInfo getRegion(final byte[] table, final byte[] key) {
    if (has_root) {
      if (Bytes.equals(table, ROOT)) {  // HBase 0.94 and before.
        return new RegionInfo(ROOT, ROOT_REGION, EMPTY_ARRAY);
      }
    } else if (Bytes.equals(table, HBASE96_META)) {  // HBase 0.95 and up.
      return META_REGION;
    }

    // floorEntry finds the cached region whose name sorts at-or-before the
    // search key; the checks below verify it really covers (table, key).
    byte[] region_name = createRegionSearchKey(table, key);
    Map.Entry<byte[], RegionInfo> entry = regions_cache.floorEntry(region_name);
    if (entry == null) {
      return null;  // Cache miss: nothing at or before that key.
    }

    if (!isCacheKeyForTable(table, entry.getKey())) {
      return null;  // Cache miss: the entry found belongs to another table.
    }

    // Drop references we no longer need (minor GC friendliness while the
    // remaining checks run).
    region_name = null;
    final RegionInfo region = entry.getValue();
    entry = null;

    final byte[] stop_key = region.stopKey();
    if (stop_key != EMPTY_ARRAY
        // If the stop key is an empty byte array, it means this region is the
        // last region for this table and this key ought to be in that region.
        // (Identity comparison with the EMPTY_ARRAY sentinel is intentional.)
        && Bytes.memcmp(key, stop_key) >= 0) {
      return null;  // Cache miss: the key lies beyond this region's stop key.
    }

    return region;  // Cache hit.
  }
| |
| /** |
| * Checks whether or not the given cache key is for the given table. |
| * |
| * @param table The table for which we want the cache key to be. |
| * @param cache_key The cache key to check. |
| * @return {@code true} if the given cache key is for the given table, |
| * {@code false} otherwise. |
| */ |
| private static boolean isCacheKeyForTable(final byte[] table, |
| final byte[] cache_key) { |
| // Check we found an entry that's really for the requested table. |
| for (int i = 0; i < table.length; i++) { |
| if (table[i] != cache_key[i]) { // This table isn't in the map, we found |
| return false; // a key which is for another table. |
| } |
| } |
| |
| // Make sure we didn't find another key that's for another table |
| // whose name is a prefix of the table name we were given. |
| return cache_key[table.length] == ','; |
| } |
| |
  /**
   * Adds a new region to our regions cache.
   *
   * @param meta_row The (parsed) result of the
   * {@link RegionClient#getClosestRowBefore} request sent to the
   * .META. (or -ROOT-) table.
   * @return The client serving the region we discovered, or {@code null} if
   * this region isn't being served right now (and we marked it as
   * NSRE'd).
   */
  private RegionClient discoverRegion(final ArrayList<KeyValue> meta_row) {
    if (meta_row.isEmpty()) {
      // Table name is filled in later by newLocateRegionErrback.
      throw new TableNotFoundException();
    }
    String host = null;
    int port = -42;  // Sentinel; only read after a `info:server' cell parsed.
    RegionInfo region = null;
    byte[] start_key = null;
    // Walk the cells of the META row, looking for `info:regioninfo'
    // (describes the region) and `info:server' (who's serving it).
    for (final KeyValue kv : meta_row) {
      final byte[] qualifier = kv.qualifier();
      if (Arrays.equals(REGIONINFO, qualifier)) {
        final byte[][] tmp = new byte[1][];  // Yes, this is ugly.
        region = RegionInfo.fromKeyValue(kv, tmp);
        if (knownToBeNSREd(region)) {
          invalidateRegionCache(region.name(), true, "has marked it as split.");
          return null;
        }
        start_key = tmp[0];
      } else if (Arrays.equals(SERVER, qualifier)
                 && kv.value() != EMPTY_ARRAY) {  // Empty during NSRE.
        final byte[] hostport = kv.value();
        // Search backwards for the last `:' separating "host:port".
        int colon = hostport.length - 1;
        for (/**/; colon > 0 /* Can't be at the beginning */; colon--) {
          if (hostport[colon] == ':') {
            break;
          }
        }
        if (colon == 0) {
          throw BrokenMetaException.badKV(
              region,
              "an `info:server' cell"
              + " doesn't contain `:' to separate the `host:port'"
              + Bytes.pretty(hostport), kv);
        }
        host = getIP(new String(hostport, 0, colon));
        try {
          port = parsePortNumber(new String(hostport, colon + 1,
              hostport.length - colon - 1));
        } catch (NumberFormatException e) {
          throw BrokenMetaException.badKV(
              region,
              "an `info:server' cell" + " contains an invalid port: "
              + e.getMessage() + " in " + Bytes.pretty(hostport), kv);
        }
      }
      // TODO(tsuna): If this is the parent of a split region, there are two
      // other KVs that could be useful: `info:splitA' and `info:splitB'.
      // Need to investigate whether we can use those as a hint to update our
      // regions_cache with the daughter regions of the split.
    }
    if (start_key == null) {
      throw new BrokenMetaException(null, "It didn't contain any"
          + " `info:regioninfo' cell: " + meta_row);
    }

    final byte[] region_name = region.name();
    if (host == null) {
      // When there's no `info:server' cell, it typically means that the
      // location of this region is about to be updated in META, so we
      // consider this as an NSRE.
      invalidateRegionCache(region_name, true, "no longer has it assigned.");
      return null;
    }

    // 1. Record the region -> client mapping.
    // This won't be "discoverable" until another map points to it, because
    // at this stage no one knows about this region yet, so another thread
    // may be looking up that region again while we're in the process of
    // publishing our findings.
    final RegionClient client = newClient(host, port);
    final RegionClient oldclient = region2client.put(region, client);
    if (client == oldclient) {  // We were racing with another thread to
      return client;            // discover this region, we lost the race.
    }
    RegionInfo oldregion;
    int nregions;
    // If we get a ConnectException immediately when trying to connect to the
    // RegionServer, Netty delivers a CLOSED ChannelStateEvent from a "boss"
    // thread while we may still be handling the OPEN event in an NIO thread.
    // Locking the client prevents it from being able to buffer requests when
    // this happens.  After we release the lock, then it will find it's dead.
    synchronized (client) {
      // Don't put any code between here and the next put (see next comment).

      // 2. Store the region in the sorted map.
      // This will effectively "publish" the result of our work to other
      // threads.  The window between when the previous `put' becomes visible
      // to all other threads and when we're done updating the sorted map is
      // when we may unnecessarily re-lookup the same region again.  It's an
      // acceptable trade-off.  We avoid extra synchronization complexity in
      // exchange of occasional duplicate work (which should be rare anyway).
      oldregion = regions_cache.put(region_name, region);

      // 3. Update the reverse mapping created in step 1.
      // This is done last because it's only used to gracefully handle
      // disconnections and isn't used for serving.
      final ArrayList<RegionInfo> regions = client2regions.get(client);
      synchronized (regions) {
        regions.add(region);
        nregions = regions.size();
      }
    }

    // Don't interleave logging with the operations above, in order to attempt
    // to reduce the duration of the race windows.
    LOG.info((oldclient == null ? "Added" : "Replaced") + " client for"
        + " region " + region + ", which was "
        + (oldregion == null ? "added to" : "updated in") + " the"
        + " regions cache. Now we know that " + client + " is hosting "
        + nregions + " region" + (nregions > 1 ? 's' : "") + '.');

    return client;
  }
| |
  /**
   * Invalidates any cached knowledge about the given region.
   * <p>
   * This is typically used when a region migrates because of a split or a
   * migration done by the region load balancer, which causes a
   * {@link NotServingRegionException}.
   * <p>
   * This is package-private so that the low-level {@link RegionClient} can do
   * the invalidation itself when it gets a {@link NotServingRegionException}
   * back from a RegionServer.
   *
   * @param region_name The name of the region to invalidate in our caches.
   * @param mark_as_nsred If {@code true}, after removing everything we know
   * about this region, we'll store a special marker in our META cache
   * to mark this region as "known to be NSRE'd", so that subsequent
   * requests to this region will "fail-fast".
   * @param reason If not {@code null}, will be used to log an INFO message
   * about the cache invalidation done.
   */
  private void invalidateRegionCache(final byte[] region_name,
                                     final boolean mark_as_nsred,
                                     final String reason) {
    // Identity comparisons below are intentional: META_REGION_NAME and
    // ROOT_REGION are sentinel references, so `==' reliably identifies the
    // META / ROOT region without comparing array contents.
    if ((region_name == META_REGION_NAME && !has_root)  // HBase 0.95+
        || region_name == ROOT_REGION) {  // HBase <= 0.94
      if (reason != null) {
        LOG.info("Invalidated cache for " + (has_root ? "-ROOT-" : ".META.")
            + " as " + rootregion + ' ' + reason);
      }
      rootregion = null;
      return;
    }
    // Either overwrite the entry with an NSRE marker (a RegionInfo whose
    // table is the empty sentinel — see knownToBeNSREd) or drop it.
    final RegionInfo oldregion = mark_as_nsred
        ? regions_cache.put(region_name,
                            new RegionInfo(EMPTY_ARRAY, region_name,
                                           EMPTY_ARRAY))
        : regions_cache.remove(region_name);
    final RegionInfo region = (oldregion != null ? oldregion
        : new RegionInfo(EMPTY_ARRAY, region_name, EMPTY_ARRAY));
    final RegionClient client = region2client.remove(region);

    if (oldregion != null && !Bytes.equals(oldregion.name(), region_name)) {
      // XXX do we want to just re-add oldregion back?  This exposes another
      // race condition (we re-add it and overwrite yet another region change).
      LOG.warn("Oops, invalidated the wrong regions cache entry."
          + " Meant to remove " + Bytes.pretty(region_name)
          + " but instead removed " + oldregion);
    }

    if (client == null) {
      return;  // No client mapping to clean up.
    }
    final ArrayList<RegionInfo> regions = client2regions.get(client);
    if (regions != null) {
      // `remove()' on an ArrayList causes an array copy.  Should we switch
      // to a LinkedList instead?
      synchronized (regions) {
        regions.remove(region);
      }
    }
    if (reason != null) {
      LOG.info("Invalidated cache for " + region + " as " + client + ' '
          + reason);
    }
  }
| |
  /**
   * Returns true if this region is known to be NSRE'd and shouldn't be used.
   *
   * @see #handleNSRE
   */
  private static boolean knownToBeNSREd(final RegionInfo region) {
    // Identity comparison is intentional: NSRE'd regions are marked in the
    // cache with a RegionInfo whose table is the EMPTY_ARRAY sentinel
    // (see invalidateRegionCache), which no real table can have.
    return region.table() == EMPTY_ARRAY;
  }
| |
  /**
   * Low and high watermarks when buffering RPCs due to an NSRE.
   *
   * @see #handleNSRE
   */
  // XXX TODO(tsuna): Don't hardcode.
  private static final short NSRE_LOW_WATERMARK = 1000;
  private static final short NSRE_HIGH_WATERMARK = 10000;

  /** Log a message for every N RPCs we buffer due to an NSRE. */
  private static final short NSRE_LOG_EVERY = 500;
| |
| /** |
| * Handles the {@link NotServingRegionException} for the given RPC. |
| * <p> |
| * This code will take ownership of the RPC in the sense that it will become |
| * responsible for re-scheduling the RPC later once the NSRE situation gets |
| * resolved by HBase. |
| * |
| * <h1>NSRE handling logic</h1> |
| * Whenever we get an NSRE for the first time for a particular region, we will |
| * add an entry for this region in the {@link #got_nsre} map. We also replace |
| * the entry for this region in {@link #regions_cache} with a special entry |
| * that indicates that this region is known to be unavailable for now, due to |
| * the NSRE. This entry is said to be special because it belongs to the table |
| * with an empty name (which is otherwise impossible). This way, new RPCs that |
| * are sent out can still hit our local cache instead of requiring a META |
| * lookup and be directly sent to this method so they can be queued to wait |
| * until the NSRE situation is resolved by HBase. |
| * <p> |
| * When we first get an NSRE, we also create a "probe" RPC, the goal of which |
| * is to periodically poke HBase and check whether the NSRE situation was |
| * resolved. The way we poke HBase is to send an "exists" RPC (which is |
| * actually just a "get" RPC that returns true or false instead of returning |
| * any data) for the table / key of the first RPC to trigger the NSRE. As soon |
| * as the probe returns successfully, we know HBase resolved the NSRE |
| * situation and the region is back online. Note that it doesn't matter what |
| * the result of the probe is, the only thing that matters is that the probe |
| * doesn't get NSRE'd. |
| * <p> |
| * Once the probe RPC succeeds, we flush out all the RPCs that are pending for |
| * the region that got NSRE'd. When the probe fails, it's periodically |
| * re-scheduled with an exponential-ish backoff. |
| * <p> |
| * We put a cap on the number of RPCs we'll keep on hold while we wait for the |
| * NSRE to be resolved. Say you have a high throughput application that's |
| * producing 100k write operations per second. Even if it takes HBase just a |
| * second to bring the region back online, the application will have generated |
| * over 100k RPCs before we realize we're good to go. This means the |
| * application can easily run itself out of memory if we let the queue grow |
| * unbounded. To prevent that from happening, the code has a low watermark and |
| * a high watermark on the number of pending RPCs for a particular region. |
| * Once the low watermark is hit, one RPC will be failed with a |
| * {@link PleaseThrottleException}. This is an advisory warning that HBase |
| * isn't keeping up and that the application should slow down its HBase usage |
| * momentarily. After hitting the low watermark, further RPCs that are still |
| * getting NSRE'd on the same region will get buffered again until we hit the |
| * high watermark. Once the high watermark is hit, all subsequent RPCs that |
| * get NSRE'd will immediately fail with a {@link PleaseThrottleException} |
| * (and they will fail-fast). |
| * |
| * @param rpc The RPC that failed or is going to fail with an NSRE. |
| * @param region_name The name of the region this RPC is going to. Obviously, |
| * this method cannot be used for RPCs that aren't targeted at a |
| * particular region. |
| * @param e The exception that caused (or may cause) this RPC to fail. |
| */ |
| void handleNSRE(HBaseRpc rpc, final byte[] region_name, |
| final RecoverableException e) { |
| num_nsre_rpcs.increment(); |
| final boolean can_retry_rpc = !cannotRetryRequest(rpc); |
| boolean known_nsre = true; // We already aware of an NSRE for this region? |
| ArrayList<HBaseRpc> nsred_rpcs = got_nsre.get(region_name); |
| HBaseRpc exists_rpc = null; // Our "probe" RPC. |
| if (nsred_rpcs == null) { // Looks like this could be a new NSRE... |
| final ArrayList<HBaseRpc> newlist = new ArrayList<HBaseRpc>(64); |
| // In HBase 0.95 and up, the exists RPC can't use the empty row key, |
| // which could happen if we were trying to scan from the beginning of |
| // the table. So instead use "\0" as the key. |
| exists_rpc = GetRequest.exists(rpc.table, probeKey(rpc.key)); |
| newlist.add(exists_rpc); |
| if (can_retry_rpc) { |
| newlist.add(rpc); |
| } |
| nsred_rpcs = got_nsre.putIfAbsent(region_name, newlist); |
| if (nsred_rpcs == null) { // We've just put `newlist'. |
| nsred_rpcs = newlist; // => We're the first thread to get |
| known_nsre = false; // the NSRE for this region. |
| } |
| } |
| |
| if (known_nsre) { // Some RPCs seem to already be pending due to this NSRE |
| boolean reject = true; // Should we reject this RPC (too many pending)? |
| int size; // How many RPCs are already pending? |
| |
| synchronized (nsred_rpcs) { |
| size = nsred_rpcs.size(); |
| // If nsred_rpcs is empty, there was a race with another thread which |
| // is executing RetryNSREd.call and that just cleared this array and |
| // removed nsred_rpcs from got_nsre right after we got the reference, |
| // so we need to add it back there, unless another thread already |
| // did it (in which case we're really unlucky and lost 2 races). |
| if (size == 0) { |
| final ArrayList<HBaseRpc> added = got_nsre.putIfAbsent(region_name, |
| nsred_rpcs); |
| if (added == null) { // We've just put `nsred_rpcs'. |
| exists_rpc = GetRequest.exists(rpc.table, probeKey(rpc.key)); |
| nsred_rpcs.add(exists_rpc); // We hold the lock on nsred_rpcs |
| if (can_retry_rpc) { |
| nsred_rpcs.add(rpc); // so we can safely add those 2. |
| } |
| known_nsre = false; // We mistakenly believed it was known. |
| } else { // We lost the second race. |
| // Here we synchronize on two different references without any |
| // apparent ordering guarantee, which can typically lead to |
| // deadlocks. In this case though we're fine, as any other thread |
| // that still has a reference to `nsred_rpcs' is gonna go through |
| // this very same code path and will lock `nsred_rpcs' first |
| // before finding that it too lost 2 races, so it'll lock `added' |
| // second. So there's actually a very implicit ordering. |
| if (can_retry_rpc) { |
| synchronized (added) { // Won't deadlock (explanation above). |
| if (added.isEmpty()) { |
| LOG.error("WTF? Shouldn't happen! Lost 2 races and found" |
| + " an empty list of NSRE'd RPCs (" + added + ") for " |
| + Bytes.pretty(region_name)); |
| exists_rpc = GetRequest.exists(rpc.table, probeKey(rpc.key)); |
| added.add(exists_rpc); |
| } else { |
| exists_rpc = added.get(0); |
| } |
| if (can_retry_rpc) { |
| added.add(rpc); // Add ourselves in the existing array... |
| } |
| } |
| } |
| nsred_rpcs = added; // ... and update our reference. |
| } |
| } |
| // If `rpc' is the first element in nsred_rpcs, it's our "probe" RPC, |
| // in which case we must not add it to the array again. |
| else if ((exists_rpc = nsred_rpcs.get(0)) != rpc) { |
| if (size < NSRE_HIGH_WATERMARK) { |
| if (size == NSRE_LOW_WATERMARK) { |
| nsred_rpcs.add(null); // "Skip" one slot. |
| } else if (can_retry_rpc) { |
| reject = false; |
| if (nsred_rpcs.contains(rpc)) { // XXX O(n) check... :-/ |
| LOG.error("WTF? Trying to add " + rpc + " twice to NSREd RPC" |
| + " on " + Bytes.pretty(region_name)); |
| } else { |
| nsred_rpcs.add(rpc); |
| } |
| } |
| } |
| } else { // This is our probe RPC. |
| reject = false; // So don't reject it. |
| } |
| } // end of the synchronized block. |
| |
| // Stop here if this is a known NSRE and `rpc' is not our probe RPC. |
| if (known_nsre && exists_rpc != rpc) { |
| if (size != NSRE_HIGH_WATERMARK && size % NSRE_LOG_EVERY == 0) { |
| final String msg = "There are now " + size |
| + " RPCs pending due to NSRE on " + Bytes.pretty(region_name); |
| if (size + NSRE_LOG_EVERY < NSRE_HIGH_WATERMARK) { |
| LOG.info(msg); // First message logged at INFO level. |
| } else { |
| LOG.warn(msg); // Last message logged with increased severity. |
| } |
| } |
| if (reject) { |
| rpc.callback(new PleaseThrottleException(size + " RPCs waiting on " |
| + Bytes.pretty(region_name) + " to come back online", e, rpc, |
| exists_rpc.getDeferred())); |
| } |
| return; // This NSRE is already known and being handled. |
| } |
| } |
| |
| num_nsres.increment(); |
| // Mark this region as being NSRE'd in our regions_cache. |
| invalidateRegionCache(region_name, true, (known_nsre ? "still " : "") |
| + "seems to be splitting or closing it."); |
| |
| // Need a `final' variable to access from within the inner class below. |
| final ArrayList<HBaseRpc> rpcs = nsred_rpcs; // Guaranteed non-null. |
| final HBaseRpc probe = exists_rpc; // Guaranteed non-null. |
| nsred_rpcs = null; |
| exists_rpc = null; |
| |
| if (known_nsre && probe.attempt > 1) { |
| // Our probe is almost guaranteed to cause a META lookup, so virtually |
| // every time we retry it its attempt count will be incremented twice |
| // (once for a META lookup, once to send the actual probe). Here we |
| // decrement the attempt count to "de-penalize" the probe from causing |
| // META lookups, because that's what we want it to do. If the probe |
| // is lucky and doesn't trigger a META lookup (rare) it'll get a free |
| // extra attempt, no big deal. |
| probe.attempt--; |
| } else if (!can_retry_rpc) { |
| // `rpc' isn't a probe RPC and can't be retried, make it fail-fast now. |
| rpc.callback(tooManyAttempts(rpc, e)); |
| } |
| |
| rpc = null; // No longer need this reference. |
| |
| // Callback we're going to add on our probe RPC. When this callback gets |
| // invoked, it means that our probe RPC completed, so NSRE situation seems |
| // resolved and we can retry all the RPCs that were waiting on that region. |
| // We also use this callback as an errback to avoid leaking RPCs in case |
| // of an unexpected failure of the probe RPC (e.g. a RegionServer dying |
| // while it's splitting a region, which would cause a connection reset). |
| final class RetryNSREd implements Callback<Object, Object> { |
| public Object call(final Object arg) { |
| if (arg instanceof Exception) { |
| LOG.warn("Probe " + probe + " failed", (Exception) arg); |
| } |
| ArrayList<HBaseRpc> removed = got_nsre.remove(region_name); |
| if (removed != rpcs && removed != null) { // Should never happen. |
| synchronized (removed) { // But just in case... |
| synchronized (rpcs) { |
| LOG.error("WTF? Impossible! Removed the wrong list of RPCs" |
| + " from got_nsre. Was expecting list@" |
| + System.identityHashCode(rpcs) + " (size=" + rpcs.size() |
| + "), got list@" + System.identityHashCode(removed) |
| + " (size=" + removed.size() + ')'); |
| } |
| for (final HBaseRpc r : removed) { |
| if (r != null && r != probe) { |
| sendRpcToRegion(r); // We screwed up but let's not lose RPCs. |
| } |
| } |
| removed.clear(); |
| } |
| } |
| removed = null; |
| |
| synchronized (rpcs) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Retrying " + rpcs.size() + " RPCs now that the NSRE on " |
| + Bytes.pretty(region_name) + " seems to have cleared"); |
| } |
| final Iterator<HBaseRpc> i = rpcs.iterator(); |
| if (i.hasNext()) { |
| HBaseRpc r = i.next(); |
| if (r != probe) { |
| LOG.error("WTF? Impossible! Expected first == probe but first=" |
| + r + " and probe=" + probe); |
| sendRpcToRegion(r); |
| } |
| while (i.hasNext()) { |
| if ((r = i.next()) != null) { |
| sendRpcToRegion(r); |
| } |
| } |
| } else { |
| LOG.error("WTF? Impossible! Empty rpcs array=" + rpcs |
| + " found by " + this); |
| } |
| rpcs.clear(); |
| } |
| |
| return arg; |
| } |
| |
| public String toString() { |
| return "retry other RPCs NSRE'd on " + Bytes.pretty(region_name); |
| } |
| } |
| ; |
| |
| // It'll take a short while for HBase to clear the NSRE. If a |
| // region is being split, we should be able to use it again pretty |
| // quickly, but if a META entry is stale (e.g. due to RegionServer |
| // failures / restarts), it may take up to several seconds. |
| final class NSRETimer implements TimerTask { |
| public void run(final Timeout timeout) { |
| if (probe.attempt == 0) { // Brand new probe. |
| probe.getDeferred().addBoth(new RetryNSREd()); |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Done waiting after NSRE on " + Bytes.pretty(region_name) |
| + ", retrying " + probe); |
| } |
| // Make sure the probe will cause a META lookup. |
| invalidateRegionCache(region_name, false, null); |
| sendRpcToRegion(probe); // Restart the RPC. |
| } |
| |
| public String toString() { |
| return "probe NSRE " + probe; |
| } |
| } |
| ; |
| |
| // Linear backoff followed by exponential backoff. Some NSREs can be |
| // resolved in a second or so, some seem to easily take ~6 seconds, |
| // sometimes more when a RegionServer has failed and the master is slowly |
| // splitting its logs and re-assigning its regions. |
| final int wait_ms = probe.attempt < 4 ? 200 * (probe.attempt + 2) // 400, |
| // 600, |
| // 800, |
| // 1000 |
| : 1000 + (1 << probe.attempt); // 1016, 1032, 1064, 1128, 1256, 1512, .. |
| newTimeout(new NSRETimer(), wait_ms); |
| } |
| |
| /** |
| * Some arbitrary junk that is unlikely to appear in a real row key. |
| * |
| * @see probeKey |
| */ |
| private static byte[] PROBE_SUFFIX = { ':', 'A', 's', 'y', 'n', 'c', 'H', |
| 'B', 'a', 's', 'e', '~', 'p', 'r', 'o', 'b', 'e', '~', '<', ';', '_', |
| '<', }; |
| |
| /** |
| * Returns a newly allocated key to probe, to check a region is online. |
| * Sometimes we need to "poke" HBase to see if a region is online or a table |
| * exists. Given a key, we prepend some unique suffix to make it a lot less |
| * likely that we hit a real key with our probe, as doing so might have some |
| * implications on the RegionServer's memory usage. Yes, some people with very |
| * large keys were experiencing OOM's in their RegionServers due to AsyncHBase |
| * probes. |
| */ |
| private static byte[] probeKey(final byte[] key) { |
| final byte[] testKey = new byte[key.length + 64]; |
| System.arraycopy(key, 0, testKey, 0, key.length); |
| System.arraycopy(PROBE_SUFFIX, 0, testKey, testKey.length |
| - PROBE_SUFFIX.length, PROBE_SUFFIX.length); |
| return testKey; |
| } |
| |
| // ----------------------------------------------------------------- // |
| // Code that manages connection and disconnection to Region Servers. // |
| // ----------------------------------------------------------------- // |
| |
| /** |
| * Returns a client to communicate with a Region Server. |
| * <p> |
| * Note that this method is synchronized, so only one client at a time can be |
| * created. In practice this shouldn't be a problem as this method is not |
| * expected to be frequently called. |
| * |
| * @param host The normalized <strong>IP address</strong> of the region |
| * server. Passing a hostname or a denormalized IP address will work |
| * silently and will result in unnecessary extra connections (clients |
| * are cached, which is why always using the normalized IP address |
| * will result in fewer connections). |
| * @param port The port on which the region server is serving. |
| * @return A client for this region server. |
| */ |
| private RegionClient newClient(final String host, final int port) { |
| // This big synchronized block is required because using a |
| // ConcurrentHashMap wouldn't be sufficient. We could still have 2 |
| // threads attempt to create the same client at the same time, and they |
| // could both test the map at the same time and create 2 instances. |
| final String hostport = host + ':' + port; |
| |
| RegionClient client; |
| SocketChannel chan = null; |
| synchronized (ip2client) { |
| client = ip2client.get(hostport); |
| if (client != null && client.isAlive()) { |
| return client; |
| } |
| |
| // We don't use Netty's ClientBootstrap class because it makes it |
| // unnecessarily complicated to have control over which ChannelPipeline |
| // exactly will be given to the channel. It's over-designed. |
| final RegionClientPipeline pipeline = new RegionClientPipeline(); |
| client = pipeline.init(); |
| chan = channel_factory.newChannel(pipeline); |
| ip2client.put(hostport, client); // This is guaranteed to return null. |
| } |
| client2regions.put(client, new ArrayList<RegionInfo>()); |
| num_connections_created.increment(); |
| // Configure and connect the channel without locking ip2client. |
| final SocketChannelConfig config = chan.getConfig(); |
| config.setConnectTimeoutMillis(5000); |
| config.setTcpNoDelay(true); |
| // Unfortunately there is no way to override the keep-alive timeout in |
| // Java since the JRE doesn't expose any way to call setsockopt() with |
| // TCP_KEEPIDLE. And of course the default timeout is >2h. Sigh. |
| config.setKeepAlive(true); |
| chan.connect(new InetSocketAddress(host, port)); // Won't block. |
| return client; |
| } |
| |
| /** |
| * A {@link DefaultChannelPipeline} that gives us a chance to deal with |
| * certain events before any handler runs. |
| * <p> |
| * We hook a couple of methods in order to report disconnection events to the |
| * {@link HBaseClient} so that it can clean up its connection caches ASAP to |
| * avoid using disconnected (or soon to be disconnected) sockets. |
| * <p> |
| * Doing it this way is simpler than having a first handler just to handle |
| * disconnection events, to which we'd need to pass a callback to invoke to |
| * report the event back to the {@link HBaseClient}. |
| */ |
| private final class RegionClientPipeline extends DefaultChannelPipeline { |
| |
| /** |
| * Have we already disconnected?. We use this to avoid doing the cleanup |
| * work for the same client more than once, even if we get multiple events |
| * indicating that the client is no longer connected to the RegionServer |
| * (e.g. DISCONNECTED, CLOSED). No synchronization needed as this is always |
| * accessed from only one thread at a time (equivalent to a non-shared state |
| * in a Netty handler). |
| */ |
| private boolean disconnected = false; |
| |
| RegionClientPipeline() { |
| } |
| |
| /** |
| * Initializes this pipeline. This method <strong>MUST</strong> be called on |
| * each new instance before it's used as a pipeline for a channel. |
| */ |
| RegionClient init() { |
| final RegionClient client = new RegionClient(HBaseClient.this); |
| super.addLast("handler", client); |
| return client; |
| } |
| |
| @Override |
| public void sendDownstream(final ChannelEvent event) { |
| // LoggerFactory.getLogger(RegionClientPipeline.class) |
| // .debug("hooked sendDownstream " + event); |
| if (event instanceof ChannelStateEvent) { |
| handleDisconnect((ChannelStateEvent) event); |
| } |
| super.sendDownstream(event); |
| } |
| |
| @Override |
| public void sendUpstream(final ChannelEvent event) { |
| // LoggerFactory.getLogger(RegionClientPipeline.class) |
| // .debug("hooked sendUpstream " + event); |
| if (event instanceof ChannelStateEvent) { |
| handleDisconnect((ChannelStateEvent) event); |
| } |
| super.sendUpstream(event); |
| } |
| |
| private void handleDisconnect(final ChannelStateEvent state_event) { |
| if (disconnected) { |
| return; |
| } |
| switch (state_event.getState()) { |
| case OPEN: |
| if (state_event.getValue() == Boolean.FALSE) { |
| break; // CLOSED |
| } |
| return; |
| case CONNECTED: |
| if (state_event.getValue() == null) { |
| break; // DISCONNECTED |
| } |
| return; |
| default: |
| return; // Not an event we're interested in, ignore it. |
| } |
| |
| disconnected = true; // So we don't clean up the same client twice. |
| try { |
| final RegionClient client = super.get(RegionClient.class); |
| SocketAddress remote = super.getChannel().getRemoteAddress(); |
| // At this point Netty gives us no easy way to access the |
| // SocketAddress of the peer we tried to connect to, so we need to |
| // find which entry in the map was used for the rootregion. This |
| // kinda sucks but I couldn't find an easier way. |
| if (remote == null) { |
| remote = slowSearchClientIP(client); |
| } |
| |
| // Prevent the client from buffering requests while we invalidate |
| // everything we have about it. |
| synchronized (client) { |
| removeClientFromCache(client, remote); |
| } |
| } catch (Exception e) { |
| LoggerFactory.getLogger(RegionClientPipeline.class).error( |
| "Uncaught exception when handling a disconnection of " |
| + getChannel(), e); |
| } |
| } |
| |
| } |
| |
| /** |
| * Performs a slow search of the IP used by the given client. |
| * <p> |
| * This is needed when we're trying to find the IP of the client before its |
| * channel has successfully connected, because Netty's API offers no way of |
| * retrieving the IP of the remote peer until we're connected to it. |
| * |
| * @param client The client we want the IP of. |
| * @return The IP of the client, or {@code null} if we couldn't find it. |
| */ |
| private InetSocketAddress slowSearchClientIP(final RegionClient client) { |
| String hostport = null; |
| synchronized (ip2client) { |
| for (final Map.Entry<String, RegionClient> e : ip2client.entrySet()) { |
| if (e.getValue() == client) { |
| hostport = e.getKey(); |
| break; |
| } |
| } |
| } |
| |
| if (hostport == null) { |
| HashMap<String, RegionClient> copy; |
| synchronized (ip2client) { |
| copy = new HashMap<String, RegionClient>(ip2client); |
| } |
| LOG.error("WTF? Should never happen! Couldn't find " + client + " in " |
| + copy); |
| return null; |
| } |
| |
| LOG.warn("Couldn't connect to the RegionServer @ " + hostport); |
| final int colon = hostport.indexOf(':', 1); |
| if (colon < 1) { |
| LOG.error("WTF? Should never happen! No `:' found in " + hostport); |
| return null; |
| } |
| final String host = getIP(hostport.substring(0, colon)); |
| int port; |
| try { |
| port = parsePortNumber(hostport.substring(colon + 1, hostport.length())); |
| } catch (NumberFormatException e) { |
| LOG.error("WTF? Should never happen! Bad port in " + hostport, e); |
| return null; |
| } |
| return new InetSocketAddress(host, port); |
| } |
| |
| /** |
| * Removes all the cache entries referred to the given client. |
| * |
| * @param client The client for which we must invalidate everything. |
| * @param remote The address of the remote peer, if known, or null. |
| */ |
| private void removeClientFromCache(final RegionClient client, |
| final SocketAddress remote) { |
| if (client == rootregion) { |
| LOG.info("Lost connection with the " + (has_root ? "-ROOT-" : ".META.") |
| + " region"); |
| rootregion = null; |
| } |
| ArrayList<RegionInfo> regions = client2regions.remove(client); |
| if (regions != null) { |
| // Make a copy so we don't need to synchronize on it while iterating. |
| RegionInfo[] regions_copy; |
| synchronized (regions) { |
| regions_copy = regions.toArray(new RegionInfo[regions.size()]); |
| regions = null; |
| // If any other thread still has a reference to `regions', their |
| // updates will be lost (and we don't care). |
| } |
| for (final RegionInfo region : regions_copy) { |
| final byte[] table = region.table(); |
| final byte[] stop_key = region.stopKey(); |
| // If stop_key is the empty array: |
| // This region is the last region for this table. In order to |
| // find the start key of the last region, we append a '\0' byte |
| // at the end of the table name and search for the entry with a |
| // key right before it. |
| // Otherwise: |
| // Search for the entry with a key right before the stop_key. |
| final byte[] search_key = createRegionSearchKey( |
| stop_key.length == 0 ? Arrays.copyOf(table, table.length + 1) |
| : table, stop_key); |
| final Map.Entry<byte[], RegionInfo> entry = regions_cache |
| .lowerEntry(search_key); |
| if (entry != null && entry.getValue() == region) { |
| // Invalidate the regions cache first, as it's the most damaging |
| // one if it contains stale data. |
| regions_cache.remove(entry.getKey()); |
| LOG.debug("Removed from regions cache: {}", region); |
| } |
| final RegionClient oldclient = region2client.remove(region); |
| if (client == oldclient) { |
| LOG.debug("Association removed: {} -> {}", region, client); |
| } else if (oldclient != null) { // Didn't remove what we expected?! |
| LOG.warn("When handling disconnection of " + client |
| + " and removing " + region + " from region2client" |
| + ", it was found that " + oldclient + " was in fact" |
| + " serving this region"); |
| } |
| } |
| } |
| |
| if (remote == null) { |
| return; // Can't continue without knowing the remote address. |
| } |
| |
| String hostport = null; |
| if (remote instanceof InetSocketAddress) { |
| final InetSocketAddress sock = (InetSocketAddress) remote; |
| final InetAddress addr = sock.getAddress(); |
| if (addr == null) { |
| LOG.error("WTF? Unresolved IP for " + remote |
| + ". This shouldn't happen."); |
| return; |
| } else { |
| hostport = addr.getHostAddress() + ':' + sock.getPort(); |
| } |
| } else { |
| LOG.error("WTF? Found a non-InetSocketAddress remote: " + remote |
| + ". This shouldn't happen."); |
| return; |
| } |
| |
| RegionClient old; |
| synchronized (ip2client) { |
| old = ip2client.remove(hostport); |
| } |
| LOG.debug("Removed from IP cache: {} -> {}", hostport, client); |
| if (old == null) { |
| LOG.warn("When expiring " + client + " from the client cache (host:port=" |
| + hostport + "), it was found that there was no entry" |
| + " corresponding to " + remote + ". This shouldn't happen."); |
| } |
| } |
| |
| // ---------------- // |
| // ZooKeeper stuff. // |
| // ---------------- // |
| |
| /** |
| * Helper to locate the -ROOT- region through ZooKeeper. |
| * <p> |
| * We don't watch the file of the -ROOT- region. We just asynchronously read |
| * it once to find -ROOT-, then we close our ZooKeeper session. There are a |
| * few reasons for this. First of all, the -ROOT- region doesn't move often. |
| * When it does, and when we need to use it, we'll realize that -ROOT- is no |
| * longer where we though it was and we'll find it again. Secondly, |
| * maintaining a session open just to watch the -ROOT- region is a waste of |
| * resources both on our side and on ZK's side. ZK is chatty, it will |
| * frequently send us heart beats that will keep waking its event thread, etc. |
| * Third, if the application we're part of already needs to maintain a session |
| * with ZooKeeper anyway, we can't easily share it with them anyway, because |
| * of ZooKeeper's API. Indeed, unfortunately the ZooKeeper API requires that |
| * the {@link ZooKeeper} object be re-created when the session is invalidated |
| * (due to a disconnection or a timeout), which means that it's impossible to |
| * share the {@link ZooKeeper} object. Ideally in an application there should |
| * be only one instance, but their poor API makes it impractical, since the |
| * instance must be re-created when the session is invalidated, which entails |
| * that one entity should own the reconnection process and have a way of |
| * giving everyone else the new instance. This is extremely cumbersome so I |
| * don't expect anyone to do this, which is why we manage our own instance. |
| */ |
| private final class ZKClient implements Watcher { |
| |
    /** The specification of the quorum, e.g. "host1,host2,host3" */
    private final String quorum_spec;

    /** The base path under which is the znode for the -ROOT- region. */
    private final String base_path;

    /**
     * Our ZooKeeper instance.
     * Must grab this' monitor before accessing.
     * {@code null} when we're not connected.
     */
    private ZooKeeper zk;

    /**
     * When we're not connected to ZK, users who are trying to access the -ROOT-
     * region can queue up here to be called back when it's available. Must grab
     * this' monitor before accessing.
     * A {@code null} value means no -ROOT- lookup is currently in flight
     * (see {@link #getDeferredRootIfBeingLookedUp}).
     */
    private ArrayList<Deferred<Object>> deferred_rootregion;
| |
| /** |
| * Constructor. |
| * |
| * @param quorum_spec The specification of the quorum, e.g. |
| * {@code "host1,host2,host3"}. |
| * @param base_path The base path under which is the znode for the -ROOT- |
| * region. |
| */ |
| public ZKClient(final String quorum_spec, final String base_path) { |
| this.quorum_spec = quorum_spec; |
| this.base_path = base_path; |
| } |
| |
| /** |
| * Returns a deferred that will be called back once we found -ROOT-. |
| * |
| * @return A deferred which will be invoked with an unspecified argument |
| * once we know where -ROOT- is. Note that by the time you get |
| * called back, we may have lost the connection to the -ROOT- region |
| * again. |
| */ |
| public Deferred<Object> getDeferredRoot() { |
| final Deferred<Object> d = new Deferred<Object>(); |
| synchronized (this) { |
| try { |
| connectZK(); // Kick off a connection if needed. |
| if (deferred_rootregion == null) { |
| LOG.info("Need to find the " + (has_root ? "-ROOT-" : ".META.") |
| + " region"); |
| deferred_rootregion = new ArrayList<Deferred<Object>>(); |
| } |
| deferred_rootregion.add(d); |
| } catch (NonRecoverableException e) { |
| LOG.error(e.getMessage(), e.getCause()); |
| d.callback(e); |
| } |
| } |
| return d; |
| } |
| |
| /** |
| * Like {@link getDeferredRoot} but returns null if we're not already trying |
| * to find -ROOT-. In other words calling this method doesn't trigger a |
| * -ROOT- lookup unless there's already one in flight. |
| * |
| * @return @{code null} if -ROOT- isn't being looked up right now, otherwise |
| * a deferred which will be invoked with an unspecified argument |
| * once we know where -ROOT- is. Note that by the time you get |
| * called back, we may have lost the connection to the -ROOT- region |
| * again. |
| */ |
| Deferred<Object> getDeferredRootIfBeingLookedUp() { |
| synchronized (this) { |
| if (deferred_rootregion == null) { |
| return null; |
| } |
| final Deferred<Object> d = new Deferred<Object>(); |
| deferred_rootregion.add(d); |
| return d; |
| } |
| } |
| |
| /** |
| * Atomically returns and {@code null}s out the current list of Deferreds |
| * waiting for the -ROOT- region. |
| */ |
| private ArrayList<Deferred<Object>> atomicGetAndRemoveWaiters() { |
| synchronized (this) { |
| try { |
| return deferred_rootregion; |
| } finally { |
| deferred_rootregion = null; |
| } |
| } |
| } |
| |
| /** |
| * Processes a ZooKeeper event. |
| * <p> |
| * This method is called back by {@link ZooKeeper} from its main event |
| * thread. So make sure you don't block. |
| * |
| * @param event The event to process. |
| */ |
| public void process(final WatchedEvent event) { |
| LOG.debug("Got ZooKeeper event: {}", event); |
| try { |
| switch (event.getState()) { |
| case SyncConnected: |
| getRootRegion(); |
| break; |
| default: |
| disconnectZK(); |
| // Reconnect only if we're still trying to locate -ROOT-. |
| synchronized (this) { |
| if (deferred_rootregion != null) { |
| LOG.warn("No longer connected to ZooKeeper, event=" + event); |
| connectZK(); |
| } |
| } |
| return; |
| } |
| } catch (Exception e) { |
| LOG.error("Uncaught exception when handling event " + event, e); |
| return; |
| } |
| LOG.debug("Done handling ZooKeeper event: {}", event); |
| } |
| |
| /** |
| * Connects to ZooKeeper. |
| * |
| * @throws NonRecoverableException if something from which we can't recover |
| * happened -- e.g. us being unable to resolve the hostname of any |
| * of the zookeeper servers. |
| */ |
| private void connectZK() { |
| try { |
| // Session establishment is asynchronous, so this won't block. |
| synchronized (this) { |
| if (zk != null) { // Already connected. |
| return; |
| } |
| zk = new ZooKeeper(quorum_spec, 5000, this); |
| } |
| } catch (UnknownHostException e) { |
| // No need to retry, we usually cannot recover from this. |
| throw new NonRecoverableException("Cannot connect to ZooKeeper," |
| + " is the quorum specification valid? " + quorum_spec, e); |
| } catch (IOException e) { |
| LOG.error("Failed to connect to ZooKeeper", e); |
| // XXX don't retry recursively, create a timer with an exponential |
| // backoff and schedule the reconnection attempt for later. |
| connectZK(); |
| } |
| } |
| |
| /** |
| * Disconnects from ZooKeeper. |
| * <p> |
| * <strong>This method is blocking.</strong> Unfortunately, ZooKeeper |
| * doesn't offer an asynchronous API to close a session at this time. It |
| * waits until the server responds to the {@code closeSession} RPC. |
| */ |
| public void disconnectZK() { |
| synchronized (this) { |
| if (zk == null) { |
| return; |
| } |
| try { |
| // I'm not sure but I think both the client and the server race to |
| // close the socket, which often causes the DEBUG spam: |
| // java.net.SocketException: Socket is not connected |
| // When the client attempts to close its socket after its OS and |
| // JVM are done processing the TCP FIN and it's already closed. |
| LOG.debug("Ignore any DEBUG exception from ZooKeeper"); |
| final long start = System.nanoTime(); |
| zk.close(); |
| LOG.debug("ZooKeeper#close completed in {}ns", System.nanoTime() |
| - start); |
| } catch (InterruptedException e) { |
| // The signature of the method pretends that it can throw an |
| // InterruptedException, but this is a lie, the code of that |
| // method will never throw this type of exception. |
| LOG.error("Should never happen", e); |
| } |
| zk = null; |
| } |
| } |
| |
| /** Schedule a timer to retry {@link #getRootRegion} after some time. */ |
| private void retryGetRootRegionLater() { |
| newTimeout(new TimerTask() { |
| public void run(final Timeout timeout) { |
| if (!getRootRegion()) { // Try to read the znodes |
| connectZK(); // unless we need to connect first. |
| } |
| } |
| }, 1000 /* milliseconds */); |
| } |
| |
| /** |
| * Puts a watch in ZooKeeper to monitor the file of the -ROOT- region. This |
| * method just registers an asynchronous callback. |
| */ |
| final class ZKCallback implements AsyncCallback.DataCallback { |
| |
| /** |
| * HBASE-3065 (r1151751) prepends meta-data in ZooKeeper files. The |
| * meta-data always starts with this magic byte. |
| */ |
| private static final byte MAGIC = (byte) 0xFF; |
| |
| private static final byte UNKNOWN = 0; // Callback still pending. |
| private static final byte FOUND = 1; // We found the znode. |
| private static final byte NOTFOUND = 2; // The znode didn't exist. |
| |
| private byte found_root; |
| private byte found_meta; // HBase 0.95 and up |
| |
| public void processResult(final int rc, final String path, |
| final Object ctx, final byte[] data, final Stat stat) { |
| final boolean is_root; // True if ROOT znode, false if META znode. |
| if (path.endsWith("/root-region-server")) { |
| is_root = true; |
| } else if (path.endsWith("/meta-region-server")) { |
| is_root = false; |
| } else { |
| LOG.error("WTF? We got a callback from ZooKeeper for a znode we did" |
| + " not expect: " + path + " / stat: " + stat + " / data: " |
| + Bytes.pretty(data)); |
| retryGetRootRegionLater(); |
| return; |
| } |
| |
| if (rc == Code.NONODE.intValue()) { |
| final boolean both_znode_failed; |
| if (is_root) { |
| found_root = NOTFOUND; |
| both_znode_failed = found_meta == NOTFOUND; |
| } else { // META (HBase 0.95 and up) |
| found_meta = NOTFOUND; |
| both_znode_failed = found_root == NOTFOUND; |
| } |
| if (both_znode_failed) { |
| LOG.error("The znode for the -ROOT- region doesn't exist!"); |
| retryGetRootRegionLater(); |
| } |
| return; |
| } else if (rc != Code.OK.intValue()) { |
| LOG.error("Looks like our ZK session expired or is broken, rc=" + rc |
| + ": " + Code.get(rc)); |
| disconnectZK(); |
| connectZK(); |
| return; |
| } |
| if (data == null || data.length == 0 || data.length > Short.MAX_VALUE) { |
| LOG.error("The location of the -ROOT- region in ZooKeeper is " |
| + (data == null || data.length == 0 ? "empty" : "too large (" |
| + data.length + " bytes!)")); |
| retryGetRootRegionLater(); |
| return; // TODO(tsuna): Add a watch to wait until the file changes. |
| } |
| |
| final RegionClient client; |
| if (is_root) { |
| found_root = FOUND; |
| client = handleRootZnode(data); |
| } else { // META (HBase 0.95 and up) |
| found_meta = FOUND; |
| client = handleMetaZnode(data); |
| } |
| |
| if (client == null) { // We failed to get a client. |
| retryGetRootRegionLater(); // So retry later. |
| return; |
| } |
| |
| final ArrayList<Deferred<Object>> ds = atomicGetAndRemoveWaiters(); |
| if (ds != null) { |
| for (final Deferred<Object> d : ds) { |
| d.callback(client); |
| } |
| } |
| |
| disconnectZK(); |
| // By the time we're done, we may need to find -ROOT- again. So |
| // check to see if there are people waiting to find it again, and if |
| // there are, re-open a new session with ZK. |
| // TODO(tsuna): This typically happens when the address of -ROOT- in |
| // ZK is stale. In this case, we should setup a watch to get |
| // notified once the znode gets updated, instead of continuously |
| // polling ZK and creating new sessions. |
| synchronized (ZKClient.this) { |
| if (deferred_rootregion != null) { |
| connectZK(); |
| } |
| } |
| } |
| |
| /** Returns a new client for the RS found in the root-region-server. */ |
| @SuppressWarnings("fallthrough") |
| private RegionClient handleRootZnode(final byte[] data) { |
| // There are 3 cases. Older versions of HBase encode the location |
| // of the root region as "host:port", 0.91 uses "host,port,startcode" |
| // and newer versions of 0.91 use "<metadata>host,port,startcode" |
| // where the <metadata> starts with MAGIC, then a 4 byte integer, |
| // then that many bytes of meta data. |
| boolean newstyle; // True if we expect a 0.91 style location. |
| final short offset; // Bytes to skip at the beginning of data. |
| short firstsep = -1; // Index of the first separator (':' or ','). |
| if (data[0] == MAGIC) { |
| newstyle = true; |
| final int metadata_length = Bytes.getInt(data, 1); |
| if (metadata_length < 1 || metadata_length > 65000) { |
| LOG.error("Malformed meta-data in " + Bytes.pretty(data) |
| + ", invalid metadata length=" + metadata_length); |
| return null; // TODO(tsuna): Add a watch to wait until the file |
| // changes. |
| } |
| offset = (short) (1 + 4 + metadata_length); |
| } else { |
| newstyle = false; // Maybe true, the loop below will tell us. |
| offset = 0; |
| } |
| final short n = (short) data.length; |
| // Look for the first separator. Skip the offset, and skip the |
| // first byte, because we know the separate can only come after |
| // at least one byte. |
| loop: for (short i = (short) (offset + 1); i < n; i++) { |
| switch (data[i]) { |
| case ',': |
| newstyle = true; |
| /* fall through */ |
| case ':': |
| firstsep = i; |
| break loop; |
| } |
| } |
| if (firstsep == -1) { |
| LOG.error("-ROOT- location doesn't contain a separator" |
| + " (':' or ','): " + Bytes.pretty(data)); |
| return null; // TODO(tsuna): Add a watch to wait until the file |
| // changes. |
| } |
| final String host; |
| final short portend; // Index past where the port number ends. |
| if (newstyle) { |
| host = new String(data, offset, firstsep - offset); |
| short i; |
| for (i = (short) (firstsep + 2); i < n; i++) { |
| if (data[i] == ',') { |
| break; |
| } |
| } |
| portend = i; // Port ends on the comma. |
| } else { |
| host = new String(data, 0, firstsep); |
| portend = n; // Port ends at the end of the array. |
| } |
| final int port = parsePortNumber(new String(data, firstsep + 1, portend |
| - firstsep - 1)); |
| final String ip = getIP(host); |
| if (ip == null) { |
| LOG.error("Couldn't resolve the IP of the -ROOT- region from " + host |
| + " in \"" + Bytes.pretty(data) + '"'); |
| return null; // TODO(tsuna): Add a watch to wait until the file |
| // changes. |
| } |
| LOG.info("Connecting to -ROOT- region @ " + ip + ':' + port); |
| has_root = true; |
| final RegionClient client = rootregion = newClient(ip, port); |
| return client; |
| } |
| |
| /** |
| * Returns a new client for the RS found in the meta-region-server. This |
| * is used in HBase 0.95 and up. |
| */ |
| private RegionClient handleMetaZnode(final byte[] data) { |
| if (data[0] != MAGIC) { |
| LOG.error("Malformed META region meta-data in " + Bytes.pretty(data) |
| + ", invalid leading magic number: " + data[0]); |
| return null; |
| } |
| |
| final int metadata_length = Bytes.getInt(data, 1); |
| if (metadata_length < 1 || metadata_length > 65000) { |
| LOG.error("Malformed META region meta-data in " + Bytes.pretty(data) |
| + ", invalid metadata length=" + metadata_length); |
| return null; // TODO(tsuna): Add a watch to wait until the file |
| // changes. |
| } |
| short offset = (short) (1 + 4 + metadata_length); |
| |
| final int pbuf_magic = Bytes.getInt(data, offset); |
| if (pbuf_magic != PBUF_MAGIC) { |
| LOG.error("Malformed META region meta-data in " + Bytes.pretty(data) |
| + ", invalid magic number=" + pbuf_magic); |
| return null; // TODO(tsuna): Add a watch to wait until the file |
| // changes. |
| } |
| offset += 4; |
| |
| final String ip; |
| final int port; |
| try { |
| final ZooKeeperPB.MetaRegionServer meta = ZooKeeperPB.MetaRegionServer |
| .newBuilder().mergeFrom(data, offset, data.length - offset) |
| .build(); |
| ip = getIP(meta.getServer().getHostName()); |
| port = meta.getServer().getPort(); |
| } catch (InvalidProtocolBufferException e) { |
| LOG.error("Failed to parse the protobuf in " + Bytes.pretty(data), e); |
| return null; // TODO(tsuna): Add a watch to wait until the file |
| // changes. |
| } |
| |
| LOG.info("Connecting to .META. region @ " + ip + ':' + port); |
| has_root = false; |
| final RegionClient client = rootregion = newClient(ip, port); |
| return client; |
| } |
| |
| } |
| |
| /** |
| * Attempts to lookup the ROOT region (or META, if 0.95 and up). |
| * |
| * @return true if a lookup was kicked off, false if not because we weren't |
| * connected to ZooKeeper. |
| */ |
| private boolean getRootRegion() { |
| synchronized (this) { |
| if (zk != null) { |
| LOG.debug("Finding the -ROOT- or .META. region in ZooKeeper"); |
| final ZKCallback cb = new ZKCallback(); |
| zk.getData(base_path + "/root-region-server", this, cb, null); |
| zk.getData(base_path + "/meta-region-server", this, cb, null); |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| } |
| |
| // --------------- // |
| // Little helpers. // |
| // --------------- // |
| |
| /** |
| * Gets a hostname or an IP address and returns the textual representation of |
| * the IP address. |
| * <p> |
| * <strong>This method can block</strong> as there is no API for asynchronous |
| * DNS resolution in the JDK. |
| * |
| * @param host The hostname to resolve. |
| * @return The IP address associated with the given hostname, or {@code null} |
| * if the address couldn't be resolved. |
| */ |
| private static String getIP(final String host) { |
| final long start = System.nanoTime(); |
| try { |
| final String ip = InetAddress.getByName(host).getHostAddress(); |
| final long latency = System.nanoTime() - start; |
| if (latency > 500000/* ns */&& LOG.isDebugEnabled()) { |
| LOG.debug("Resolved IP of `" + host + "' to " + ip + " in " + latency |
| + "ns"); |
| } else if (latency >= 3000000/* ns */) { |
| LOG.warn("Slow DNS lookup! Resolved IP of `" + host + "' to " + ip |
| + " in " + latency + "ns"); |
| } |
| return ip; |
| } catch (UnknownHostException e) { |
| LOG.error("Failed to resolve the IP of `" + host + "' in " |
| + (System.nanoTime() - start) + "ns"); |
| return null; |
| } |
| } |
| |
| /** |
| * Parses a TCP port number from a string. |
| * |
| * @param portnum The string to parse. |
| * @return A strictly positive, validated port number. |
| * @throws NumberFormatException if the string couldn't be parsed as an |
| * integer or if the value was outside of the range allowed for TCP |
| * ports. |
| */ |
| private static int parsePortNumber(final String portnum) |
| throws NumberFormatException { |
| final int port = Integer.parseInt(portnum); |
| if (port <= 0 || port > 65535) { |
| throw new NumberFormatException(port == 0 ? "port is zero" |
| : (port < 0 ? "port is negative: " : "port is too large: ") + port); |
| } |
| return port; |
| } |
| |
| } |