| /** |
| * Copyright 2008 The Apache Software Foundation |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.regionserver; |
| |
| import java.io.IOException; |
| import java.io.UnsupportedEncodingException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.SortedMap; |
| import java.util.TreeMap; |
| import java.util.TreeSet; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.regex.Pattern; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.ColumnNameParseException; |
| import org.apache.hadoop.hbase.DroppedSnapshotException; |
| import org.apache.hadoop.hbase.HBaseConfiguration; |
| import org.apache.hadoop.hbase.HColumnDescriptor; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.HRegionInfo; |
| import org.apache.hadoop.hbase.HStoreKey; |
| import org.apache.hadoop.hbase.HTableDescriptor; |
| import org.apache.hadoop.hbase.NotServingRegionException; |
| import org.apache.hadoop.hbase.RegionHistorian; |
| import org.apache.hadoop.hbase.filter.RowFilterInterface; |
| import org.apache.hadoop.hbase.io.BatchOperation; |
| import org.apache.hadoop.hbase.io.BatchUpdate; |
| import org.apache.hadoop.hbase.io.Cell; |
| import org.apache.hadoop.hbase.io.HbaseMapWritable; |
| import org.apache.hadoop.hbase.io.Reference; |
| import org.apache.hadoop.hbase.io.RowResult; |
| import org.apache.hadoop.hbase.ipc.HRegionInterface; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.FSUtils; |
| import org.apache.hadoop.hbase.util.Writables; |
| import org.apache.hadoop.io.WritableUtils; |
| import org.apache.hadoop.util.Progressable; |
| import org.apache.hadoop.util.StringUtils; |
| |
| /** |
| * HRegion stores data for a certain region of a table. It stores all columns |
| * for each row. A given table consists of one or more HRegions. |
| * |
| * <p>We maintain multiple HStores for a single HRegion. |
| * |
| * <p>An HStore is a set of rows with some column data; together, |
| * they make up all the data for the rows. |
| * |
| * <p>Each HRegion has a 'startKey' and 'endKey'. |
| * <p>The first is inclusive, the second is exclusive (except for |
| * the final region). The endKey of region 0 is the same as |
| * startKey for region 1 (if it exists). The startKey for the |
| * first region is null. The endKey for the final region is null. |
| * |
| * <p>Locking at the HRegion level serves only one purpose: preventing the |
| * region from being closed (and consequently split) while other operations |
| * are ongoing. Each row level operation obtains both a row lock and a region |
| * read lock for the duration of the operation. While a scanner is being |
| * constructed, getScanner holds a read lock. If the scanner is successfully |
| * constructed, it holds a read lock until it is closed. A close takes out a |
| * write lock and consequently will block for ongoing operations and will block |
| * new operations from starting while the close is in progress. |
| * |
| * <p>An HRegion is defined by its table and its key extent. |
| * |
| * <p>It consists of at least one HStore. The number of HStores should be |
| * configurable, so that data which is accessed together is stored in the same |
| * HStore. Right now, we approximate that by building a single HStore for |
| * each column family. (This config info will be communicated via the |
| * tabledesc.) |
| * |
| * <p>The HTableDescriptor contains metainfo about the HRegion's table. |
| * regionName is a unique identifier for this HRegion. [startKey, endKey) |
| * defines the keyspace for this HRegion. |
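| * |
| * <p>For example, a table split into three regions might carve up the row |
| * keyspace as follows (keys shown as strings for illustration): |
| * <pre> |
| * region 0: [null, "row-d")     startKey of the first region is null |
| * region 1: ["row-d", "row-m") |
| * region 2: ["row-m", null]     endKey of the final region is null |
| * </pre> |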
| */ |
| public class HRegion implements HConstants { |
| static final String SPLITDIR = "splits"; |
| static final String MERGEDIR = "merges"; |
| static final Random rand = new Random(); |
| static final Log LOG = LogFactory.getLog(HRegion.class); |
| final AtomicBoolean closed = new AtomicBoolean(false); |
| /* Closing can take some time; use the closing flag if there is stuff we don't want |
| * to do while in closing state; e.g. offering this region up to the master as a region |
| * to close if the carrying regionserver is overloaded. Once set, it is never cleared. |
| */ |
| private final AtomicBoolean closing = new AtomicBoolean(false); |
| private final RegionHistorian historian; |
| |
| ////////////////////////////////////////////////////////////////////////////// |
| // Members |
| ////////////////////////////////////////////////////////////////////////////// |
| |
| private final Map<Integer, byte []> locksToRows = |
| new ConcurrentHashMap<Integer, byte []>(); |
| private final Map<Integer, TreeMap<HStoreKey, byte []>> targetColumns = |
| new ConcurrentHashMap<Integer, TreeMap<HStoreKey, byte []>>(); |
| // Default access because read by tests. |
| protected final Map<Integer, HStore> stores = |
| new ConcurrentHashMap<Integer, HStore>(); |
| final AtomicLong memcacheSize = new AtomicLong(0); |
| |
| final Path basedir; |
| final HLog log; |
| final FileSystem fs; |
| final HBaseConfiguration conf; |
| final HRegionInfo regionInfo; |
| final Path regiondir; |
| private final Path regionCompactionDir; |
| |
| /* |
| * Set this when scheduling compaction if want the next compaction to be a |
| * major compaction. Cleared each time through compaction code. |
| */ |
| private volatile boolean forceMajorCompaction = false; |
| |
| /** |
| * @return True if this region has references. |
| */ |
| boolean hasReferences() { |
| for (HStore store: this.stores.values()) { |
| for (HStoreFile hsf: store.getStorefiles().values()) { |
| // Found a reference, return. |
| if (hsf.isReference()) return true; |
| } |
| } |
| return false; |
| } |
| |
| /* |
| * Data structure of write state flags used to coordinate flushes, |
| * compactions and closes. |
| */ |
| static class WriteState { |
| // Set while a memcache flush is happening. |
| volatile boolean flushing = false; |
| // Set when a flush has been requested. |
| volatile boolean flushRequested = false; |
| // Set while a compaction is running. |
| volatile boolean compacting = false; |
| // Cleared on close and when the region goes read-only. When false, we |
| // can no longer compact or flush. |
| volatile boolean writesEnabled = true; |
| // Set if region is read-only |
| private volatile boolean readOnly = false; |
| |
| /** |
| * Set flags that make this region read-only. |
| * @param onOff true to put this region in read-only mode |
| */ |
| synchronized void setReadOnly(final boolean onOff) { |
| this.writesEnabled = !onOff; |
| this.readOnly = onOff; |
| } |
| |
| boolean isReadOnly() { |
| return this.readOnly; |
| } |
| |
| boolean isFlushRequested() { |
| return this.flushRequested; |
| } |
| } |
| |
| private volatile WriteState writestate = new WriteState(); |
| |
| final int memcacheFlushSize; |
| private volatile long lastFlushTime; |
| final FlushRequester flushListener; |
| private final int blockingMemcacheSize; |
| final long threadWakeFrequency; |
| // Used to guard splits and closes |
| private final ReentrantReadWriteLock splitsAndClosesLock = |
| new ReentrantReadWriteLock(); |
| private final ReentrantReadWriteLock newScannerLock = |
| new ReentrantReadWriteLock(); |
| |
| // Stop updates lock |
| private final ReentrantReadWriteLock updatesLock = |
| new ReentrantReadWriteLock(); |
| private final Integer splitLock = new Integer(0); // deliberately a new instance: a lock object unique to this region |
| private long minSequenceId; |
| |
| ////////////////////////////////////////////////////////////////////////////// |
| // Constructor |
| ////////////////////////////////////////////////////////////////////////////// |
| |
| /** |
| * HRegion constructor. |
| * |
| * @param basedir qualified path of directory where region should be located, |
| * usually the table directory. |
| * @param log The HLog is the outbound log for any updates to the HRegion |
| * (There's a single HLog for all the HRegions on a single HRegionServer.) |
| * The log file is a logfile from the previous execution that's |
| * custom-computed for this HRegion. The HRegionServer computes and sorts the |
| * appropriate log info for this HRegion. If there is a previous log file |
| * (implying that the HRegion has been written-to before), then read it from |
| * the supplied path. |
| * @param fs is the filesystem. |
| * @param conf is global configuration settings. |
| * @param regionInfo HRegionInfo that describes the region |
| * @param flushListener an object that implements FlushRequester, used to |
| * request flushes when the region's memcache gets too large. Can be null. |
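| * |
| * <p>A minimal construction sketch; the arguments are assumed to have been |
| * created elsewhere, and the region must be initialized before use: |
| * <pre> |
| * HRegion r = new HRegion(basedir, log, fs, conf, info, null); |
| * r.initialize(null, null); |
| * </pre> |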
| */ |
| public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf, |
| HRegionInfo regionInfo, FlushRequester flushListener) { |
| this.basedir = basedir; |
| this.log = log; |
| this.fs = fs; |
| this.conf = conf; |
| this.regionInfo = regionInfo; |
| this.flushListener = flushListener; |
| this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000); |
| String encodedNameStr = Integer.toString(this.regionInfo.getEncodedName()); |
| this.regiondir = new Path(basedir, encodedNameStr); |
| this.historian = RegionHistorian.getInstance(); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Opening region " + this + "/" + |
| this.regionInfo.getEncodedName()); |
| } |
| |
| this.regionCompactionDir = |
| new Path(getCompactionDir(basedir), encodedNameStr); |
| |
| int flushSize = regionInfo.getTableDesc().getMemcacheFlushSize(); |
| if (flushSize == HTableDescriptor.DEFAULT_MEMCACHE_FLUSH_SIZE) { |
| flushSize = conf.getInt("hbase.hregion.memcache.flush.size", |
| HTableDescriptor.DEFAULT_MEMCACHE_FLUSH_SIZE); |
| } |
| this.memcacheFlushSize = flushSize; |
| |
| this.blockingMemcacheSize = this.memcacheFlushSize * |
| conf.getInt("hbase.hregion.memcache.block.multiplier", 1); |
| } |
| |
| /** Initialize this region and get it ready to roll. |
| * |
| * @param initialFiles directory of prefab store files (e.g. from a split or |
| * merge) to move into place. May be null. |
| * @param reporter progress reporter passed down to the stores as they open. |
| * May be null. |
| * @throws IOException |
| */ |
| public void initialize(Path initialFiles, |
| final Progressable reporter) throws IOException { |
| Path oldLogFile = new Path(regiondir, HREGION_OLDLOGFILE_NAME); |
| |
| // Move prefab HStore files into place (if any). This picks up split files |
| // and any merges from splits and merges dirs. |
| if (initialFiles != null && fs.exists(initialFiles)) { |
| fs.rename(initialFiles, this.regiondir); |
| } |
| |
| // Load in all the HStores. |
| long maxSeqId = -1; |
| long minSeqId = Long.MAX_VALUE; // sequence ids are longs; use a long sentinel |
| for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) { |
| HStore store = instantiateHStore(this.basedir, c, oldLogFile, reporter); |
| stores.put(Bytes.mapKey(c.getName()), store); |
| long storeSeqId = store.getMaxSequenceId(); |
| if (storeSeqId > maxSeqId) { |
| maxSeqId = storeSeqId; |
| } |
| if (storeSeqId < minSeqId) { |
| minSeqId = storeSeqId; |
| } |
| } |
| |
| doReconstructionLog(oldLogFile, minSeqId, maxSeqId, reporter); |
| |
| if (fs.exists(oldLogFile)) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Deleting old log file: " + oldLogFile); |
| } |
| fs.delete(oldLogFile, false); |
| } |
| |
| // Add one to the current maximum sequence id so new edits are beyond. |
| this.minSequenceId = maxSeqId + 1; |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Next sequence id for region " + |
| Bytes.toString(regionInfo.getRegionName()) + " is " + |
| this.minSequenceId); |
| } |
| |
| // Get rid of any splits or merges that were lost in-progress |
| Path splits = new Path(regiondir, SPLITDIR); |
| if (fs.exists(splits)) { |
| fs.delete(splits, true); |
| } |
| Path merges = new Path(regiondir, MERGEDIR); |
| if (fs.exists(merges)) { |
| fs.delete(merges, true); |
| } |
| // See if region is meant to run read-only. |
| if (this.regionInfo.getTableDesc().isReadOnly()) { |
| this.writestate.setReadOnly(true); |
| } |
| |
| // HRegion is ready to go! |
| this.writestate.compacting = false; |
| this.lastFlushTime = System.currentTimeMillis(); |
| LOG.info("region " + this + "/" + this.regionInfo.getEncodedName() + |
| " available"); |
| } |
| |
| /** |
| * @return The minimum sequence id for this region; updates must have a |
| * sequence id that is greater than or equal to this number. |
| */ |
| long getMinSequenceId() { |
| return this.minSequenceId; |
| } |
| |
| /** @return a HRegionInfo object for this region */ |
| public HRegionInfo getRegionInfo() { |
| return this.regionInfo; |
| } |
| |
| /** @return true if region is closed */ |
| public boolean isClosed() { |
| return this.closed.get(); |
| } |
| |
| /** |
| * @return True if closing process has started. |
| */ |
| public boolean isClosing() { |
| return this.closing.get(); |
| } |
| |
| /** |
| * Close down this HRegion. Flush the cache, shut down each HStore, don't |
| * service any more calls. |
| * |
| * <p>This method could take some time to execute, so don't call it from a |
| * time-sensitive thread. |
| * |
| * @return List of all the storage files that the HRegion's component |
| * HStores make use of. It's a list of all HStoreFile objects. Returns null |
| * if already closed or if judged that it should not close. |
| * |
| * @throws IOException |
| */ |
| public List<HStoreFile> close() throws IOException { |
| return close(false); |
| } |
| |
| /** |
| * Close down this HRegion. Flush the cache unless abort is true, shut down |
| * each HStore, and don't service any more calls. |
| * |
| * This method could take some time to execute, so don't call it from a |
| * time-sensitive thread. |
| * |
| * @param abort true if server is aborting (only during testing) |
| * @return List of all the storage files that the HRegion's component |
| * HStores make use of. It's a list of HStoreFile objects. Can be null if |
| * we are not to close at this time or we are already closed. |
| * |
| * @throws IOException |
| */ |
| public List<HStoreFile> close(final boolean abort) throws IOException { |
| |
| if (isClosed()) { |
| LOG.warn("region " + this + " already closed"); |
| return null; |
| } |
| this.closing.set(true); |
| synchronized (splitLock) { |
| synchronized (writestate) { |
| // Disable compacting and flushing by background threads for this |
| // region. |
| writestate.writesEnabled = false; |
| LOG.debug("Closing " + this + ": compactions & flushes disabled "); |
| while (writestate.compacting || writestate.flushing) { |
| LOG.debug("waiting for" + |
| (writestate.compacting ? " compaction" : "") + |
| (writestate.flushing ? |
| (writestate.compacting ? "," : "") + " cache flush" : |
| "") + " to complete for region " + this); |
| try { |
| writestate.wait(); |
| } catch (InterruptedException iex) { |
| // continue |
| } |
| } |
| } |
| newScannerLock.writeLock().lock(); |
| try { |
| splitsAndClosesLock.writeLock().lock(); |
| LOG.debug("Updates disabled for region, no outstanding scanners on " + |
| this); |
| try { |
| // Write lock means no more row locks can be given out. Wait on |
| // outstanding row locks to come in before we close so we do not drop |
| // outstanding updates. |
| waitOnRowLocks(); |
| LOG.debug("No more row locks outstanding on region " + this); |
| |
| // Don't flush the cache if we are aborting |
| if (!abort) { |
| internalFlushcache(); |
| } |
| |
| List<HStoreFile> result = new ArrayList<HStoreFile>(); |
| for (HStore store: stores.values()) { |
| result.addAll(store.close()); |
| } |
| this.closed.set(true); |
| LOG.info("Closed " + this); |
| return result; |
| } finally { |
| splitsAndClosesLock.writeLock().unlock(); |
| } |
| } finally { |
| newScannerLock.writeLock().unlock(); |
| } |
| } |
| } |
| |
| ////////////////////////////////////////////////////////////////////////////// |
| // HRegion accessors |
| ////////////////////////////////////////////////////////////////////////////// |
| |
| /** @return start key for region */ |
| public byte [] getStartKey() { |
| return this.regionInfo.getStartKey(); |
| } |
| |
| /** @return end key for region */ |
| public byte [] getEndKey() { |
| return this.regionInfo.getEndKey(); |
| } |
| |
| /** @return region id */ |
| public long getRegionId() { |
| return this.regionInfo.getRegionId(); |
| } |
| |
| /** @return region name */ |
| public byte [] getRegionName() { |
| return this.regionInfo.getRegionName(); |
| } |
| |
| /** @return HTableDescriptor for this region */ |
| public HTableDescriptor getTableDesc() { |
| return this.regionInfo.getTableDesc(); |
| } |
| |
| /** @return HLog in use for this region */ |
| public HLog getLog() { |
| return this.log; |
| } |
| |
| /** @return Configuration object */ |
| public HBaseConfiguration getConf() { |
| return this.conf; |
| } |
| |
| /** @return region directory Path */ |
| public Path getRegionDir() { |
| return this.regiondir; |
| } |
| |
| /** @return FileSystem being used by this region */ |
| public FileSystem getFilesystem() { |
| return this.fs; |
| } |
| |
| /** @return the last time the region was flushed */ |
| public long getLastFlushTime() { |
| return this.lastFlushTime; |
| } |
| |
| ////////////////////////////////////////////////////////////////////////////// |
| // HRegion maintenance. |
| // |
| // These methods are meant to be called periodically by the HRegionServer for |
| // upkeep. |
| ////////////////////////////////////////////////////////////////////////////// |
| |
| /** @return returns size of largest HStore. */ |
| public long getLargestHStoreSize() { |
| long size = 0; |
| for (HStore h: stores.values()) { |
| long storeSize = h.getSize(); |
| if (storeSize > size) { |
| size = storeSize; |
| } |
| } |
| return size; |
| } |
| |
| /* |
| * Split the HRegion to create two brand-new ones. This also closes the |
| * current HRegion. Split should be fast since we don't rewrite store files |
| * but instead create new 'reference' store files that read off the top and |
| * bottom ranges of parent store files. |
| * @param midKey key on which to split region |
| * @return two brand-new HRegions, initialized and then closed so they are |
| * ready to be opened elsewhere, or null if a split is not needed |
| * @throws IOException |
| */ |
| HRegion[] splitRegion(final byte [] midKey) throws IOException { |
| prepareToSplit(); |
| synchronized (splitLock) { |
| if (closed.get()) { |
| return null; |
| } |
| // Add start/end key checking: hbase-428. |
| byte [] startKey = this.regionInfo.getStartKey(); |
| byte [] endKey = this.regionInfo.getEndKey(); |
| if (HStoreKey.equalsTwoRowKeys(this.regionInfo, startKey, midKey)) { |
| LOG.debug("Startkey and midkey are same, not splitting"); |
| return null; |
| } |
| if (HStoreKey.equalsTwoRowKeys(this.regionInfo, midKey, endKey)) { |
| LOG.debug("Endkey and midkey are same, not splitting"); |
| return null; |
| } |
| LOG.info("Starting split of region " + this); |
| Path splits = new Path(this.regiondir, SPLITDIR); |
| if(!this.fs.exists(splits)) { |
| this.fs.mkdirs(splits); |
| } |
| // Calculate regionid to use. Can't be less than that of parent else |
| // it'll insert into wrong location over in .META. table: HBASE-710. |
| long rid = System.currentTimeMillis(); |
| if (rid < this.regionInfo.getRegionId()) { |
| LOG.warn("Clock skew; parent regions id is " + |
| this.regionInfo.getRegionId() + " but current time here is " + rid); |
| rid = this.regionInfo.getRegionId() + 1; |
| } |
| HRegionInfo regionAInfo = new HRegionInfo(this.regionInfo.getTableDesc(), |
| startKey, midKey, false, rid); |
| Path dirA = |
| new Path(splits, Integer.toString(regionAInfo.getEncodedName())); |
| if(fs.exists(dirA)) { |
| throw new IOException("Cannot split; target file collision at " + dirA); |
| } |
| HRegionInfo regionBInfo = new HRegionInfo(this.regionInfo.getTableDesc(), |
| midKey, endKey, false, rid); |
| Path dirB = |
| new Path(splits, Integer.toString(regionBInfo.getEncodedName())); |
| if(this.fs.exists(dirB)) { |
| throw new IOException("Cannot split; target file collision at " + dirB); |
| } |
| |
| // Now close the HRegion. Close returns all store files or null if not |
| // supposed to close (? What to do in this case? Implement abort of close?) |
| // Close also does wait on outstanding rows and calls a flush just-in-case. |
| List<HStoreFile> hstoreFilesToSplit = close(false); |
| if (hstoreFilesToSplit == null) { |
| LOG.warn("Close came back null (Implement abort of close?)"); |
| throw new RuntimeException("close returned empty vector of HStoreFiles"); |
| } |
| |
| // Split each store file. |
| for(HStoreFile h: hstoreFilesToSplit) { |
| // A reference to the bottom half of the hsf store file. |
| Reference aReference = new Reference( |
| this.regionInfo.getEncodedName(), h.getFileId(), |
| new HStoreKey(midKey, this.regionInfo), Reference.Range.bottom); |
| HStoreFile a = new HStoreFile(this.conf, fs, splits, |
| regionAInfo, h.getColFamily(), -1, aReference); |
| // Reference to top half of the hsf store file. |
| Reference bReference = new Reference( |
| this.regionInfo.getEncodedName(), h.getFileId(), |
| new HStoreKey(midKey, this.regionInfo), Reference.Range.top); |
| HStoreFile b = new HStoreFile(this.conf, fs, splits, |
| regionBInfo, h.getColFamily(), -1, bReference); |
| h.splitStoreFile(a, b, this.fs); |
| } |
| |
| // Done! |
| // Opening the region copies the splits files from the splits directory |
| // under each region. |
| HRegion regionA = |
| new HRegion(basedir, log, fs, conf, regionAInfo, null); |
| regionA.initialize(dirA, null); |
| regionA.close(); |
| HRegion regionB = |
| new HRegion(basedir, log, fs, conf, regionBInfo, null); |
| regionB.initialize(dirB, null); |
| regionB.close(); |
| |
| // Cleanup |
| boolean deleted = fs.delete(splits, true); // Get rid of splits directory |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Cleaned up " + FSUtils.getPath(splits) + " " + deleted); |
| } |
| HRegion regions[] = new HRegion [] {regionA, regionB}; |
| |
| this.historian.addRegionSplit(this.regionInfo, |
| regionA.getRegionInfo(), regionB.getRegionInfo()); |
| |
| return regions; |
| } |
| } |
| |
| protected void prepareToSplit() { |
| // nothing |
| } |
| |
| /* |
| * @param dir |
| * @return compaction directory for the passed in <code>dir</code> |
| */ |
| static Path getCompactionDir(final Path dir) { |
| return new Path(dir, HREGION_COMPACTIONDIR_NAME); |
| } |
| |
| /* |
| * Do preparation for pending compaction. |
| * Clean out any vestiges of previous failed compactions. |
| * @throws IOException |
| */ |
| private void doRegionCompactionPrep() throws IOException { |
| doRegionCompactionCleanup(); |
| } |
| |
| /* |
| * Removes the compaction directory for this region. |
| * @throws IOException |
| */ |
| private void doRegionCompactionCleanup() throws IOException { |
| if (this.fs.exists(this.regionCompactionDir)) { |
| this.fs.delete(this.regionCompactionDir, true); |
| } |
| } |
| |
| void setForceMajorCompaction(final boolean b) { |
| this.forceMajorCompaction = b; |
| } |
| |
| boolean getForceMajorCompaction() { |
| return this.forceMajorCompaction; |
| } |
| |
| /** |
| * Called by compaction thread and after region is opened to compact the |
| * HStores if necessary. |
| * |
| * <p>This operation could block for a long time, so don't call it from a |
| * time-sensitive thread. |
| * |
| * Note that no locking is necessary at this level because compaction only |
| * conflicts with a region split, and that cannot happen because the region |
| * server does them sequentially and not in parallel. |
| * |
| * @return mid key if split is needed |
| * @throws IOException |
| */ |
| public byte [] compactStores() throws IOException { |
| boolean majorCompaction = this.forceMajorCompaction; |
| this.forceMajorCompaction = false; |
| return compactStores(majorCompaction); |
| } |
| |
| /* |
| * Called by compaction thread and after region is opened to compact the |
| * HStores if necessary. |
| * |
| * <p>This operation could block for a long time, so don't call it from a |
| * time-sensitive thread. |
| * |
| * Note that no locking is necessary at this level because compaction only |
| * conflicts with a region split, and that cannot happen because the region |
| * server does them sequentially and not in parallel. |
| * |
| * @param majorCompaction True to force a major compaction regardless of thresholds |
| * @return mid key if split is needed |
| * @throws IOException |
| */ |
| byte [] compactStores(final boolean majorCompaction) |
| throws IOException { |
| if (this.closing.get() || this.closed.get()) { |
| LOG.debug("Skipping compaction because closing/closed"); |
| return null; |
| } |
| splitsAndClosesLock.readLock().lock(); |
| try { |
| byte [] midKey = null; |
| if (this.closed.get()) { |
| return midKey; |
| } |
| try { |
| synchronized (writestate) { |
| if (!writestate.compacting && writestate.writesEnabled) { |
| writestate.compacting = true; |
| } else { |
| LOG.info("NOT compacting region " + this + |
| ": compacting=" + writestate.compacting + ", writesEnabled=" + |
| writestate.writesEnabled); |
| return midKey; |
| } |
| } |
| LOG.info("starting " + (majorCompaction? "major" : "") + |
| " compaction on region " + this); |
| long startTime = System.currentTimeMillis(); |
| doRegionCompactionPrep(); |
| long maxSize = -1; |
| for (HStore store: stores.values()) { |
| final HStore.StoreSize size = store.compact(majorCompaction); |
| if (size != null && size.getSize() > maxSize) { |
| maxSize = size.getSize(); |
| midKey = size.getKey(); |
| } |
| } |
| doRegionCompactionCleanup(); |
| String timeTaken = StringUtils.formatTimeDiff(System.currentTimeMillis(), |
| startTime); |
| LOG.info("compaction completed on region " + this + " in " + timeTaken); |
| this.historian.addRegionCompaction(regionInfo, timeTaken); |
| } finally { |
| synchronized (writestate) { |
| writestate.compacting = false; |
| writestate.notifyAll(); |
| } |
| } |
| return midKey; |
| } finally { |
| splitsAndClosesLock.readLock().unlock(); |
| } |
| } |
| |
| /** |
| * Flush the cache. |
| * |
| * When this method is called the cache will be flushed unless: |
| * <ol> |
| * <li>the cache is empty</li> |
| * <li>the region is closed.</li> |
| * <li>a flush is already in progress</li> |
| * <li>writes are disabled</li> |
| * </ol> |
| * |
| * <p>This method may block for some time, so it should not be called from a |
| * time-sensitive thread. |
| * |
| * @return true if cache was flushed |
| * |
| * @throws IOException |
| * @throws DroppedSnapshotException Thrown when replay of hlog is required |
| * because a Snapshot was not properly persisted. |
| */ |
| public boolean flushcache() throws IOException { |
| if (this.closed.get()) { |
| return false; |
| } |
| synchronized (writestate) { |
| if (!writestate.flushing && writestate.writesEnabled) { |
| this.writestate.flushing = true; |
| } else { |
| if(LOG.isDebugEnabled()) { |
| LOG.debug("NOT flushing memcache for region " + this + |
| ", flushing=" + |
| writestate.flushing + ", writesEnabled=" + |
| writestate.writesEnabled); |
| } |
| return false; |
| } |
| } |
| try { |
| // Prevent splits and closes |
| splitsAndClosesLock.readLock().lock(); |
| try { |
| return internalFlushcache(); |
| } finally { |
| splitsAndClosesLock.readLock().unlock(); |
| } |
| } finally { |
| synchronized (writestate) { |
| writestate.flushing = false; |
| this.writestate.flushRequested = false; |
| writestate.notifyAll(); |
| } |
| } |
| } |
| |
| /** |
| * Flushing the cache is a little tricky. We have a lot of updates in the |
| * HMemcache, all of which have also been written to the log. We need to |
| * write those updates in the HMemcache out to disk, while being able to |
| * process reads/writes as much as possible during the flush operation. Also, |
| * the log has to state clearly the point in time at which the HMemcache was |
| * flushed. (That way, during recovery, we know when we can rely on the |
| * on-disk flushed structures and when we have to recover the HMemcache from |
| * the log.) |
| * |
| * <p>So, we have a three-step process: |
| * |
| * <ul><li>A. Flush the memcache to the on-disk stores, noting the current |
| * sequence ID for the log.</li> |
| * |
| * <li>B. Write a FLUSHCACHE-COMPLETE message to the log, using the sequence |
| * ID that was current at the time of memcache-flush.</li> |
| * |
| * <li>C. Get rid of the memcache structures that are now redundant, as |
| * they've been flushed to the on-disk HStores.</li> |
| * </ul> |
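| * |
| * <p>Sketch of the resulting hlog timeline (sequence ids illustrative): |
| * <pre> |
| * ... edits with sequence id &lt;= N ...   (covered by the flush, step A) |
| * FLUSHCACHE-COMPLETE, sequence id N      (step B marker) |
| * ... later edits, sequence id &gt; N ...  (replayed on recovery) |
| * </pre> |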
| * <p>This method is protected, but can be accessed via several public |
| * routes. |
| * |
| * <p> This method may block for some time. |
| * |
| * @return true if the region needs compacting |
| * |
| * @throws IOException |
| * @throws DroppedSnapshotException Thrown when replay of hlog is required |
| * because a Snapshot was not properly persisted. |
| */ |
| private boolean internalFlushcache() throws IOException { |
| final long startTime = System.currentTimeMillis(); |
| // Record the latest flush time. |
| this.lastFlushTime = startTime; |
| // If nothing to flush, return and avoid logging start/stop flush. |
| if (this.memcacheSize.get() <= 0) { |
| return false; |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Started memcache flush for region " + this + |
| ". Current region memcache size " + |
| StringUtils.humanReadableInt(this.memcacheSize.get())); |
| } |
| |
| // Stop updates while we snapshot the memcache of all stores. We only have |
| // to do this for a moment. It's quick. The subsequent sequence id that |
| // goes into the HLog after we've flushed all these snapshots also goes |
| // into the info file that sits beside the flushed files. |
| // We also set the memcache size to zero here before we allow updates |
| // again so its value will represent the size of the updates received |
| // during the flush |
| long sequenceId = -1L; |
| long completeSequenceId = -1L; |
| this.updatesLock.writeLock().lock(); |
| // Get current size of memcaches. |
| final long currentMemcacheSize = this.memcacheSize.get(); |
| try { |
| for (HStore s: stores.values()) { |
| s.snapshot(); |
| } |
| sequenceId = log.startCacheFlush(); |
| completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId); |
| } finally { |
| this.updatesLock.writeLock().unlock(); |
| } |
| |
| // Any failure from here on out will be catastrophic requiring server |
| // restart so hlog content can be replayed and put back into the memcache. |
| // Otherwise, the snapshot content, while backed up in the hlog, will not |
| // be part of the current running server's state. |
| boolean compactionRequested = false; |
| try { |
| // A. Flush memcache to all the HStores. |
| // Keep running vector of all store files that includes both old and the |
| // just-made new flush store file. |
| for (HStore hstore: stores.values()) { |
| boolean needsCompaction = hstore.flushCache(completeSequenceId); |
| if (needsCompaction) { |
| compactionRequested = true; |
| } |
| } |
| // Set down the memcache size by amount of flush. |
| this.memcacheSize.addAndGet(-currentMemcacheSize); |
| } catch (Throwable t) { |
| // An exception here means that the snapshot was not persisted. |
| // The hlog needs to be replayed so its content is restored to memcache. |
| // Currently, only a server restart will do this. |
| // We used to only catch IOEs but its possible that we'd get other |
| // exceptions -- e.g. HBASE-659 was about an NPE -- so now we catch |
| // all and sundry. |
| this.log.abortCacheFlush(); |
| DroppedSnapshotException dse = new DroppedSnapshotException("region: " + |
| Bytes.toString(getRegionName())); |
| dse.initCause(t); |
| throw dse; |
| } |
| |
| // If we get to here, the HStores have been written. If we get an |
| // error in completeCacheFlush it will release the lock it is holding |
| |
| // B. Write a FLUSHCACHE-COMPLETE message to the log. |
| // This tells future readers that the HStores were emitted correctly, |
| // and that all updates to the log for this regionName that have lower |
| // log-sequence-ids can be safely ignored. |
| this.log.completeCacheFlush(getRegionName(), |
| regionInfo.getTableDesc().getName(), completeSequenceId); |
| |
| // C. Finally notify anyone waiting on memcache to clear: |
| // e.g. checkResources(). |
| synchronized (this) { |
| notifyAll(); |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| long now = System.currentTimeMillis(); |
| String timeTaken = StringUtils.formatTimeDiff(now, startTime); |
| LOG.debug("Finished memcache flush of ~" + |
| StringUtils.humanReadableInt(currentMemcacheSize) + " for region " + |
| this + " in " + (now - startTime) + "ms, sequence id=" + sequenceId + |
| ", compaction requested=" + compactionRequested); |
| if (!regionInfo.isMetaRegion()) { |
| this.historian.addRegionFlush(regionInfo, timeTaken); |
| } |
| } |
| return compactionRequested; |
| } |
| |
| /** |
| * Get the sequence number to be associated with this cache flush. Used by |
| * TransactionalRegion to avoid completing pending transactions. |
| * |
| * @param currentSequenceId the sequence id the hlog returned at flush start |
| * @return sequence id to complete the cache flush with |
| */ |
| protected long getCompleteCacheFlushSequenceId(long currentSequenceId) { |
| return currentSequenceId; |
| } |
| |
| ////////////////////////////////////////////////////////////////////////////// |
| // get() methods for client use. |
| ////////////////////////////////////////////////////////////////////////////// |
| |
| /** |
| * Fetch multiple versions of a single data item, with timestamp. |
| * |
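| * <p>A usage sketch; the row and column names here are hypothetical: |
| * <pre> |
| * // Most recent three versions of one cell (-1 means "latest timestamp"). |
| * Cell[] cells = region.get(Bytes.toBytes("row1"), |
| *   Bytes.toBytes("info:qualifier"), -1, 3); |
| * </pre> |
| * |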
| * @param row row key |
| * @param column column name |
| * @param ts timestamp; pass -1 to mean the most recent (LATEST_TIMESTAMP) |
| * @param nv number of versions to fetch; pass -1 to mean one |
| * @return array of values, one element per version that matches the |
| * timestamp, or null if there are no matches. |
| * @throws IOException |
| */ |
| public Cell[] get(final byte[] row, final byte[] column, final long ts, |
| final int nv) |
| throws IOException { |
| long timestamp = ts == -1 ? HConstants.LATEST_TIMESTAMP : ts; |
| int numVersions = nv == -1 ? 1 : nv; |
| |
| splitsAndClosesLock.readLock().lock(); |
| try { |
| if (this.closed.get()) { |
| throw new IOException("Region " + this + " closed"); |
| } |
| // Make sure this is a valid row and valid column |
| checkRow(row); |
| checkColumn(column); |
| // Don't need a row lock for a simple get |
| HStoreKey key = new HStoreKey(row, column, timestamp, this.regionInfo); |
| Cell[] result = getStore(column).get(key, numVersions); |
| // Guarantee that we return null instead of a zero-length array, |
| // if there are no results to return. |
| return (result == null || result.length == 0)? null : result; |
| } finally { |
| splitsAndClosesLock.readLock().unlock(); |
| } |
| } |
| |
| /** |
| * Fetch all the columns for the indicated row at a specified timestamp. |
| * Returns a HbaseMapWritable that maps column names to values. |
| * |
| * We should eventually use Bloom filters here, to reduce running time. If |
| * the database has many column families and is very sparse, then we could be |
| * checking many files needlessly. A small Bloom for each row would help us |
| * determine which column groups are useful for that row. That would let us |
| * avoid a bunch of disk activity. |
| * |
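| * <p>A usage sketch (the column name is hypothetical): |
| * <pre> |
| * Set&lt;byte []&gt; cols = new TreeSet&lt;byte []&gt;(Bytes.BYTES_COMPARATOR); |
| * cols.add(Bytes.toBytes("info:qualifier")); |
| * Map&lt;byte [], Cell&gt; result = region.getFull(row, cols, |
| *   HConstants.LATEST_TIMESTAMP, 1, null); |
| * </pre> |
| * |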
| * @param row row key |
| * @param columns Set of columns you'd like to retrieve. When null, get all. |
| * @param ts timestamp |
| * @param numVersions number of versions to retrieve |
| * @param lockid existing row lock id, or null |
| * @return HbaseMapWritable<columnName, Cell> values |
| * @throws IOException |
| */ |
| public HbaseMapWritable<byte [], Cell> getFull(final byte [] row, |
| final Set<byte []> columns, final long ts, |
| final int numVersions, final Integer lockid) |
| throws IOException { |
| // Check columns passed |
| if (columns != null) { |
| for (byte [] column: columns) { |
| checkColumn(column); |
| } |
| } |
| HStoreKey key = new HStoreKey(row, ts, this.regionInfo); |
| Integer lid = getLock(lockid,row); |
| HashSet<HStore> storeSet = new HashSet<HStore>(); |
| try { |
| HbaseMapWritable<byte [], Cell> result = |
| new HbaseMapWritable<byte [], Cell>(); |
| // Get the concerned columns or all of them |
| if (columns != null) { |
| for (byte[] bs : columns) { |
| HStore store = stores.get(Bytes.mapKey(HStoreKey.getFamily(bs))); |
| if (store != null) { |
| storeSet.add(store); |
| } |
| } |
| } else { |
| storeSet.addAll(stores.values()); |
| } |
| // For each column name that is just a column family, open the store |
| // related to it and fetch everything for that row. HBASE-631 |
| // Also remove each store from storeSet so that these stores |
| // won't be opened for no reason. HBASE-783 |
| if (columns != null) { |
| for (byte[] bs : columns) { |
| if (HStoreKey.getFamilyDelimiterIndex(bs) == (bs.length - 1)) { |
| HStore store = stores.get(Bytes.mapKey(HStoreKey.getFamily(bs))); |
| store.getFull(key, null, numVersions, result); |
| storeSet.remove(store); |
| } |
| } |
| } |
| |
| for (HStore targetStore: storeSet) { |
| targetStore.getFull(key, columns, numVersions, result); |
| } |
| |
| return result; |
| } finally { |
| if(lockid == null) releaseRowLock(lid); |
| } |
| } |
| |
| /** |
| * Return all the data for the row that matches <i>row</i> exactly, |
| * or the one that immediately precedes it. |
| * |
| * @param row row key |
| * @return map of values |
| * @throws IOException |
| */ |
| RowResult getClosestRowBefore(final byte [] row) |
| throws IOException{ |
| return getClosestRowBefore(row, HConstants.COLUMN_FAMILY); |
| } |
| |
| /** |
| * Return all the data for the row that matches <i>row</i> exactly, |
| * or the one that immediately precedes it. |
| * |
| * @param row row key |
| * @param columnFamily Must include the column family delimiter character. |
| * @return map of values |
| * @throws IOException |
| */ |
| public RowResult getClosestRowBefore(final byte [] row, |
| final byte [] columnFamily) |
| throws IOException{ |
| // look across all the HStores for this region and determine what the |
| // closest key is across all column families, since the data may be sparse |
| HStoreKey key = null; |
| checkRow(row); |
| splitsAndClosesLock.readLock().lock(); |
| try { |
| HStore store = getStore(columnFamily); |
| // get the closest key. (HStore.getRowKeyAtOrBefore can return null) |
| byte [] closestKey = store.getRowKeyAtOrBefore(row); |
| // Whether it's an exact match or the nearest preceding row, the key we |
| // build from it is the same. |
| if (closestKey != null) { |
| key = new HStoreKey(closestKey, this.regionInfo); |
| } |
| if (key == null) { |
| return null; |
| } |
| |
| // Now that we've found our key, get the values |
| HbaseMapWritable<byte [], Cell> cells = |
| new HbaseMapWritable<byte [], Cell>(); |
| // This will get all results for this store. |
| store.getFull(key, null, 1, cells); |
| return new RowResult(key.getRow(), cells); |
| } finally { |
| splitsAndClosesLock.readLock().unlock(); |
| } |
| } |
| |
| /* |
| * Get <code>versions</code> keys matching the origin key's |
| * row/column/timestamp and those of an older vintage. |
| * @param origin Where to start searching. |
| * @param versions How many versions to return. Pass HConstants.ALL_VERSIONS |
| * to retrieve all. |
| * @return Ordered list of <code>versions</code> keys going from newest back. |
| * @throws IOException |
| */ |
| private Set<HStoreKey> getKeys(final HStoreKey origin, final int versions) |
| throws IOException { |
| Set<HStoreKey> keys = new TreeSet<HStoreKey>(); |
| Collection<HStore> storesToCheck = null; |
| if (origin.getColumn() == null || origin.getColumn().length == 0) { |
| // All families |
| storesToCheck = this.stores.values(); |
| } else { |
| storesToCheck = new ArrayList<HStore>(1); |
| storesToCheck.add(getStore(origin.getColumn())); |
| } |
| long now = System.currentTimeMillis(); |
| for (HStore targetStore: storesToCheck) { |
| if (targetStore != null) { |
| // Pass versions without modification since in the store getKeys, it |
| // includes the size of the passed <code>keys</code> array when counting. |
| List<HStoreKey> r = targetStore.getKeys(origin, versions, now, null); |
| if (r != null) { |
| keys.addAll(r); |
| } |
| } |
| } |
| return keys; |
| } |
| |
| /** |
| * Return an iterator that scans over the HRegion, returning the indicated |
| * columns for only the rows that match the data filter. This Iterator must |
| * be closed by the caller. |
| * |
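| * <p>A usage sketch ("info:" is a hypothetical family name): |
| * <pre> |
| * InternalScanner s = region.getScanner( |
| *   new byte [][] {Bytes.toBytes("info:")}, HConstants.EMPTY_START_ROW, |
| *   HConstants.LATEST_TIMESTAMP, null); |
| * try { |
| *   // ... call s.next(...) until it returns false ... |
| * } finally { |
| *   s.close(); |
| * } |
| * </pre> |
| * |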
| * @param cols columns to scan. If column name is a column family, all |
| * columns of the specified column family are returned. It's also possible |
| * to pass a regex in the column qualifier. A column qualifier is judged to |
| * be a regex if it contains at least one of the following characters: |
| * <code>\+|^&*$[]]}{)(</code>. |
| * @param firstRow row which is the starting point of the scan |
| * @param timestamp only return rows whose timestamp is <= this value |
| * @param filter row filter |
| * @return InternalScanner |
| * @throws IOException |
| */ |
| public InternalScanner getScanner(byte[][] cols, byte [] firstRow, |
| long timestamp, RowFilterInterface filter) |
| throws IOException { |
| newScannerLock.readLock().lock(); |
| try { |
| if (this.closed.get()) { |
| throw new IOException("Region " + this + " closed"); |
| } |
| HashSet<HStore> storeSet = new HashSet<HStore>(); |
| for (int i = 0; i < cols.length; i++) { |
| HStore s = stores.get(Bytes.mapKey(HStoreKey.getFamily(cols[i]))); |
| if (s != null) { |
| storeSet.add(s); |
| } |
| } |
| return new HScanner(cols, firstRow, timestamp, |
| storeSet.toArray(new HStore [storeSet.size()]), filter); |
| } finally { |
| newScannerLock.readLock().unlock(); |
| } |
| } |
| |
| ////////////////////////////////////////////////////////////////////////////// |
| // set() methods for client use. |
| ////////////////////////////////////////////////////////////////////////////// |
| |
| /** |
| * @param b the update to apply |
| * @throws IOException |
| */ |
| public void batchUpdate(BatchUpdate b) throws IOException { |
| this.batchUpdate(b, null, true); |
| } |
| |
| /** |
| * @param b the update to apply |
| * @param writeToWAL if true, write this update to the write-ahead log |
| * @throws IOException |
| */ |
| public void batchUpdate(BatchUpdate b, boolean writeToWAL) throws IOException { |
| this.batchUpdate(b, null, writeToWAL); |
| } |
| |
| |
| /** |
| * @param b the update to apply |
| * @param lockid existing row lock id, or null |
| * @throws IOException |
| */ |
| public void batchUpdate(BatchUpdate b, Integer lockid) throws IOException { |
| this.batchUpdate(b, lockid, true); |
| } |
| |
| /** |
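| * Apply a batch of puts and/or deletes to a single row. |
| * |
| * <p>A usage sketch (row and column names hypothetical): |
| * <pre> |
| * BatchUpdate bu = new BatchUpdate(Bytes.toBytes("row1")); |
| * bu.put(Bytes.toBytes("info:qualifier"), Bytes.toBytes("value")); |
| * region.batchUpdate(bu, null, true); |
| * </pre> |
| * |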
| * @param b the update to apply |
| * @param lockid existing row lock id, or null |
| * @param writeToWAL if true, then we write this update to the log |
| * @throws IOException |
| */ |
| public void batchUpdate(BatchUpdate b, Integer lockid, boolean writeToWAL) |
| throws IOException { |
| checkReadOnly(); |
| |
| // Do a rough check that we have resources to accept a write. The check is |
| // 'rough' in that between the resource check and the call to obtain a |
| // read lock, resources may run out. For now, the thought is that this |
| // will be extremely rare; we'll deal with it when it happens. |
| checkResources(); |
| |
| splitsAndClosesLock.readLock().lock(); |
| try { |
| // We obtain a per-row lock, so other clients will block while one client |
| // performs an update. The read lock is released by the client calling |
| // #commit or #abort or if the HRegionServer lease on the lock expires. |
| // See HRegionServer#RegionListener for how the expire on HRegionServer |
| // invokes a HRegion#abort. |
| byte [] row = b.getRow(); |
| // If we did not pass an existing row lock, obtain a new one |
| Integer lid = getLock(lockid,row); |
| long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP) ? |
| System.currentTimeMillis() : b.getTimestamp(); |
| try { |
| List<byte []> deletes = null; |
| for (BatchOperation op: b) { |
| HStoreKey key = new HStoreKey(row, op.getColumn(), commitTime, |
| this.regionInfo); |
| byte[] val = null; |
| if (op.isPut()) { |
| val = op.getValue(); |
| if (HLogEdit.isDeleted(val)) { |
| throw new IOException("Cannot insert value: " + val); |
| } |
| } else { |
| if (b.getTimestamp() == LATEST_TIMESTAMP) { |
| // Save off these deletes |
| if (deletes == null) { |
| deletes = new ArrayList<byte []>(); |
| } |
| deletes.add(op.getColumn()); |
| } else { |
| val = HLogEdit.deleteBytes.get(); |
| } |
| } |
| if (val != null) { |
| localput(lid, key, val); |
| } |
| } |
| TreeMap<HStoreKey, byte[]> edits = |
| this.targetColumns.remove(lid); |
| |
| if (edits != null && edits.size() > 0) { |
| update(edits, writeToWAL); |
| } |
| |
| if (deletes != null && deletes.size() > 0) { |
| // We have some LATEST_TIMESTAMP deletes to run. |
| for (byte [] column: deletes) { |
| deleteMultiple(row, column, LATEST_TIMESTAMP, 1); |
| } |
| } |
| } catch (IOException e) { |
| this.targetColumns.remove(lid); |
| throw e; |
| } finally { |
| if(lockid == null) releaseRowLock(lid); |
| } |
| } finally { |
| splitsAndClosesLock.readLock().unlock(); |
| } |
| } |
| |
| |
| /** |
| * Performs an atomic check and save operation. Checks if |
| * the specified expected values have changed, and if not |
| * applies the update. |
| * |
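| * <p>A usage sketch; apply <code>bu</code> only if the cell still holds |
| * the expected value (names hypothetical): |
| * <pre> |
| * HbaseMapWritable&lt;byte [], byte []&gt; expected = |
| *   new HbaseMapWritable&lt;byte [], byte []&gt;(); |
| * expected.put(Bytes.toBytes("info:qualifier"), Bytes.toBytes("old")); |
| * boolean applied = region.checkAndSave(bu, expected, null, true); |
| * </pre> |
| * |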
| * @param b the update to apply |
| * @param expectedValues the expected values to check |
| * @param lockid existing row lock id, or null |
| * @param writeToWAL whether or not to write to the write ahead log |
| * @return true if update was applied |
| * @throws IOException |
| */ |
| public boolean checkAndSave(BatchUpdate b, |
| HbaseMapWritable<byte[], byte[]> expectedValues, Integer lockid, |
| boolean writeToWAL) |
| throws IOException { |
| // This is basically a copy of batchUpdate with the atomic check and save |
| // added in, so read this method alongside batchUpdate. I have commented |
| // the areas that I changed; where I have not changed anything, the |
| // comments from the batchUpdate method apply. |
| boolean success = true; |
| checkReadOnly(); |
| checkResources(); |
| splitsAndClosesLock.readLock().lock(); |
| try { |
| byte[] row = b.getRow(); |
| Integer lid = getLock(lockid,row); |
| try { |
| Set<byte[]> keySet = expectedValues.keySet(); |
| Map<byte[],Cell> actualValues = this.getFull(row,keySet, |
| HConstants.LATEST_TIMESTAMP, 1,lid); |
| for (byte[] key : keySet) { |
| // If test fails exit |
| Cell cell = actualValues.get(key); |
| byte[] actualValue = new byte[] {}; |
| if (cell != null) |
| actualValue = cell.getValue(); |
| if(!Bytes.equals(actualValue, |
| expectedValues.get(key))) { |
| success = false; |
| break; |
| } |
| } |
| if (success) { |
| long commitTime = (b.getTimestamp() == LATEST_TIMESTAMP)? |
| System.currentTimeMillis(): b.getTimestamp(); |
| List<byte []> deletes = null; |
| for (BatchOperation op: b) { |
| HStoreKey key = new HStoreKey(row, op.getColumn(), commitTime, |
| this.regionInfo); |
| byte[] val = null; |
| if (op.isPut()) { |
| val = op.getValue(); |
| if (HLogEdit.isDeleted(val)) { |
| throw new IOException("Cannot insert value: " + val); |
| } |
| } else { |
| if (b.getTimestamp() == LATEST_TIMESTAMP) { |
| // Save off these deletes |
| if (deletes == null) { |
| deletes = new ArrayList<byte []>(); |
| } |
| deletes.add(op.getColumn()); |
| } else { |
| val = HLogEdit.deleteBytes.get(); |
| } |
| } |
| if (val != null) { |
| localput(lid, key, val); |
| } |
| } |
| TreeMap<HStoreKey, byte[]> edits = |
| this.targetColumns.remove(lid); |
| if (edits != null && edits.size() > 0) { |
| update(edits, writeToWAL); |
| } |
| if (deletes != null && deletes.size() > 0) { |
| // We have some LATEST_TIMESTAMP deletes to run. |
| for (byte [] column: deletes) { |
| deleteMultiple(row, column, LATEST_TIMESTAMP, 1); |
| } |
| } |
| } |
| } catch (IOException e) { |
| this.targetColumns.remove(lid); |
| throw e; |
| } finally { |
| if(lockid == null) releaseRowLock(lid); |
| } |
| } finally { |
| splitsAndClosesLock.readLock().unlock(); |
| } |
| return success; |
| } |
| |
| /* |
| * Check if there are resources to support an update. |
| * |
| * Here we synchronize on HRegion, a broad scoped lock. It's appropriate |
| * given we're figuring in here whether this region is able to take on |
| * writes. At time of writing, this method and the synchronize on 'this' |
| * inside internalFlushcache (to send the notify) are the only |
| * synchronizations in this class. |
| */ |
| private void checkResources() { |
| boolean blocked = false; |
| while (this.memcacheSize.get() > this.blockingMemcacheSize) { |
| requestFlush(); |
| if (!blocked) { |
| LOG.info("Blocking updates for '" + Thread.currentThread().getName() + |
| "' on region " + Bytes.toString(getRegionName()) + |
| ": Memcache size " + |
| StringUtils.humanReadableInt(this.memcacheSize.get()) + |
| " is >= than blocking " + |
| StringUtils.humanReadableInt(this.blockingMemcacheSize) + " size"); |
| } |
| blocked = true; |
| synchronized(this) { |
| try { |
| wait(threadWakeFrequency); |
| } catch (InterruptedException e) { |
| // continue; |
| } |
| } |
| } |
| if (blocked) { |
| LOG.info("Unblocking updates for region " + this + " '" |
| + Thread.currentThread().getName() + "'"); |
| } |
| } |
| |
| /** |
| * Delete all cells of the same age as the passed timestamp or older. |
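| * |
| * <p>For example (column name hypothetical), to remove every version of a |
| * cell at or older than time <code>ts</code>: |
| * <pre> |
| * region.deleteAll(row, Bytes.toBytes("info:qualifier"), ts, null); |
| * </pre> |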
| * @param row row key |
| * @param column column name |
| * @param ts Delete all entries that have this timestamp or older |
| * @param lockid Row lock |
| * @throws IOException |
| */ |
| public void deleteAll(final byte [] row, final byte [] column, final long ts, |
| final Integer lockid) |
| throws IOException { |
| checkColumn(column); |
| checkReadOnly(); |
| Integer lid = getLock(lockid,row); |
| try { |
| // Delete ALL versions rather than MAX_VERSIONS. If we deleted only |
| // MAX_VERSIONS and there were 2 * MAX_VERSIONS cells, subsequent gets |
| // would return the old stuff. |
| deleteMultiple(row, column, ts, ALL_VERSIONS); |
| } finally { |
| if(lockid == null) releaseRowLock(lid); |
| } |
| } |
| |
| /** |
| * Delete all cells of the same age as the passed timestamp or older. |
| * @param row |
| * @param ts Delete all entries that have this timestamp or older |
| * @param lockid Row lock |
| * @throws IOException |
| */ |
| @SuppressWarnings("unchecked") |
| public void deleteAll(final byte [] row, final long ts, final Integer lockid) |
| throws IOException { |
| checkReadOnly(); |
| Integer lid = getLock(lockid, row); |
| long now = System.currentTimeMillis(); |
| try { |
| for (HStore store : stores.values()) { |
| List<HStoreKey> keys = |
| store.getKeys(new HStoreKey(row, ts, this.regionInfo), |
| ALL_VERSIONS, now, null); |
| TreeMap<HStoreKey, byte []> edits = new TreeMap<HStoreKey, byte []>( |
| new HStoreKey.HStoreKeyWritableComparator(regionInfo)); |
| for (HStoreKey key: keys) { |
| edits.put(key, HLogEdit.deleteBytes.get()); |
| } |
| update(edits); |
| } |
| } finally { |
| if(lockid == null) releaseRowLock(lid); |
| } |
| } |
| |
| /** |
| * Delete all cells for a row with matching columns with timestamps |
| * less than or equal to <i>timestamp</i>. |
| * |
| * @param row The row to operate on |
| * @param columnRegex The column regex |
| * @param timestamp Timestamp to match |
| * @param lockid Row lock |
| * @throws IOException |
| */ |
| @SuppressWarnings("unchecked") |
| public void deleteAllByRegex(final byte [] row, final String columnRegex, |
| final long timestamp, final Integer lockid) throws IOException { |
| checkReadOnly(); |
| Pattern columnPattern = Pattern.compile(columnRegex); |
| Integer lid = getLock(lockid, row); |
| long now = System.currentTimeMillis(); |
| try { |
| for (HStore store : stores.values()) { |
| List<HStoreKey> keys = |
| store.getKeys(new HStoreKey(row, timestamp, this.regionInfo), |
| ALL_VERSIONS, now, columnPattern); |
| TreeMap<HStoreKey, byte []> edits = new TreeMap<HStoreKey, byte []>( |
| new HStoreKey.HStoreKeyWritableComparator(regionInfo)); |
| for (HStoreKey key: keys) { |
| edits.put(key, HLogEdit.deleteBytes.get()); |
| } |
| update(edits); |
| } |
| } finally { |
| if(lockid == null) releaseRowLock(lid); |
| } |
| } |
| |
| /** |
| * Delete all cells for a row with matching column family with timestamps |
| * less than or equal to <i>timestamp</i>. |
| * |
| * @param row The row to operate on |
| * @param family The column family to match |
| * @param timestamp Timestamp to match |
| * @param lockid Row lock |
| * @throws IOException |
| */ |
| @SuppressWarnings("unchecked") |
| public void deleteFamily(byte [] row, byte [] family, long timestamp, |
| final Integer lockid) |
| throws IOException{ |
| checkReadOnly(); |
| Integer lid = getLock(lockid, row); |
| long now = System.currentTimeMillis(); |
| try { |
| // find the HStore for the column family |
| HStore store = getStore(family); |
| // find all the keys that match our criteria |
| List<HStoreKey> keys = store.getKeys(new HStoreKey(row, timestamp, |
| this.regionInfo), ALL_VERSIONS, now, null); |
| // delete all the cells |
| TreeMap<HStoreKey, byte []> edits = new TreeMap<HStoreKey, byte []>( |
| new HStoreKey.HStoreKeyWritableComparator(regionInfo)); |
| for (HStoreKey key: keys) { |
| edits.put(key, HLogEdit.deleteBytes.get()); |
| } |
| update(edits); |
| } finally { |
| if(lockid == null) releaseRowLock(lid); |
| } |
| } |
| |
| /** |
| * Delete all cells for a row with all the matching column families by |
| * familyRegex with timestamps less than or equal to <i>timestamp</i>. |
| * |
| * @param row The row to operate on |
| * @param familyRegex The column family regex for matching. This regex |
| * matches just the family name; it does not include the <code>:</code> delimiter. |
| * @param timestamp Timestamp to match |
| * @param lockid Row lock |
| * @throws IOException |
| */ |
| @SuppressWarnings("unchecked") |
| public void deleteFamilyByRegex(byte [] row, String familyRegex, long timestamp, |
| final Integer lockid) throws IOException { |
| checkReadOnly(); |
| // construct the family regex pattern |
| Pattern familyPattern = Pattern.compile(familyRegex); |
| Integer lid = getLock(lockid, row); |
| long now = System.currentTimeMillis(); |
| try { |
| for(HStore store : stores.values()) { |
| String familyName = Bytes.toString(store.getFamily().getName()); |
| // check that the family name matches the family pattern. |
| if(!(familyPattern.matcher(familyName).matches())) |
| continue; |
| |
| List<HStoreKey> keys = store.getKeys(new HStoreKey(row, timestamp, |
| this.regionInfo), ALL_VERSIONS, now, null); |
| TreeMap<HStoreKey, byte []> edits = new TreeMap<HStoreKey, byte []>( |
| new HStoreKey.HStoreKeyWritableComparator(regionInfo)); |
| for (HStoreKey key: keys) { |
| edits.put(key, HLogEdit.deleteBytes.get()); |
| } |
| update(edits); |
| } |
| } finally { |
| if(lockid == null) releaseRowLock(lid); |
| } |
| } |
| |
| /* |
| * Delete one or many cells. |
| * Used to support {@link #deleteAll(byte[], byte[], long, Integer)} and deletion of |
| * latest cell. |
| * @param row |
| * @param column |
| * @param ts Timestamp to start search on. |
| * @param versions How many versions to delete. Pass |
| * {@link HConstants#ALL_VERSIONS} to delete all. |
| * @throws IOException |
| */ |
| @SuppressWarnings("unchecked") |
| private void deleteMultiple(final byte [] row, final byte [] column, |
| final long ts, final int versions) |
| throws IOException { |
| checkReadOnly(); |
| HStoreKey origin = new HStoreKey(row, column, ts, this.regionInfo); |
| Set<HStoreKey> keys = getKeys(origin, versions); |
| if (keys.size() > 0) { |
| TreeMap<HStoreKey, byte []> edits = new TreeMap<HStoreKey, byte []>( |
| new HStoreKey.HStoreKeyWritableComparator(regionInfo)); |
| for (HStoreKey key: keys) { |
| edits.put(key, HLogEdit.deleteBytes.get()); |
| } |
| update(edits); |
| } |
| } |
| |
| /** |
| * Tests for the existence of any cells for a given coordinate. |
| * |
| * @param row the row |
| * @param column the column, or null |
   * @param timestamp the timestamp, or HConstants.LATEST_TIMESTAMP for any
| * @param lockid the existing lock, or null |
| * @return true if cells exist for the row, false otherwise |
| * @throws IOException |
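   *
   * <p>Usage sketch (hypothetical caller):
   * <pre>
   * // Does this row have any cell in column "info:regioninfo"?
   * boolean b = region.exists(Bytes.toBytes("myrow"),
   *   Bytes.toBytes("info:regioninfo"), HConstants.LATEST_TIMESTAMP, null);
   * </pre>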
| */ |
| public boolean exists(final byte[] row, final byte[] column, |
| final long timestamp, final Integer lockid) |
| throws IOException { |
| checkRow(row); |
| Integer lid = getLock(lockid, row); |
| try { |
| HStoreKey origin; |
| if (column != null) { |
| origin = new HStoreKey(row, column, timestamp); |
| } else { |
| origin = new HStoreKey(row, timestamp); |
| } |
| return !getKeys(origin, 1).isEmpty(); |
| } finally { |
| if (lockid == null) |
| releaseRowLock(lid); |
| } |
| } |
| |
| /** |
   * @throws IOException if the region is in read-only mode.
| */ |
| protected void checkReadOnly() throws IOException { |
| if (this.writestate.isReadOnly()) { |
| throw new IOException("region is read only"); |
| } |
| } |
| |
| /** |
| * Private implementation. |
| * |
| * localput() is used for both puts and deletes. We just place the values |
| * into a per-row pending area, until a commit() or abort() call is received. |
| * (Or until the user's write-lock expires.) |
| * |
| * @param lockid |
| * @param key |
| * @param val Value to enter into cell |
| * @throws IOException |
| */ |
| @SuppressWarnings("unchecked") |
| private void localput(final Integer lockid, final HStoreKey key, |
| final byte [] val) |
| throws IOException { |
| checkColumn(key.getColumn()); |
| checkReadOnly(); |
| TreeMap<HStoreKey, byte []> targets = this.targetColumns.get(lockid); |
| if (targets == null) { |
| targets = new TreeMap<HStoreKey, byte []>( |
| new HStoreKey.HStoreKeyWritableComparator(regionInfo)); |
| this.targetColumns.put(lockid, targets); |
| } |
| targets.put(key, val); |
| } |
| |
| /** |
| * Add updates first to the hlog and then add values to memcache. |
| * Warning: Assumption is caller has lock on passed in row. |
| * @param updatesByColumn Cell updates by column |
| * @throws IOException |
| */ |
| private void update(final TreeMap<HStoreKey, byte []> updatesByColumn) throws IOException { |
| this.update(updatesByColumn, true); |
| } |
| |
| /** |
   * Add updates first to the hlog (if writeToWAL) and then add values to memcache.
   * Warning: Assumption is caller has lock on passed in row.
   * @param updatesByColumn Cell updates by column
   * @param writeToWAL if true, then we should write to the log
| * @throws IOException |
| */ |
| private void update(final TreeMap<HStoreKey, byte []> updatesByColumn, |
| boolean writeToWAL) |
| throws IOException { |
| if (updatesByColumn == null || updatesByColumn.size() <= 0) { |
| return; |
| } |
| checkReadOnly(); |
| boolean flush = false; |
| this.updatesLock.readLock().lock(); |
| try { |
| if (writeToWAL) { |
| this.log.append(regionInfo.getRegionName(), |
| regionInfo.getTableDesc().getName(), updatesByColumn, |
| (regionInfo.isMetaRegion() || regionInfo.isRootRegion())); |
| } |
| long size = 0; |
| for (Map.Entry<HStoreKey, byte[]> e: updatesByColumn.entrySet()) { |
| HStoreKey key = e.getKey(); |
| size = this.memcacheSize.addAndGet( |
| getStore(key.getColumn()).add(key, e.getValue())); |
| } |
| flush = isFlushSize(size); |
| } finally { |
| this.updatesLock.readLock().unlock(); |
| } |
| if (flush) { |
| // Request a cache flush. Do it outside update lock. |
| requestFlush(); |
| } |
| } |
| |
| private void requestFlush() { |
| if (this.flushListener == null) { |
| return; |
| } |
| synchronized (writestate) { |
| if (this.writestate.isFlushRequested()) { |
| return; |
| } |
| writestate.flushRequested = true; |
| } |
| // Make request outside of synchronize block; HBASE-818. |
| this.flushListener.request(this); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Flush requested on " + this); |
| } |
| } |
| |
| /* |
| * @param size |
| * @return True if size is over the flush threshold |
| */ |
| private boolean isFlushSize(final long size) { |
| return size > this.memcacheFlushSize; |
| } |
| |
| // Do any reconstruction needed from the log |
| @SuppressWarnings("unused") |
| protected void doReconstructionLog(Path oldLogFile, long minSeqId, long maxSeqId, |
| Progressable reporter) |
| throws UnsupportedEncodingException, IOException { |
| // Nothing to do (Replaying is done in HStores) |
| } |
| |
| protected HStore instantiateHStore(Path baseDir, |
| HColumnDescriptor c, Path oldLogFile, Progressable reporter) |
| throws IOException { |
| return new HStore(baseDir, this.regionInfo, c, this.fs, oldLogFile, |
| this.conf, reporter); |
| } |
| |
| /** |
| * Return HStore instance. |
| * Use with caution. Exposed for use of fixup utilities. |
| * @param column Name of column family hosted by this region. |
| * @return Store that goes with the family on passed <code>column</code>. |
| * TODO: Make this lookup faster. |
| */ |
| public HStore getStore(final byte [] column) { |
| return this.stores.get(HStoreKey.getFamilyMapKey(column)); |
| } |
| |
| ////////////////////////////////////////////////////////////////////////////// |
| // Support code |
| ////////////////////////////////////////////////////////////////////////////// |
| |
| /** Make sure this is a valid row for the HRegion */ |
| private void checkRow(final byte [] row) throws IOException { |
| if(!rowIsInRange(regionInfo, row)) { |
| throw new WrongRegionException("Requested row out of range for " + |
| "HRegion " + this + ", startKey='" + |
| Bytes.toString(regionInfo.getStartKey()) + "', getEndKey()='" + |
| Bytes.toString(regionInfo.getEndKey()) + "', row='" + |
| Bytes.toString(row) + "'"); |
| } |
| } |
| |
| /* |
| * Make sure this is a valid column for the current table |
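   * A valid column name includes the family delimiter, e.g.
   * <code>info:regioninfo</code> names qualifier <code>regioninfo</code> in
   * family <code>info</code>.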
| * @param columnName |
   * @throws NoSuchColumnFamilyException
   * @throws ColumnNameParseException
| */ |
| private void checkColumn(final byte [] columnName) |
| throws NoSuchColumnFamilyException, ColumnNameParseException { |
| if (columnName == null) { |
| return; |
| } |
| |
| int index = HStoreKey.getFamilyDelimiterIndex(columnName); |
| if (index <= 0) { |
| throw new ColumnNameParseException(Bytes.toString(columnName) + |
| " is missing column family delimiter '" + |
| HStoreKey.COLUMN_FAMILY_DELIMITER + "'"); |
| } |
| if (!regionInfo.getTableDesc().hasFamily(columnName, index)) { |
| throw new NoSuchColumnFamilyException("Column family on " + |
| Bytes.toString(columnName) + " does not exist in region " + this |
| + " in table " + regionInfo.getTableDesc()); |
| } |
| } |
| |
| /** |
| * Obtain a lock on the given row. Blocks until success. |
| * |
| * I know it's strange to have two mappings: |
| * <pre> |
| * ROWS ==> LOCKS |
| * </pre> |
| * as well as |
| * <pre> |
| * LOCKS ==> ROWS |
| * </pre> |
| * |
| * But it acts as a guard on the client; a miswritten client just can't |
| * submit the name of a row and start writing to it; it must know the correct |
| * lockid, which matches the lock list in memory. |
| * |
| * <p>It would be more memory-efficient to assume a correctly-written client, |
| * which maybe we'll do in the future. |
| * |
| * @param row Name of row to lock. |
| * @throws IOException |
| * @return The id of the held lock. |
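   *
   * <p>The typical acquire/release pattern, as used elsewhere in this class
   * (a sketch):
   * <pre>
   * Integer lockid = region.obtainRowLock(row);
   * try {
   *   // ... operate on the locked row ...
   * } finally {
   *   region.releaseRowLock(lockid);
   * }
   * </pre>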
| */ |
| Integer obtainRowLock(final byte [] row) throws IOException { |
| checkRow(row); |
| splitsAndClosesLock.readLock().lock(); |
| try { |
| if (this.closed.get()) { |
| throw new NotServingRegionException("Region " + this + " closed"); |
| } |
| Integer key = Bytes.mapKey(row); |
| synchronized (locksToRows) { |
| while (locksToRows.containsKey(key)) { |
| try { |
| locksToRows.wait(); |
| } catch (InterruptedException ie) { |
| // Empty |
| } |
| } |
| locksToRows.put(key, row); |
| locksToRows.notifyAll(); |
| return key; |
| } |
| } finally { |
| splitsAndClosesLock.readLock().unlock(); |
| } |
| } |
| |
| /** |
| * Used by unit tests. |
| * @param lockid |
| * @return Row that goes with <code>lockid</code> |
| */ |
| byte [] getRowFromLock(final Integer lockid) { |
| return locksToRows.get(lockid); |
| } |
| |
| /** |
   * Release the row lock.
   * @param lockid The lock id of the row lock to release
| */ |
| void releaseRowLock(final Integer lockid) { |
| synchronized (locksToRows) { |
| locksToRows.remove(lockid); |
| locksToRows.notifyAll(); |
| } |
| } |
| |
| /** |
   * See if the row identified by <code>lockid</code> is currently locked.
   * @param lockid
   * @return true if the lock is currently held
| */ |
| private boolean isRowLocked(final Integer lockid) { |
| synchronized (locksToRows) { |
      return locksToRows.containsKey(lockid);
| } |
| } |
| |
| /** |
| * Returns existing row lock if found, otherwise |
| * obtains a new row lock and returns it. |
   * @param lockid existing lock id, or null to obtain a new lock
   * @param row row to lock when <code>lockid</code> is null
   * @return the id of the lock that is held
| */ |
| private Integer getLock(Integer lockid, byte [] row) |
| throws IOException { |
| Integer lid = null; |
| if (lockid == null) { |
| lid = obtainRowLock(row); |
| } else { |
| if(!isRowLocked(lockid)) { |
| throw new IOException("Invalid row lock"); |
| } |
| lid = lockid; |
| } |
| return lid; |
| } |
| |
| private void waitOnRowLocks() { |
| synchronized (locksToRows) { |
| while (this.locksToRows.size() > 0) { |
| LOG.debug("waiting for " + this.locksToRows.size() + " row locks"); |
| try { |
| this.locksToRows.wait(); |
| } catch (InterruptedException e) { |
| // Catch. Let while test determine loop-end. |
| } |
| } |
| } |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
    if (!(o instanceof HRegion)) {
      return false;
    }
    return this.hashCode() == ((HRegion)o).hashCode();
| } |
| |
| @Override |
| public int hashCode() { |
| return this.regionInfo.getRegionName().hashCode(); |
| } |
| |
| @Override |
| public String toString() { |
| return this.regionInfo.getRegionNameAsString(); |
| } |
| |
| /** @return Path of region base directory */ |
| public Path getBaseDir() { |
| return this.basedir; |
| } |
| |
| /** |
| * HScanner is an iterator through a bunch of rows in an HRegion. |
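   *
   * <p>Callers drive it through the {@link InternalScanner} protocol; a
   * sketch, mirroring the scan loop in <code>processTable</code> at the
   * bottom of this file:
   * <pre>
   * HStoreKey key = new HStoreKey();
   * SortedMap&lt;byte [], Cell&gt; results =
   *   new TreeMap&lt;byte [], Cell&gt;(Bytes.BYTES_COMPARATOR);
   * while (scanner.next(key, results)) {
   *   // ... consume one row's worth of cells ...
   *   results.clear();
   * }
   * scanner.close();
   * </pre>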
| */ |
| private class HScanner implements InternalScanner { |
| private InternalScanner[] scanners; |
| private TreeMap<byte [], Cell>[] resultSets; |
| private HStoreKey[] keys; |
| private RowFilterInterface filter; |
| |
| /** Create an HScanner with a handle on many HStores. */ |
| @SuppressWarnings("unchecked") |
| HScanner(byte [][] cols, byte [] firstRow, long timestamp, HStore[] stores, |
| RowFilterInterface filter) |
| throws IOException { |
| this.filter = filter; |
| this.scanners = new InternalScanner[stores.length]; |
| try { |
| for (int i = 0; i < stores.length; i++) { |
| |
| // Only pass relevant columns to each store |
| |
| List<byte[]> columns = new ArrayList<byte[]>(); |
| for (int j = 0; j < cols.length; j++) { |
| if (Bytes.equals(HStoreKey.getFamily(cols[j]), |
| stores[i].getFamily().getName())) { |
| columns.add(cols[j]); |
| } |
| } |
| |
| RowFilterInterface f = filter; |
| if (f != null) { |
| // Need to replicate filters. |
| // At least WhileMatchRowFilter will mess up the scan if only |
| // one shared across many rows. See HADOOP-2467. |
| f = WritableUtils.clone(filter, conf); |
| } |
| scanners[i] = stores[i].getScanner(timestamp, |
| columns.toArray(new byte[columns.size()][]), firstRow, f); |
| } |
| } catch (IOException e) { |
| for (int i = 0; i < this.scanners.length; i++) { |
| if(scanners[i] != null) { |
| closeScanner(i); |
| } |
| } |
| throw e; |
| } |
| |
| // Advance to the first key in each store. |
| // All results will match the required column-set and scanTime. |
| this.resultSets = new TreeMap[scanners.length]; |
| this.keys = new HStoreKey[scanners.length]; |
| for (int i = 0; i < scanners.length; i++) { |
| keys[i] = new HStoreKey(HConstants.EMPTY_BYTE_ARRAY,regionInfo); |
| resultSets[i] = new TreeMap<byte [], Cell>(Bytes.BYTES_COMPARATOR); |
| if(scanners[i] != null && !scanners[i].next(keys[i], resultSets[i])) { |
| closeScanner(i); |
| } |
| } |
| } |
| |
| @SuppressWarnings("null") |
| public boolean next(HStoreKey key, SortedMap<byte [], Cell> results) |
| throws IOException { |
| boolean moreToFollow = false; |
| boolean filtered = false; |
| |
| do { |
| // Find the lowest-possible key. |
| byte [] chosenRow = null; |
| long chosenTimestamp = -1; |
| for (int i = 0; i < this.keys.length; i++) { |
| if (scanners[i] != null && |
| (chosenRow == null || |
| (HStoreKey.compareTwoRowKeys(regionInfo, |
| keys[i].getRow(), chosenRow) < 0) || |
| ((HStoreKey.compareTwoRowKeys(regionInfo, keys[i].getRow(), |
| chosenRow) == 0) && |
| (keys[i].getTimestamp() > chosenTimestamp)))) { |
| chosenRow = keys[i].getRow(); |
| chosenTimestamp = keys[i].getTimestamp(); |
| } |
| } |
| |
| // Store the key and results for each sub-scanner. Merge them as |
| // appropriate. |
| if (chosenTimestamp >= 0) { |
| // Here we are setting the passed in key with current row+timestamp |
| key.setRow(chosenRow); |
| key.setVersion(chosenTimestamp); |
| key.setColumn(HConstants.EMPTY_BYTE_ARRAY); |
| |
| for (int i = 0; i < scanners.length; i++) { |
| if (scanners[i] != null && |
| HStoreKey.compareTwoRowKeys(regionInfo,keys[i].getRow(), chosenRow) == 0) { |
| // NOTE: We used to do results.putAll(resultSets[i]); |
| // but this had the effect of overwriting newer |
| // values with older ones. So now we only insert |
| // a result if the map does not contain the key. |
| for (Map.Entry<byte [], Cell> e : resultSets[i].entrySet()) { |
| if (!results.containsKey(e.getKey())) { |
| results.put(e.getKey(), e.getValue()); |
| } |
| } |
| resultSets[i].clear(); |
| if (!scanners[i].next(keys[i], resultSets[i])) { |
| closeScanner(i); |
| } |
| } |
| } |
| } |
| |
| for (int i = 0; i < scanners.length; i++) { |
| // If the current scanner is non-null AND has a lower-or-equal |
| // row label, then its timestamp is bad. We need to advance it. |
| while ((scanners[i] != null) && |
| (HStoreKey.compareTwoRowKeys(regionInfo,keys[i].getRow(), chosenRow) <= 0)) { |
| resultSets[i].clear(); |
| if (!scanners[i].next(keys[i], resultSets[i])) { |
| closeScanner(i); |
| } |
| } |
| } |
| |
| moreToFollow = chosenTimestamp >= 0; |
| if (results == null || results.size() <= 0) { |
| // If we got no results, then there is no more to follow. |
| moreToFollow = false; |
| } |
| |
| filtered = filter == null ? false : filter.filterRow(results); |
| |
| if (filter != null && filter.filterAllRemaining()) { |
| moreToFollow = false; |
| } |
| |
| if (moreToFollow) { |
| if (filter != null) { |
| filter.rowProcessed(filtered, key.getRow()); |
| } |
| if (filtered) { |
| results.clear(); |
| } |
| } |
| } while(filtered && moreToFollow); |
| |
| // Make sure scanners closed if no more results |
| if (!moreToFollow) { |
| for (int i = 0; i < scanners.length; i++) { |
| if (null != scanners[i]) { |
| closeScanner(i); |
| } |
| } |
| } |
| |
| return moreToFollow; |
| } |
| |
| /** Shut down a single scanner */ |
| void closeScanner(int i) { |
| try { |
| try { |
| scanners[i].close(); |
| } catch (IOException e) { |
| LOG.warn("Failed closing scanner " + i, e); |
| } |
| } finally { |
| scanners[i] = null; |
| // These data members can be null if exception in constructor |
| if (resultSets != null) { |
| resultSets[i] = null; |
| } |
| if (keys != null) { |
| keys[i] = null; |
| } |
| } |
| } |
| |
| public void close() { |
| for(int i = 0; i < scanners.length; i++) { |
| if(scanners[i] != null) { |
| closeScanner(i); |
| } |
| } |
| } |
| |
| public boolean isWildcardScanner() { |
| throw new UnsupportedOperationException("Unimplemented on HScanner"); |
| } |
| |
| public boolean isMultipleMatchScanner() { |
| throw new UnsupportedOperationException("Unimplemented on HScanner"); |
| } |
| } |
| |
| // Utility methods |
| |
| /** |
| * Convenience method creating new HRegions. Used by createTable and by the |
| * bootstrap code in the HMaster constructor. |
| * Note, this method creates an {@link HLog} for the created region. It |
| * needs to be closed explicitly. Use {@link HRegion#getLog()} to get |
| * access. |
| * @param info Info for region to create. |
| * @param rootDir Root directory for HBase instance |
| * @param conf |
| * @return new HRegion |
| * |
| * @throws IOException |
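   *
   * <p>Usage sketch (hypothetical caller; note the explicit log close):
   * <pre>
   * HRegion r = HRegion.createHRegion(info, rootDir, conf);
   * try {
   *   // ... populate the new region ...
   * } finally {
   *   r.close();
   *   r.getLog().close();
   * }
   * </pre>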
| */ |
| public static HRegion createHRegion(final HRegionInfo info, final Path rootDir, |
| final HBaseConfiguration conf) |
| throws IOException { |
| Path tableDir = |
| HTableDescriptor.getTableDir(rootDir, info.getTableDesc().getName()); |
| Path regionDir = HRegion.getRegionDir(tableDir, info.getEncodedName()); |
| FileSystem fs = FileSystem.get(conf); |
| fs.mkdirs(regionDir); |
| // Note in historian the creation of new region. |
| if (!info.isMetaRegion()) { |
| RegionHistorian.getInstance().addRegionCreation(info); |
| } |
| HRegion region = new HRegion(tableDir, |
| new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf, null), |
| fs, conf, info, null); |
| region.initialize(null, null); |
| return region; |
| } |
| |
| /** |
| * Convenience method to open a HRegion outside of an HRegionServer context. |
| * @param info Info for region to be opened. |
| * @param rootDir Root directory for HBase instance |
| * @param log HLog for region to use. This method will call |
| * HLog#setSequenceNumber(long) passing the result of the call to |
| * HRegion#getMinSequenceId() to ensure the log id is properly kept |
   * up. HRegionServer does this every time it opens a new region.
| * @param conf |
| * @return new HRegion |
| * |
| * @throws IOException |
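   *
   * <p>Usage sketch (hypothetical caller; the log directory is the caller's
   * choice):
   * <pre>
   * HLog log = new HLog(fs, logdir, conf, null);
   * HRegion r = HRegion.openHRegion(info, rootDir, log, conf);
   * </pre>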
| */ |
| public static HRegion openHRegion(final HRegionInfo info, final Path rootDir, |
| final HLog log, final HBaseConfiguration conf) |
| throws IOException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Opening region: " + info); |
| } |
| if (info == null) { |
| throw new NullPointerException("Passed region info is null"); |
| } |
| HRegion r = new HRegion( |
| HTableDescriptor.getTableDir(rootDir, info.getTableDesc().getName()), |
| log, FileSystem.get(conf), conf, info, null); |
| r.initialize(null, null); |
| if (log != null) { |
| log.setSequenceNumber(r.getMinSequenceId()); |
| } |
| return r; |
| } |
| |
| /** |
| * Inserts a new region's meta information into the passed |
   * <code>meta</code> region. Used by the HMaster bootstrap code when adding
   * a new table to the ROOT table.
| * |
| * @param meta META HRegion to be updated |
| * @param r HRegion to add to <code>meta</code> |
| * |
| * @throws IOException |
| */ |
| @SuppressWarnings("unchecked") |
| public static void addRegionToMETA(HRegion meta, HRegion r) |
| throws IOException { |
| meta.checkResources(); |
| // The row key is the region name |
| byte [] row = r.getRegionName(); |
| Integer lid = meta.obtainRowLock(row); |
| try { |
| HStoreKey key = new HStoreKey(row, COL_REGIONINFO, |
| System.currentTimeMillis(), r.getRegionInfo()); |
| TreeMap<HStoreKey, byte[]> edits = new TreeMap<HStoreKey, byte[]>( |
| new HStoreKey.HStoreKeyWritableComparator(meta.getRegionInfo())); |
| edits.put(key, Writables.getBytes(r.getRegionInfo())); |
| meta.update(edits); |
| } finally { |
| meta.releaseRowLock(lid); |
| } |
| } |
| |
| /** |
| * Delete a region's meta information from the passed |
| * <code>meta</code> region. Removes content in the 'info' column family. |
| * Does not remove region historian info. |
| * |
| * @param srvr META server to be updated |
| * @param metaRegionName Meta region name |
| * @param regionName HRegion to remove from <code>meta</code> |
| * |
| * @throws IOException |
| */ |
| public static void removeRegionFromMETA(final HRegionInterface srvr, |
| final byte [] metaRegionName, final byte [] regionName) |
| throws IOException { |
| srvr.deleteFamily(metaRegionName, regionName, HConstants.COLUMN_FAMILY, |
| HConstants.LATEST_TIMESTAMP, -1L); |
| } |
| |
| /** |
| * Utility method used by HMaster marking regions offlined. |
| * @param srvr META server to be updated |
| * @param metaRegionName Meta region name |
| * @param info HRegion to update in <code>meta</code> |
| * |
| * @throws IOException |
| */ |
| public static void offlineRegionInMETA(final HRegionInterface srvr, |
| final byte [] metaRegionName, final HRegionInfo info) |
| throws IOException { |
| BatchUpdate b = new BatchUpdate(info.getRegionName()); |
| info.setOffline(true); |
| b.put(COL_REGIONINFO, Writables.getBytes(info)); |
| b.delete(COL_SERVER); |
| b.delete(COL_STARTCODE); |
| // If carrying splits, they'll be in place when we show up on new |
| // server. |
| srvr.batchUpdate(metaRegionName, b, -1L); |
| } |
| |
| /** |
| * Clean COL_SERVER and COL_STARTCODE for passed <code>info</code> in |
| * <code>.META.</code> |
| * @param srvr |
| * @param metaRegionName |
| * @param info |
| * @throws IOException |
| */ |
| public static void cleanRegionInMETA(final HRegionInterface srvr, |
| final byte [] metaRegionName, final HRegionInfo info) |
| throws IOException { |
| BatchUpdate b = new BatchUpdate(info.getRegionName()); |
| b.delete(COL_SERVER); |
| b.delete(COL_STARTCODE); |
| // If carrying splits, they'll be in place when we show up on new |
| // server. |
| srvr.batchUpdate(metaRegionName, b, LATEST_TIMESTAMP); |
| } |
| |
| /** |
| * Deletes all the files for a HRegion |
| * |
| * @param fs the file system object |
| * @param rootdir qualified path of HBase root directory |
| * @param info HRegionInfo for region to be deleted |
| * @throws IOException |
| */ |
| public static void deleteRegion(FileSystem fs, Path rootdir, HRegionInfo info) |
| throws IOException { |
| deleteRegion(fs, HRegion.getRegionDir(rootdir, info)); |
| } |
| |
| private static void deleteRegion(FileSystem fs, Path regiondir) |
| throws IOException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("DELETING region " + regiondir.toString()); |
| } |
| fs.delete(regiondir, true); |
| } |
| |
| /** |
| * Computes the Path of the HRegion |
| * |
| * @param tabledir qualified path for table |
| * @param name ENCODED region name |
| * @return Path of HRegion directory |
| */ |
| public static Path getRegionDir(final Path tabledir, final int name) { |
| return new Path(tabledir, Integer.toString(name)); |
| } |
| |
| /** |
| * Computes the Path of the HRegion |
| * |
| * @param rootdir qualified path of HBase root directory |
| * @param info HRegionInfo for the region |
| * @return qualified path of region directory |
| */ |
| public static Path getRegionDir(final Path rootdir, final HRegionInfo info) { |
| return new Path( |
| HTableDescriptor.getTableDir(rootdir, info.getTableDesc().getName()), |
| Integer.toString(info.getEncodedName())); |
| } |
| |
| /** |
| * Determines if the specified row is within the row range specified by the |
| * specified HRegionInfo |
| * |
| * @param info HRegionInfo that specifies the row range |
| * @param row row to be checked |
| * @return true if the row is within the range specified by the HRegionInfo |
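   *
   * <p>The start key is compared inclusively and the end key exclusively:
   * for a region covering <code>[b, d)</code>, rows <code>b</code> and
   * <code>c</code> are in range while <code>a</code> and <code>d</code> are
   * not. An empty start or end key matches everything on that side.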
| */ |
| public static boolean rowIsInRange(HRegionInfo info, final byte [] row) { |
| return ((info.getStartKey().length == 0) || |
| (HStoreKey.compareTwoRowKeys(info,info.getStartKey(), row) <= 0)) && |
| ((info.getEndKey().length == 0) || |
| (HStoreKey.compareTwoRowKeys(info,info.getEndKey(), row) > 0)); |
| } |
| |
| /** |
| * Make the directories for a specific column family |
| * |
| * @param fs the file system |
| * @param basedir base directory where region will live (usually the table dir) |
| * @param encodedRegionName encoded region name |
| * @param colFamily the column family |
| * @param tabledesc table descriptor of table |
| * @throws IOException |
| */ |
| public static void makeColumnFamilyDirs(FileSystem fs, Path basedir, |
| int encodedRegionName, byte [] colFamily, HTableDescriptor tabledesc) |
| throws IOException { |
| fs.mkdirs(HStoreFile.getMapDir(basedir, encodedRegionName, colFamily)); |
| fs.mkdirs(HStoreFile.getInfoDir(basedir, encodedRegionName, colFamily)); |
| } |
| |
| /** |
   * Merge two HRegions. The regions must be adjacent and must not overlap.
| * |
| * @param srcA |
| * @param srcB |
| * @return new merged HRegion |
| * @throws IOException |
| */ |
| public static HRegion mergeAdjacent(final HRegion srcA, final HRegion srcB) |
| throws IOException { |
| |
| HRegion a = srcA; |
| HRegion b = srcB; |
| |
| // Make sure that srcA comes first; important for key-ordering during |
| // write of the merged file. |
| if (srcA.getStartKey() == null) { |
| if (srcB.getStartKey() == null) { |
| throw new IOException("Cannot merge two regions with null start key"); |
| } |
| // A's start key is null but B's isn't. Assume A comes before B |
| } else if ((srcB.getStartKey() == null) // A is not null but B is |
| || (HStoreKey.compareTwoRowKeys(srcA.getRegionInfo(), |
| srcA.getStartKey(), srcB.getStartKey()) > 0)) { // A > B |
| a = srcB; |
| b = srcA; |
| } |
| |
| if (!HStoreKey.equalsTwoRowKeys(srcA.getRegionInfo(), |
| a.getEndKey(), b.getStartKey())) { |
| throw new IOException("Cannot merge non-adjacent regions"); |
| } |
| return merge(a, b); |
| } |
| |
| /** |
| * Merge two regions whether they are adjacent or not. |
| * |
| * @param a region a |
| * @param b region b |
| * @return new merged region |
| * @throws IOException |
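   *
   * <p>Usage sketch (hypothetical caller). Both source regions are flushed,
   * compacted, closed, and their directories deleted by this method:
   * <pre>
   * HRegion merged = HRegion.merge(regionA, regionB);
   * </pre>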
| */ |
| public static HRegion merge(HRegion a, HRegion b) throws IOException { |
| if (!a.getRegionInfo().getTableDesc().getNameAsString().equals( |
| b.getRegionInfo().getTableDesc().getNameAsString())) { |
| throw new IOException("Regions do not belong to the same table"); |
| } |
| |
| FileSystem fs = a.getFilesystem(); |
| |
| // Make sure each region's cache is empty |
| |
| a.flushcache(); |
| b.flushcache(); |
| |
| // Compact each region so we only have one store file per family |
| |
| a.compactStores(true); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Files for region: " + a); |
| listPaths(fs, a.getRegionDir()); |
| } |
| b.compactStores(true); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Files for region: " + b); |
| listPaths(fs, b.getRegionDir()); |
| } |
| |
| HBaseConfiguration conf = a.getConf(); |
| HTableDescriptor tabledesc = a.getTableDesc(); |
| HLog log = a.getLog(); |
| Path basedir = a.getBaseDir(); |
| final byte [] startKey = HStoreKey.equalsTwoRowKeys(a.getRegionInfo(), |
| a.getStartKey(), EMPTY_BYTE_ARRAY) || |
| HStoreKey.equalsTwoRowKeys(a.getRegionInfo(), |
| b.getStartKey(), EMPTY_BYTE_ARRAY) ? EMPTY_BYTE_ARRAY : |
| HStoreKey.compareTwoRowKeys(a.getRegionInfo(), a.getStartKey(), |
| b.getStartKey()) <= 0 ? |
| a.getStartKey() : b.getStartKey(); |
| final byte [] endKey = HStoreKey.equalsTwoRowKeys(a.getRegionInfo(), |
| a.getEndKey(), EMPTY_BYTE_ARRAY) || |
| HStoreKey.equalsTwoRowKeys(b.getRegionInfo(), b.getEndKey(), |
| EMPTY_BYTE_ARRAY) ? EMPTY_BYTE_ARRAY : |
| HStoreKey.compareTwoRowKeys(a.getRegionInfo(), a.getEndKey(), |
| b.getEndKey()) <= 0 ? |
| b.getEndKey() : a.getEndKey(); |
| |
| HRegionInfo newRegionInfo = new HRegionInfo(tabledesc, startKey, endKey); |
| LOG.info("Creating new region " + newRegionInfo.toString()); |
| int encodedName = newRegionInfo.getEncodedName(); |
| Path newRegionDir = HRegion.getRegionDir(a.getBaseDir(), encodedName); |
| if(fs.exists(newRegionDir)) { |
| throw new IOException("Cannot merge; target file collision at " + |
| newRegionDir); |
| } |
| fs.mkdirs(newRegionDir); |
| |
| LOG.info("starting merge of regions: " + a + " and " + b + |
| " into new region " + newRegionInfo.toString() + |
| " with start key <" + startKey + "> and end key <" + endKey + ">"); |
| |
| // Move HStoreFiles under new region directory |
| |
| Map<byte [], List<HStoreFile>> byFamily = |
| new TreeMap<byte [], List<HStoreFile>>(Bytes.BYTES_COMPARATOR); |
| byFamily = filesByFamily(byFamily, a.close()); |
| byFamily = filesByFamily(byFamily, b.close()); |
| for (Map.Entry<byte [], List<HStoreFile>> es : byFamily.entrySet()) { |
| byte [] colFamily = es.getKey(); |
| makeColumnFamilyDirs(fs, basedir, encodedName, colFamily, tabledesc); |
| |
      // Because we compacted the source regions we should have no more than
      // two HStoreFiles per family and there will be no reference store files.
| List<HStoreFile> srcFiles = es.getValue(); |
| if (srcFiles.size() == 2) { |
| long seqA = srcFiles.get(0).loadInfo(fs); |
| long seqB = srcFiles.get(1).loadInfo(fs); |
| if (seqA == seqB) { |
| // We can't have duplicate sequence numbers |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Adjusting sequence id of storeFile " + srcFiles.get(1) + |
| " down by one; sequence id A=" + seqA + ", sequence id B=" + |
| seqB); |
| } |
| srcFiles.get(1).writeInfo(fs, seqB - 1); |
| } |
| } |
| for (HStoreFile hsf: srcFiles) { |
| HStoreFile dst = new HStoreFile(conf, fs, basedir, |
| newRegionInfo, colFamily, -1, null); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Renaming " + hsf + " to " + dst); |
| } |
| hsf.rename(fs, dst); |
| } |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Files for new region"); |
| listPaths(fs, newRegionDir); |
| } |
| HRegion dstRegion = new HRegion(basedir, log, fs, conf, newRegionInfo, null); |
| dstRegion.initialize(null, null); |
| dstRegion.compactStores(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Files for new region"); |
| listPaths(fs, dstRegion.getRegionDir()); |
| } |
| deleteRegion(fs, a.getRegionDir()); |
| deleteRegion(fs, b.getRegionDir()); |
| |
| LOG.info("merge completed. New region is " + dstRegion); |
| |
| return dstRegion; |
| } |
| |
| /* |
| * Fills a map with a vector of store files keyed by column family. |
| * @param byFamily Map to fill. |
| * @param storeFiles Store files to process. |
| * @return Returns <code>byFamily</code> |
| */ |
| private static Map<byte [], List<HStoreFile>> filesByFamily( |
| Map<byte [], List<HStoreFile>> byFamily, List<HStoreFile> storeFiles) { |
| for (HStoreFile src: storeFiles) { |
| List<HStoreFile> v = byFamily.get(src.getColFamily()); |
| if (v == null) { |
| v = new ArrayList<HStoreFile>(); |
| byFamily.put(src.getColFamily(), v); |
| } |
| v.add(src); |
| } |
| return byFamily; |
| } |
| |
| /* |
| * Method to list files in use by region |
| */ |
| static void listFiles(FileSystem fs, HRegion r) throws IOException { |
| listPaths(fs, r.getRegionDir()); |
| } |
| |
| /** |
   * @return True if any store needs a major compaction.
| * @throws IOException |
| */ |
| boolean isMajorCompaction() throws IOException { |
| for (HStore store: this.stores.values()) { |
| if (store.isMajorCompaction()) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| /* |
| * List the files under the specified directory |
| * |
| * @param fs |
| * @param dir |
| * @throws IOException |
| */ |
| private static void listPaths(FileSystem fs, Path dir) throws IOException { |
| if (LOG.isDebugEnabled()) { |
| FileStatus[] stats = fs.listStatus(dir); |
| if (stats == null || stats.length == 0) { |
| return; |
| } |
| for (int i = 0; i < stats.length; i++) { |
| String path = stats[i].getPath().toString(); |
| if (stats[i].isDir()) { |
| LOG.debug("d " + path); |
| listPaths(fs, stats[i].getPath()); |
| } else { |
| LOG.debug("f " + path + " size=" + stats[i].getLen()); |
| } |
| } |
| } |
| } |
| |
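  /**
   * Increments the long value stored at <code>row/column</code> by
   * <code>amount</code>, creating the counter if no prior value exists. The
   * row lock is held for the duration, so the read-modify-write is atomic
   * per row. A sketch of a hypothetical caller:
   * <pre>
   * long n = region.incrementColumnValue(Bytes.toBytes("myrow"),
   *   Bytes.toBytes("info:counter"), 1);
   * </pre>
   * @param row row to update
   * @param column column to update; must include the family delimiter
   * @param amount amount to add; if 0 and a value already exists, it is
   * returned without a write
   * @return the new counter value
   * @throws IOException
   */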
| public long incrementColumnValue(byte[] row, byte[] column, long amount) throws IOException { |
| checkRow(row); |
| checkColumn(column); |
| |
| Integer lid = obtainRowLock(row); |
| splitsAndClosesLock.readLock().lock(); |
| try { |
| HStoreKey hsk = new HStoreKey(row, column); |
| long ts = System.currentTimeMillis(); |
| byte [] value = null; |
| |
| HStore store = getStore(column); |
| |
| List<Cell> c; |
| // Try the memcache first. |
| store.lock.readLock().lock(); |
| try { |
| c = store.memcache.get(hsk, 1); |
| } finally { |
| store.lock.readLock().unlock(); |
| } |
| // Pick the latest value out of List<Cell> c: |
| if (c.size() >= 1) { |
| // Use the memcache timestamp value. |
| LOG.debug("Overwriting the memcache value for " + Bytes.toString(row) + "/" + Bytes.toString(column)); |
| ts = c.get(0).getTimestamp(); |
| value = c.get(0).getValue(); |
| } |
| |
| if (value == null) { |
| // Check the store (including disk) for the previous value. |
| Cell[] cell = store.get(hsk, 1); |
| if (cell != null && cell.length >= 1) { |
| LOG.debug("Using HFile previous value for " + Bytes.toString(row) + "/" + Bytes.toString(column)); |
| value = cell[0].getValue(); |
| } |
| } |
| |
| if (value == null) { |
| // Doesn't exist |
| LOG.debug("Creating new counter value for " + Bytes.toString(row) + "/"+ Bytes.toString(column)); |
| value = Bytes.toBytes(amount); |
| } else { |
| if (amount == 0) return Bytes.toLong(value); |
| value = Bytes.incrementBytes(value, amount); |
| } |
| |
| BatchUpdate b = new BatchUpdate(row, ts); |
| b.put(column, value); |
| batchUpdate(b, lid, true); |
| return Bytes.toLong(value); |
| } finally { |
| splitsAndClosesLock.readLock().unlock(); |
| releaseRowLock(lid); |
| } |
| } |
| |
| /* |
| * This method calls System.exit. |
| * @param message Message to print out. May be null. |
| */ |
| private static void printUsageAndExit(final String message) { |
| if (message != null && message.length() > 0) System.out.println(message); |
| System.out.println("Usage: HRegion CATLALOG_TABLE_DIR [major_compact]"); |
| System.out.println("Options:"); |
| System.out.println(" major_compact Pass this option to major compact " + |
| "passed region."); |
| System.out.println("Default outputs scan of passed region."); |
| System.exit(1); |
| } |
| |
| /* |
| * Process table. |
| * Do major compaction or list content. |
| * @param fs |
| * @param p |
| * @param log |
| * @param c |
| * @param majorCompact |
| * @throws IOException |
| */ |
| private static void processTable(final FileSystem fs, final Path p, |
| final HLog log, final HBaseConfiguration c, |
| final boolean majorCompact) |
| throws IOException { |
| HRegion region = null; |
| String rootStr = Bytes.toString(HConstants.ROOT_TABLE_NAME); |
| String metaStr = Bytes.toString(HConstants.META_TABLE_NAME); |
| // Currently expects tables have one region only. |
| if (p.getName().startsWith(rootStr)) { |
| region = new HRegion(p, log, fs, c, HRegionInfo.ROOT_REGIONINFO, null); |
| } else if (p.getName().startsWith(metaStr)) { |
| region = new HRegion(p, log, fs, c, HRegionInfo.FIRST_META_REGIONINFO, |
| null); |
| } else { |
| throw new IOException("Not a known catalog table: " + p.toString()); |
| } |
| try { |
| region.initialize(null, null); |
| if (majorCompact) { |
| region.compactStores(true); |
| } else { |
| // Default behavior |
| InternalScanner scanner = region.getScanner( |
| new byte [][] {HConstants.COLUMN_FAMILY, HConstants.COLUMN_FAMILY_HISTORIAN}, |
| HConstants.EMPTY_START_ROW, HConstants.LATEST_TIMESTAMP, null); |
| try { |
        boolean hasMore = false;
        do {
          HStoreKey key = new HStoreKey();
          SortedMap<byte [], Cell> values =
            new TreeMap<byte [], Cell>(Bytes.BYTES_COMPARATOR);
          hasMore = scanner.next(key, values);
          if (values.size() > 0) LOG.info(key + " " + values);
        } while (hasMore);
| } finally { |
| scanner.close(); |
| } |
| } |
| } finally { |
| region.close(); |
| } |
| } |
| |
| /** |
| * Facility for dumping and compacting catalog tables. |
   * Only does catalog tables since these are the only tables whose schema we
   * know for certain. For usage run:
| * <pre> |
| * ./bin/hbase org.apache.hadoop.hbase.regionserver.HRegion |
| * </pre> |
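   * <p>For example, to major compact a (hypothetical) .META. region
   * directory:
   * <pre>
   * ./bin/hbase org.apache.hadoop.hbase.regionserver.HRegion /hbase/.META. major_compact
   * </pre>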
| * @param args |
| * @throws IOException |
| */ |
| public static void main(String[] args) throws IOException { |
| if (args.length < 1) { |
| printUsageAndExit(null); |
| } |
| boolean majorCompact = false; |
| if (args.length > 1) { |
| if (!args[1].toLowerCase().startsWith("major")) { |
| printUsageAndExit("ERROR: Unrecognized option <" + args[1] + ">"); |
| } |
| majorCompact = true; |
| |
| } |
| Path tableDir = new Path(args[0]); |
| HBaseConfiguration c = new HBaseConfiguration(); |
| c.set("fs.default.name", c.get(HConstants.HBASE_DIR)); |
| FileSystem fs = FileSystem.get(c); |
| String tmp = c.get("hbase.tmp.dir", "/tmp"); |
| Path logdir = new Path(new Path(tmp), |
| "hlog" + tableDir.getName() + System.currentTimeMillis()); |
| HLog log = new HLog(fs, logdir, c, null); |
| try { |
| processTable(fs, tableDir, log, c, majorCompact); |
| } finally { |
| log.close(); |
| } |
| } |
| } |