| /** |
| * Copyright 2010 The Apache Software Foundation |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.regionserver; |
| |
| import java.io.IOException; |
| import java.io.InterruptedIOException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.NavigableSet; |
| import java.util.SortedSet; |
| import java.util.concurrent.CopyOnWriteArraySet; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.FileUtil; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hbase.HColumnDescriptor; |
| import org.apache.hadoop.hbase.HConstants; |
| import org.apache.hadoop.hbase.HRegionInfo; |
| import org.apache.hadoop.hbase.KeyValue; |
| import org.apache.hadoop.hbase.RemoteExceptionHandler; |
| import org.apache.hadoop.hbase.client.Scan; |
| import org.apache.hadoop.hbase.io.HeapSize; |
| import org.apache.hadoop.hbase.io.hfile.Compression; |
| import org.apache.hadoop.hbase.io.hfile.HFile; |
| import org.apache.hadoop.hbase.io.hfile.HFileScanner; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.ClassSize; |
| import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; |
| import org.apache.hadoop.util.StringUtils; |
| |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.Iterables; |
| |
| /** |
 * A Store holds a column family in a Region.  It comprises a memstore and a
 * set of zero or more StoreFiles, which stretch back over time.
| * |
 * <p>There's no reason to consider append-logging at this level; all logging
 * and locking are handled at the HRegion level.  Store just provides
 * services to manage sets of StoreFiles.  One of the most important of those
 * services is compaction, where files are aggregated once they pass
 * a configurable threshold.
 *
| * <p>Locking and transactions are handled at a higher level. This API should |
| * not be called directly but by an HRegion manager. |
| */ |
| public class Store implements HeapSize { |
| static final Log LOG = LogFactory.getLog(Store.class); |
| protected final MemStore memstore; |
| // This stores directory in the filesystem. |
| private final Path homedir; |
| private final HRegion region; |
| private final HColumnDescriptor family; |
| final FileSystem fs; |
| final Configuration conf; |
| // ttl in milliseconds. |
| protected long ttl; |
| private long majorCompactionTime; |
| private final int maxFilesToCompact; |
| private final long minCompactSize; |
| // compactRatio: double on purpose! Float.MAX < Long.MAX < Double.MAX |
| // With float, java will downcast your long to float for comparisons (bad) |
| private double compactRatio; |
| private long lastCompactSize = 0; |
| /* how many bytes to write between status checks */ |
| static int closeCheckInterval = 0; |
| private final long desiredMaxFileSize; |
| private final int blockingStoreFileCount; |
| private volatile long storeSize = 0L; |
| private final Object flushLock = new Object(); |
| final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); |
| private final String storeNameStr; |
| private final boolean inMemory; |
| |
| /* |
| * List of store files inside this store. This is an immutable list that |
| * is atomically replaced when its contents change. |
| */ |
| private ImmutableList<StoreFile> storefiles = null; |
| |
| |
  // CopyOnWriteArraySet handles its own synchronization; iteration sees a
  // consistent snapshot without external locking.
| private final CopyOnWriteArraySet<ChangedReadersObserver> changedReaderObservers = |
| new CopyOnWriteArraySet<ChangedReadersObserver>(); |
| |
| private final Object compactLock = new Object(); |
| private final int compactionThreshold; |
| private final int blocksize; |
| private final boolean blockcache; |
| /** Compression algorithm for flush files and minor compaction */ |
| private final Compression.Algorithm compression; |
| /** Compression algorithm for major compaction */ |
| private final Compression.Algorithm compactionCompression; |
| |
| // Comparing KeyValues |
| final KeyValue.KVComparator comparator; |
| |
| /** |
| * Constructor |
| * @param basedir qualified path under which the region directory lives; |
| * generally the table subdirectory |
   * @param region region this store belongs to
   * @param family HColumnDescriptor for this column family
   * @param fs file system object
   * @param conf configuration object
| * @throws IOException |
| */ |
| protected Store(Path basedir, HRegion region, HColumnDescriptor family, |
| FileSystem fs, Configuration conf) |
| throws IOException { |
| HRegionInfo info = region.regionInfo; |
| this.fs = fs; |
| this.homedir = getStoreHomedir(basedir, info.getEncodedName(), family.getName()); |
| if (!this.fs.exists(this.homedir)) { |
      if (!this.fs.mkdirs(this.homedir)) {
        throw new IOException("Failed to create " + this.homedir.toString());
      }
| } |
| this.region = region; |
| this.family = family; |
| this.conf = conf; |
| this.blockcache = family.isBlockCacheEnabled(); |
| this.blocksize = family.getBlocksize(); |
| this.compression = family.getCompression(); |
| // avoid overriding compression setting for major compactions if the user |
| // has not specified it separately |
| this.compactionCompression = |
| (family.getCompactionCompression() != Compression.Algorithm.NONE) ? |
| family.getCompactionCompression() : this.compression; |
| this.comparator = info.getComparator(); |
    // getTimeToLive returns ttl in seconds.  Convert to milliseconds.
    this.ttl = family.getTimeToLive();
    if (ttl == HConstants.FOREVER || ttl == -1) {
      // Both FOREVER and -1 mean an unlimited ttl.
      ttl = Long.MAX_VALUE;
    } else {
      // seconds -> milliseconds adjustment for user data
      this.ttl *= 1000;
    }
| this.memstore = new MemStore(this.comparator); |
| this.storeNameStr = Bytes.toString(this.family.getName()); |
| |
| // By default, we compact if an HStore has more than |
    // hbase.hstore.compactionThreshold (minimum 2) store files.
| this.compactionThreshold = Math.max(2, |
| conf.getInt("hbase.hstore.compactionThreshold", 3)); |
| |
| // Check if this is in-memory store |
| this.inMemory = family.isInMemory(); |
| |
| // By default we split region if a file > HConstants.DEFAULT_MAX_FILE_SIZE. |
| long maxFileSize = info.getTableDesc().getMaxFileSize(); |
| if (maxFileSize == HConstants.DEFAULT_MAX_FILE_SIZE) { |
| maxFileSize = conf.getLong("hbase.hregion.max.filesize", |
| HConstants.DEFAULT_MAX_FILE_SIZE); |
| } |
| this.desiredMaxFileSize = maxFileSize; |
| this.blockingStoreFileCount = |
| conf.getInt("hbase.hstore.blockingStoreFiles", 7); |
| |
| this.majorCompactionTime = getNextMajorCompactTime(); |
| |
| this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10); |
| this.minCompactSize = conf.getLong("hbase.hstore.compaction.min.size", |
| this.region.memstoreFlushSize); |
| this.compactRatio = conf.getFloat("hbase.hstore.compaction.ratio", 1.2F); |
| |
| if (Store.closeCheckInterval == 0) { |
| Store.closeCheckInterval = conf.getInt( |
| "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */); |
| } |
| this.storefiles = sortAndClone(loadStoreFiles()); |
| } |
| |
| public HColumnDescriptor getFamily() { |
| return this.family; |
| } |
| |
| /** |
| * @return The maximum sequence id in all store files. |
| */ |
| long getMaxSequenceId() { |
| return StoreFile.getMaxSequenceIdInList(this.getStorefiles()); |
| } |
| |
| /** |
| * @param tabledir |
| * @param encodedName Encoded region name. |
| * @param family |
| * @return Path to family/Store home directory. |
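   *
   * <p>A minimal sketch with hypothetical values: for tabledir
   * <code>/hbase/mytable</code>, encoded region name <code>abc123</code>, and
   * family <code>cf</code>, the result is <code>/hbase/mytable/abc123/cf</code>:
   * <pre>
   *   Path dir = Store.getStoreHomedir(new Path("/hbase/mytable"),
   *     "abc123", Bytes.toBytes("cf"));
   * </pre>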
| */ |
| public static Path getStoreHomedir(final Path tabledir, |
| final String encodedName, final byte [] family) { |
| return new Path(tabledir, new Path(encodedName, |
| new Path(Bytes.toString(family)))); |
| } |
| |
| /** |
| * Return the directory in which this store stores its |
| * StoreFiles |
| */ |
| public Path getHomedir() { |
| return homedir; |
| } |
| |
| /* |
| * Creates an unsorted list of StoreFile loaded from the given directory. |
| * @throws IOException |
| */ |
| private List<StoreFile> loadStoreFiles() |
| throws IOException { |
| ArrayList<StoreFile> results = new ArrayList<StoreFile>(); |
    FileStatus[] files = this.fs.listStatus(this.homedir);
| for (int i = 0; files != null && i < files.length; i++) { |
| // Skip directories. |
| if (files[i].isDir()) { |
| continue; |
| } |
| Path p = files[i].getPath(); |
| // Check for empty file. Should never be the case but can happen |
| // after data loss in hdfs for whatever reason (upgrade, etc.): HBASE-646 |
| if (this.fs.getFileStatus(p).getLen() <= 0) { |
| LOG.warn("Skipping " + p + " because its empty. HBASE-646 DATA LOSS?"); |
| continue; |
| } |
| StoreFile curfile = null; |
| try { |
| curfile = new StoreFile(fs, p, blockcache, this.conf, |
| this.family.getBloomFilterType(), this.inMemory); |
| curfile.createReader(); |
| } catch (IOException ioe) { |
| LOG.warn("Failed open of " + p + "; presumption is that file was " + |
| "corrupted at flush and lost edits picked up by commit log replay. " + |
| "Verify!", ioe); |
| continue; |
| } |
| long length = curfile.getReader().length(); |
| this.storeSize += length; |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("loaded " + curfile.toStringDetailed()); |
| } |
| results.add(curfile); |
| } |
| return results; |
| } |
| |
| /** |
| * Adds a value to the memstore |
| * |
| * @param kv |
| * @return memstore size delta |
| */ |
| protected long add(final KeyValue kv) { |
| lock.readLock().lock(); |
| try { |
| return this.memstore.add(kv); |
| } finally { |
| lock.readLock().unlock(); |
| } |
| } |
| |
| /** |
   * Adds a delete to the memstore
| * |
| * @param kv |
| * @return memstore size delta |
| */ |
| protected long delete(final KeyValue kv) { |
| lock.readLock().lock(); |
| try { |
| return this.memstore.delete(kv); |
| } finally { |
| lock.readLock().unlock(); |
| } |
| } |
| |
| /** |
| * @return All store files. |
| */ |
| List<StoreFile> getStorefiles() { |
| return this.storefiles; |
| } |
| |
  /**
   * Validates the given hfile, moves it into place for this store, and makes
   * its contents visible to scanners.
   * @param srcPathStr path to the hfile to bulk load
   * @throws IOException e.g. when the hfile does not fit this region's
   * key range
   */
  public void bulkLoadHFile(String srcPathStr) throws IOException {
| Path srcPath = new Path(srcPathStr); |
| |
| HFile.Reader reader = null; |
| try { |
| LOG.info("Validating hfile at " + srcPath + " for inclusion in " |
| + "store " + this + " region " + this.region); |
| reader = new HFile.Reader(srcPath.getFileSystem(conf), |
| srcPath, null, false); |
| reader.loadFileInfo(); |
| |
| byte[] firstKey = reader.getFirstRowKey(); |
| byte[] lk = reader.getLastKey(); |
| byte[] lastKey = |
| (lk == null) ? null : |
| KeyValue.createKeyValueFromKey(lk).getRow(); |
| |
| LOG.debug("HFile bounds: first=" + Bytes.toStringBinary(firstKey) + |
| " last=" + Bytes.toStringBinary(lastKey)); |
| LOG.debug("Region bounds: first=" + |
| Bytes.toStringBinary(region.getStartKey()) + |
| " last=" + Bytes.toStringBinary(region.getEndKey())); |
| |
| HRegionInfo hri = region.getRegionInfo(); |
| if (!hri.containsRange(firstKey, lastKey)) { |
| throw new WrongRegionException( |
| "Bulk load file " + srcPathStr + " does not fit inside region " |
| + this.region); |
| } |
| } finally { |
| if (reader != null) reader.close(); |
| } |
| |
| // Move the file if it's on another filesystem |
| FileSystem srcFs = srcPath.getFileSystem(conf); |
| if (!srcFs.equals(fs)) { |
| LOG.info("File " + srcPath + " on different filesystem than " + |
| "destination store - moving to this filesystem."); |
| Path tmpPath = getTmpPath(); |
| FileUtil.copy(srcFs, srcPath, fs, tmpPath, false, conf); |
| LOG.info("Copied to temporary path on dst filesystem: " + tmpPath); |
| srcPath = tmpPath; |
| } |
| |
| Path dstPath = StoreFile.getRandomFilename(fs, homedir); |
| LOG.info("Renaming bulk load file " + srcPath + " to " + dstPath); |
| StoreFile.rename(fs, srcPath, dstPath); |
| |
| StoreFile sf = new StoreFile(fs, dstPath, blockcache, |
| this.conf, this.family.getBloomFilterType(), this.inMemory); |
| sf.createReader(); |
| |
| LOG.info("Moved hfile " + srcPath + " into store directory " + |
| homedir + " - updating store file list."); |
| |
| // Append the new storefile into the list |
| this.lock.writeLock().lock(); |
| try { |
| ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles); |
| newFiles.add(sf); |
| this.storefiles = sortAndClone(newFiles); |
| notifyChangedReadersObservers(); |
| } finally { |
| this.lock.writeLock().unlock(); |
| } |
| LOG.info("Successfully loaded store file " + srcPath |
| + " into store " + this + " (new location: " + dstPath + ")"); |
| } |
| |
| /** |
| * Get a temporary path in this region. These temporary files |
| * will get cleaned up when the region is re-opened if they are |
| * still around. |
| */ |
| private Path getTmpPath() throws IOException { |
| return StoreFile.getRandomFilename( |
| fs, region.getTmpDir()); |
| } |
| |
| /** |
| * Close all the readers |
| * |
| * We don't need to worry about subsequent requests because the HRegion holds |
| * a write lock that will prevent any more reads or writes. |
| * |
| * @throws IOException |
| */ |
| ImmutableList<StoreFile> close() throws IOException { |
| this.lock.writeLock().lock(); |
| try { |
| ImmutableList<StoreFile> result = storefiles; |
| |
| // Clear so metrics doesn't find them. |
| storefiles = ImmutableList.of(); |
| |
| for (StoreFile f: result) { |
| f.closeReader(); |
| } |
| LOG.debug("closed " + this.storeNameStr); |
| return result; |
| } finally { |
| this.lock.writeLock().unlock(); |
| } |
| } |
| |
| /** |
   * Snapshot this store's memstore.  Call before running
   * {@link #flushCache(long, SortedSet, TimeRangeTracker)} so it has some
   * work to do.
| */ |
| void snapshot() { |
| this.memstore.snapshot(); |
| } |
| |
| /** |
| * Write out current snapshot. Presumes {@link #snapshot()} has been called |
| * previously. |
| * @param logCacheFlushId flush sequence number |
| * @param snapshot |
| * @param snapshotTimeRangeTracker |
   * @return the StoreFile created from the snapshot, or null if the snapshot
   * was empty
| * @throws IOException |
| */ |
| private StoreFile flushCache(final long logCacheFlushId, |
| SortedSet<KeyValue> snapshot, |
| TimeRangeTracker snapshotTimeRangeTracker) throws IOException { |
| // If an exception happens flushing, we let it out without clearing |
| // the memstore snapshot. The old snapshot will be returned when we say |
| // 'snapshot', the next time flush comes around. |
| return internalFlushCache(snapshot, logCacheFlushId, snapshotTimeRangeTracker); |
| } |
| |
| /* |
   * @param set the memstore snapshot to flush
   * @param logCacheFlushId
   * @param snapshotTimeRangeTracker
   * @return StoreFile created, or null if <code>set</code> was empty.
| * @throws IOException |
| */ |
| private StoreFile internalFlushCache(final SortedSet<KeyValue> set, |
| final long logCacheFlushId, |
| TimeRangeTracker snapshotTimeRangeTracker) |
| throws IOException { |
| StoreFile.Writer writer = null; |
| long flushed = 0; |
| // Don't flush if there are no entries. |
| if (set.size() == 0) { |
| return null; |
| } |
| long oldestTimestamp = System.currentTimeMillis() - ttl; |
| // TODO: We can fail in the below block before we complete adding this |
| // flush to list of store files. Add cleanup of anything put on filesystem |
| // if we fail. |
| synchronized (flushLock) { |
| // A. Write the map out to the disk |
| writer = createWriterInTmp(set.size()); |
| writer.setTimeRangeTracker(snapshotTimeRangeTracker); |
| int entries = 0; |
| try { |
| for (KeyValue kv: set) { |
| if (!isExpired(kv, oldestTimestamp)) { |
| writer.append(kv); |
| entries++; |
| flushed += this.memstore.heapSizeChange(kv, true); |
| } |
| } |
| } finally { |
| // Write out the log sequence number that corresponds to this output |
| // hfile. The hfile is current up to and including logCacheFlushId. |
| writer.appendMetadata(logCacheFlushId, false); |
| writer.close(); |
| } |
| } |
| |
| // Write-out finished successfully, move into the right spot |
| Path dstPath = StoreFile.getUniqueFile(fs, homedir); |
| LOG.info("Renaming flushed file at " + writer.getPath() + " to " + dstPath); |
| if (!fs.rename(writer.getPath(), dstPath)) { |
| LOG.warn("Unable to rename " + writer.getPath() + " to " + dstPath); |
| } |
| |
| StoreFile sf = new StoreFile(this.fs, dstPath, blockcache, |
| this.conf, this.family.getBloomFilterType(), this.inMemory); |
| StoreFile.Reader r = sf.createReader(); |
| this.storeSize += r.length(); |
| if(LOG.isInfoEnabled()) { |
| LOG.info("Added " + sf + ", entries=" + r.getEntries() + |
| ", sequenceid=" + logCacheFlushId + |
| ", memsize=" + StringUtils.humanReadableInt(flushed) + |
| ", filesize=" + StringUtils.humanReadableInt(r.length())); |
| } |
| return sf; |
| } |
| |
| /* |
| * @param maxKeyCount |
| * @return Writer for a new StoreFile in the tmp dir. |
| */ |
| private StoreFile.Writer createWriterInTmp(int maxKeyCount) |
| throws IOException { |
| return createWriterInTmp(maxKeyCount, this.compression); |
| } |
| |
| /* |
| * @param maxKeyCount |
| * @param compression Compression algorithm to use |
| * @return Writer for a new StoreFile in the tmp dir. |
| */ |
| private StoreFile.Writer createWriterInTmp(int maxKeyCount, |
| Compression.Algorithm compression) |
| throws IOException { |
| return StoreFile.createWriter(this.fs, region.getTmpDir(), this.blocksize, |
| compression, this.comparator, this.conf, |
| this.family.getBloomFilterType(), maxKeyCount); |
| } |
| |
| /* |
| * Change storefiles adding into place the Reader produced by this new flush. |
| * @param sf |
   * @param set That was used to make the passed file <code>sf</code>.
| * @throws IOException |
| * @return Whether compaction is required. |
| */ |
| private boolean updateStorefiles(final StoreFile sf, |
| final SortedSet<KeyValue> set) |
| throws IOException { |
| this.lock.writeLock().lock(); |
| try { |
| ArrayList<StoreFile> newList = new ArrayList<StoreFile>(storefiles); |
| newList.add(sf); |
| storefiles = sortAndClone(newList); |
| this.memstore.clearSnapshot(set); |
| |
| // Tell listeners of the change in readers. |
| notifyChangedReadersObservers(); |
| |
| return this.storefiles.size() >= this.compactionThreshold; |
| } finally { |
| this.lock.writeLock().unlock(); |
| } |
| } |
| |
| /* |
| * Notify all observers that set of Readers has changed. |
| * @throws IOException |
| */ |
| private void notifyChangedReadersObservers() throws IOException { |
| for (ChangedReadersObserver o: this.changedReaderObservers) { |
| o.updateReaders(); |
| } |
| } |
| |
| /* |
| * @param o Observer who wants to know about changes in set of Readers |
| */ |
| void addChangedReaderObserver(ChangedReadersObserver o) { |
| this.changedReaderObservers.add(o); |
| } |
| |
| /* |
| * @param o Observer no longer interested in changes in set of Readers. |
| */ |
| void deleteChangedReaderObserver(ChangedReadersObserver o) { |
| // We don't check if observer present; it may not be (legitimately) |
| this.changedReaderObservers.remove(o); |
| } |
| |
| ////////////////////////////////////////////////////////////////////////////// |
| // Compaction |
| ////////////////////////////////////////////////////////////////////////////// |
| |
| /** |
| * Compact the StoreFiles. This method may take some time, so the calling |
| * thread must be able to block for long periods. |
| * |
| * <p>During this time, the Store can work as usual, getting values from |
| * StoreFiles and writing new StoreFiles from the memstore. |
| * |
| * Existing StoreFiles are not destroyed until the new compacted StoreFile is |
| * completely written-out to disk. |
| * |
| * <p>The compactLock prevents multiple simultaneous compactions. |
| * The structureLock prevents us from interfering with other write operations. |
| * |
| * <p>We don't want to hold the structureLock for the whole time, as a compact() |
| * can be lengthy and we want to allow cache-flushes during this period. |
| * |
| * @param forceMajor True to force a major compaction regardless of thresholds |
| * @return row to split around if a split is needed, null otherwise |
| * @throws IOException |
| */ |
| StoreSize compact(final boolean forceMajor) throws IOException { |
| boolean forceSplit = this.region.shouldSplit(false); |
| boolean majorcompaction = forceMajor; |
| synchronized (compactLock) { |
| this.lastCompactSize = 0; |
| |
| // filesToCompact are sorted oldest to newest. |
| List<StoreFile> filesToCompact = this.storefiles; |
| if (filesToCompact.isEmpty()) { |
| LOG.debug(this.storeNameStr + ": no store files to compact"); |
| return null; |
| } |
| |
| // Check to see if we need to do a major compaction on this region. |
      // If so, change majorcompaction to true to skip the incremental
      // compacting below.  Only check if majorcompaction is not already true.
| if (!majorcompaction) { |
| majorcompaction = isMajorCompaction(filesToCompact); |
| } |
| |
| boolean references = hasReferences(filesToCompact); |
| if (!majorcompaction && !references && |
| (forceSplit || (filesToCompact.size() < compactionThreshold))) { |
| return checkSplit(forceSplit); |
| } |
| |
| /* get store file sizes for incremental compacting selection. |
| * normal skew: |
| * |
| * older ----> newer |
| * _ |
| * | | _ |
| * | | | | _ |
| * --|-|- |-|- |-|---_-------_------- minCompactSize |
| * | | | | | | | | _ | | |
| * | | | | | | | | | | | | |
| * | | | | | | | | | | | | |
| */ |
| int countOfFiles = filesToCompact.size(); |
| long [] fileSizes = new long[countOfFiles]; |
| long [] sumSize = new long[countOfFiles]; |
| for (int i = countOfFiles-1; i >= 0; --i) { |
| StoreFile file = filesToCompact.get(i); |
| Path path = file.getPath(); |
| if (path == null) { |
| LOG.error("Path is null for " + file); |
| return null; |
| } |
| StoreFile.Reader r = file.getReader(); |
| if (r == null) { |
| LOG.error("StoreFile " + file + " has a null Reader"); |
| return null; |
| } |
| fileSizes[i] = file.getReader().length(); |
| // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo |
| int tooFar = i + this.maxFilesToCompact - 1; |
| sumSize[i] = fileSizes[i] |
| + ((i+1 < countOfFiles) ? sumSize[i+1] : 0) |
| - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0); |
| } |
| |
| long totalSize = 0; |
| if (!majorcompaction && !references) { |
| // we're doing a minor compaction, let's see what files are applicable |
| int start = 0; |
| double r = this.compactRatio; |
| |
| /* Start at the oldest file and stop when you find the first file that |
| * meets compaction criteria: |
| * (1) a recently-flushed, small file (i.e. <= minCompactSize) |
| * OR |
| * (2) within the compactRatio of sum(newer_files) |
| * Given normal skew, any newer files will also meet this criteria |
| * |
| * Additional Note: |
| * If fileSizes.size() >> maxFilesToCompact, we will recurse on |
| * compact(). Consider the oldest files first to avoid a |
| * situation where we always compact [end-threshold,end). Then, the |
| * last file becomes an aggregate of the previous compactions. |
| */ |
| while(countOfFiles - start >= this.compactionThreshold && |
| fileSizes[start] > |
| Math.max(minCompactSize, (long)(sumSize[start+1] * r))) { |
| ++start; |
| } |
| int end = Math.min(countOfFiles, start + this.maxFilesToCompact); |
| totalSize = fileSizes[start] |
| + ((start+1 < countOfFiles) ? sumSize[start+1] : 0); |
| |
| // if we don't have enough files to compact, just wait |
| if (end - start < this.compactionThreshold) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Skipped compaction of " + this.storeNameStr |
| + " because only " + (end - start) + " file(s) of size " |
| + StringUtils.humanReadableInt(totalSize) |
| + " meet compaction criteria."); |
| } |
| return checkSplit(forceSplit); |
| } |
| |
| if (0 == start && end == countOfFiles) { |
| // we decided all the files were candidates! major compact |
| majorcompaction = true; |
| } else { |
| filesToCompact = new ArrayList<StoreFile>(filesToCompact.subList(start, |
| end)); |
| } |
| } else { |
| // all files included in this compaction |
| for (long i : fileSizes) { |
| totalSize += i; |
| } |
| } |
| this.lastCompactSize = totalSize; |
| |
| // Max-sequenceID is the last key in the files we're compacting |
| long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact); |
| |
| // Ready to go. Have list of files to compact. |
| LOG.info("Started compaction of " + filesToCompact.size() + " file(s) in cf=" + |
| this.storeNameStr + |
| (references? ", hasReferences=true,": " ") + " into " + |
| region.getTmpDir() + ", seqid=" + maxId + |
| ", totalSize=" + StringUtils.humanReadableInt(totalSize)); |
| StoreFile.Writer writer = compact(filesToCompact, majorcompaction, maxId); |
| // Move the compaction into place. |
| StoreFile sf = completeCompaction(filesToCompact, writer); |
| if (LOG.isInfoEnabled()) { |
| LOG.info("Completed" + (majorcompaction? " major ": " ") + |
| "compaction of " + filesToCompact.size() + |
| " file(s), new file=" + (sf == null? "none": sf.toString()) + |
| ", size=" + (sf == null? "none": StringUtils.humanReadableInt(sf.getReader().length())) + |
| "; total size for store is " + StringUtils.humanReadableInt(storeSize)); |
| } |
| } |
| return checkSplit(forceSplit); |
| } |
| |
| /* |
| * Compact the most recent N files. Essentially a hook for testing. |
| */ |
| protected void compactRecent(int N) throws IOException { |
| synchronized(compactLock) { |
| List<StoreFile> filesToCompact = this.storefiles; |
| int count = filesToCompact.size(); |
| if (N > count) { |
| throw new RuntimeException("Not enough files"); |
| } |
| |
| filesToCompact = new ArrayList<StoreFile>(filesToCompact.subList(count-N, count)); |
| long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact); |
| boolean majorcompaction = (N == count); |
| |
| // Ready to go. Have list of files to compact. |
| StoreFile.Writer writer = compact(filesToCompact, majorcompaction, maxId); |
| // Move the compaction into place. |
| StoreFile sf = completeCompaction(filesToCompact, writer); |
| } |
| } |
| |
| /* |
| * @param files |
| * @return True if any of the files in <code>files</code> are References. |
| */ |
| private boolean hasReferences(Collection<StoreFile> files) { |
| if (files != null && files.size() > 0) { |
| for (StoreFile hsf: files) { |
| if (hsf.isReference()) { |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| |
| /* |
| * Gets lowest timestamp from files in a dir |
| * |
| * @param fs |
| * @param dir |
   * @return lowest modification time among the files, or 0 if there are none
   * @throws IOException
| */ |
| private static long getLowestTimestamp(FileSystem fs, Path dir) throws IOException { |
| FileStatus[] stats = fs.listStatus(dir); |
| if (stats == null || stats.length == 0) { |
      return 0L;
| } |
| long lowTimestamp = Long.MAX_VALUE; |
| for (int i = 0; i < stats.length; i++) { |
| long timestamp = stats[i].getModificationTime(); |
| if (timestamp < lowTimestamp){ |
| lowTimestamp = timestamp; |
| } |
| } |
| return lowTimestamp; |
| } |
| |
| /* |
| * @return True if we should run a major compaction. |
| */ |
| boolean isMajorCompaction() throws IOException { |
| return isMajorCompaction(storefiles); |
| } |
| |
| /* |
| * @param filesToCompact Files to compact. Can be null. |
| * @return True if we should run a major compaction. |
| */ |
| private boolean isMajorCompaction(final List<StoreFile> filesToCompact) throws IOException { |
| boolean result = false; |
| if (filesToCompact == null || filesToCompact.isEmpty() || |
| majorCompactionTime == 0) { |
| return result; |
| } |
| // TODO: Use better method for determining stamp of last major (HBASE-2990) |
| long lowTimestamp = getLowestTimestamp(fs, |
| filesToCompact.get(0).getPath().getParent()); |
| long now = System.currentTimeMillis(); |
    if (lowTimestamp > 0L && lowTimestamp < (now - this.majorCompactionTime)) {
| // Major compaction time has elapsed. |
| if (filesToCompact.size() == 1) { |
| // Single file |
| StoreFile sf = filesToCompact.get(0); |
| long oldest = now - sf.getReader().timeRangeTracker.minimumTimestamp; |
| if (sf.isMajorCompaction() && |
| (this.ttl == HConstants.FOREVER || oldest < this.ttl)) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Skipping major compaction of " + this.storeNameStr + |
| " because one (major) compacted file only and oldestTime " + |
| oldest + "ms is < ttl=" + this.ttl); |
| } |
| } |
| } else { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Major compaction triggered on store " + this.storeNameStr + |
| "; time since last major compaction " + (now - lowTimestamp) + "ms"); |
| } |
| result = true; |
| this.majorCompactionTime = getNextMajorCompactTime(); |
| } |
| } |
| return result; |
| } |
| |
| long getNextMajorCompactTime() { |
| // default = 24hrs |
| long ret = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24); |
| if (family.getValue(HConstants.MAJOR_COMPACTION_PERIOD) != null) { |
| String strCompactionTime = |
| family.getValue(HConstants.MAJOR_COMPACTION_PERIOD); |
      ret = Long.parseLong(strCompactionTime);
| } |
| |
| if (ret > 0) { |
| // default = 20% = +/- 4.8 hrs |
| double jitterPct = conf.getFloat("hbase.hregion.majorcompaction.jitter", |
| 0.20F); |
| if (jitterPct > 0) { |
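        // ret + jitter - uniform[0, 2*jitter] yields a value uniformly in
        // [ret - jitter, ret + jitter]; e.g. 24hrs +/- 4.8hrs at 20% jitter.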
| long jitter = Math.round(ret * jitterPct); |
| ret += jitter - Math.round(2L * jitter * Math.random()); |
| } |
| } |
| return ret; |
| } |
| |
| /** |
| * Do a minor/major compaction. Uses the scan infrastructure to make it easy. |
| * |
| * @param filesToCompact which files to compact |
| * @param majorCompaction true to major compact (prune all deletes, max versions, etc) |
| * @param maxId Readers maximum sequence id. |
| * @return Product of compaction or null if all cells expired or deleted and |
| * nothing made it through the compaction. |
| * @throws IOException |
| */ |
| private StoreFile.Writer compact(final List<StoreFile> filesToCompact, |
| final boolean majorCompaction, final long maxId) |
| throws IOException { |
| // calculate maximum key count after compaction (for blooms) |
| int maxKeyCount = 0; |
| for (StoreFile file : filesToCompact) { |
| StoreFile.Reader r = file.getReader(); |
| if (r != null) { |
| // NOTE: getFilterEntries could cause under-sized blooms if the user |
| // switches bloom type (e.g. from ROW to ROWCOL) |
| long keyCount = (r.getBloomFilterType() == family.getBloomFilterType()) |
| ? r.getFilterEntries() : r.getEntries(); |
| maxKeyCount += keyCount; |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Compacting " + file + |
| ", keycount=" + keyCount + |
| ", bloomtype=" + r.getBloomFilterType().toString() + |
| ", size=" + StringUtils.humanReadableInt(r.length()) ); |
| } |
| } |
| } |
| |
| // For each file, obtain a scanner: |
| List<StoreFileScanner> scanners = StoreFileScanner |
| .getScannersForStoreFiles(filesToCompact, false, false); |
| |
| // Make the instantiation lazy in case compaction produces no product; i.e. |
| // where all source cells are expired or deleted. |
| StoreFile.Writer writer = null; |
| try { |
| InternalScanner scanner = null; |
| try { |
| Scan scan = new Scan(); |
| scan.setMaxVersions(family.getMaxVersions()); |
| /* include deletes, unless we are doing a major compaction */ |
| scanner = new StoreScanner(this, scan, scanners, !majorCompaction); |
| int bytesWritten = 0; |
| // since scanner.next() can return 'false' but still be delivering data, |
| // we have to use a do/while loop. |
| ArrayList<KeyValue> kvs = new ArrayList<KeyValue>(); |
        boolean hasMore;
        do {
          hasMore = scanner.next(kvs);
| if (writer == null && !kvs.isEmpty()) { |
| writer = createWriterInTmp(maxKeyCount, |
| this.compactionCompression); |
| } |
| if (writer != null) { |
| // output to writer: |
| for (KeyValue kv : kvs) { |
| writer.append(kv); |
| |
| // check periodically to see if a system stop is requested |
| if (Store.closeCheckInterval > 0) { |
| bytesWritten += kv.getLength(); |
| if (bytesWritten > Store.closeCheckInterval) { |
| bytesWritten = 0; |
| if (!this.region.areWritesEnabled()) { |
| writer.close(); |
| fs.delete(writer.getPath(), false); |
| throw new InterruptedIOException( |
| "Aborting compaction of store " + this + |
| " in region " + this.region + |
| " because user requested stop."); |
| } |
| } |
| } |
| } |
| } |
| kvs.clear(); |
| } |
| } finally { |
| if (scanner != null) { |
| scanner.close(); |
| } |
| } |
| } finally { |
| if (writer != null) { |
| writer.appendMetadata(maxId, majorCompaction); |
| writer.close(); |
| } |
| } |
| return writer; |
| } |
| |
| /* |
| * It's assumed that the compactLock will be acquired prior to calling this |
| * method! Otherwise, it is not thread-safe! |
| * |
| * <p>It works by processing a compaction that's been written to disk. |
| * |
| * <p>It is usually invoked at the end of a compaction, but might also be |
| * invoked at HStore startup, if the prior execution died midway through. |
| * |
| * <p>Moving the compacted TreeMap into place means: |
| * <pre> |
| * 1) Moving the new compacted StoreFile into place |
| * 2) Unload all replaced StoreFile, close and collect list to delete. |
| * 3) Loading the new TreeMap. |
| * 4) Compute new store size |
| * </pre> |
| * |
| * @param compactedFiles list of files that were compacted |
| * @param compactedFile StoreFile that is the result of the compaction |
| * @return StoreFile created. May be null. |
| * @throws IOException |
| */ |
| private StoreFile completeCompaction(final List<StoreFile> compactedFiles, |
| final StoreFile.Writer compactedFile) |
| throws IOException { |
| // 1. Moving the new files into place -- if there is a new file (may not |
| // be if all cells were expired or deleted). |
| StoreFile result = null; |
| if (compactedFile != null) { |
| Path p = null; |
| try { |
| p = StoreFile.rename(this.fs, compactedFile.getPath(), |
| StoreFile.getRandomFilename(fs, this.homedir)); |
| } catch (IOException e) { |
| LOG.error("Failed move of compacted file " + compactedFile.getPath(), e); |
| return null; |
| } |
| result = new StoreFile(this.fs, p, blockcache, this.conf, |
| this.family.getBloomFilterType(), this.inMemory); |
| result.createReader(); |
| } |
| this.lock.writeLock().lock(); |
| try { |
| try { |
| // 2. Unloading |
| // 3. Loading the new TreeMap. |
| // Change this.storefiles so it reflects new state but do not |
| // delete old store files until we have sent out notification of |
| // change in case old files are still being accessed by outstanding |
| // scanners. |
| ArrayList<StoreFile> newStoreFiles = new ArrayList<StoreFile>(); |
| for (StoreFile sf : storefiles) { |
| if (!compactedFiles.contains(sf)) { |
| newStoreFiles.add(sf); |
| } |
| } |
| |
| // If a StoreFile result, move it into place. May be null. |
| if (result != null) { |
| newStoreFiles.add(result); |
| } |
| |
| this.storefiles = sortAndClone(newStoreFiles); |
| |
| // Tell observers that list of StoreFiles has changed. |
| notifyChangedReadersObservers(); |
| // Finally, delete old store files. |
| for (StoreFile hsf: compactedFiles) { |
| hsf.deleteReader(); |
| } |
| } catch (IOException e) { |
| e = RemoteExceptionHandler.checkIOException(e); |
| LOG.error("Failed replacing compacted files in " + this.storeNameStr + |
| ". Compacted file is " + (result == null? "none": result.toString()) + |
| ". Files replaced " + compactedFiles.toString() + |
| " some of which may have been already removed", e); |
| } |
| // 4. Compute new store size |
| this.storeSize = 0L; |
| for (StoreFile hsf : this.storefiles) { |
| StoreFile.Reader r = hsf.getReader(); |
| if (r == null) { |
| LOG.warn("StoreFile " + hsf + " has a null Reader"); |
| continue; |
| } |
| this.storeSize += r.length(); |
| } |
| } finally { |
| this.lock.writeLock().unlock(); |
| } |
| return result; |
| } |
| |
| public ImmutableList<StoreFile> sortAndClone(List<StoreFile> storeFiles) { |
| Collections.sort(storeFiles, StoreFile.Comparators.FLUSH_TIME); |
    return ImmutableList.copyOf(storeFiles);
| } |
| |
| // //////////////////////////////////////////////////////////////////////////// |
| // Accessors. |
| // (This is the only section that is directly useful!) |
| ////////////////////////////////////////////////////////////////////////////// |
| /** |
| * @return the number of files in this store |
| */ |
| public int getNumberOfstorefiles() { |
| return this.storefiles.size(); |
| } |
| |
| /* |
| * @param wantedVersions How many versions were asked for. |
   * @return wantedVersions or this family's maximum number of versions,
   * whichever is smaller.
| */ |
| int versionsToReturn(final int wantedVersions) { |
| if (wantedVersions <= 0) { |
| throw new IllegalArgumentException("Number of versions must be > 0"); |
| } |
| // Make sure we do not return more than maximum versions for this store. |
| int maxVersions = this.family.getMaxVersions(); |
| return wantedVersions > maxVersions ? maxVersions: wantedVersions; |
| } |
| |
| static boolean isExpired(final KeyValue key, final long oldestTimestamp) { |
| return key.getTimestamp() < oldestTimestamp; |
| } |
| |
| /** |
   * Find the key that matches <i>row</i> exactly, or the one that immediately
   * precedes it.  WARNING: Only use this method on a table where writes occur
   * with strictly increasing timestamps.  This method assumes this pattern of
   * writes in order to make it reasonably performant.  Our search also
   * depends on the axiom that deletes are for cells that are in the container
   * that follows, whether a memstore snapshot or a storefile, not for the
   * current container: i.e. we'll see deletes before we come across the cells
   * we are to delete.  The presumption is that the memstore#kvset is
   * processed before memstore#snapshot and so on.
| * @param kv First possible item on targeted row; i.e. empty columns, latest |
| * timestamp and maximum type. |
| * @return Found keyvalue or null if none found. |
| * @throws IOException |
| */ |
| KeyValue getRowKeyAtOrBefore(final KeyValue kv) throws IOException { |
| GetClosestRowBeforeTracker state = new GetClosestRowBeforeTracker( |
| this.comparator, kv, this.ttl, this.region.getRegionInfo().isMetaRegion()); |
| this.lock.readLock().lock(); |
| try { |
| // First go to the memstore. Pick up deletes and candidates. |
| this.memstore.getRowKeyAtOrBefore(state); |
| // Check if match, if we got a candidate on the asked for 'kv' row. |
| // Process each store file. Run through from newest to oldest. |
| for (StoreFile sf : Iterables.reverse(storefiles)) { |
| // Update the candidate keys from the current map file |
| rowAtOrBeforeFromStoreFile(sf, state); |
| } |
| return state.getCandidate(); |
| } finally { |
| this.lock.readLock().unlock(); |
| } |
| } |
| |
| /* |
| * Check an individual MapFile for the row at or before a given row. |
| * @param f |
| * @param state |
| * @throws IOException |
| */ |
| private void rowAtOrBeforeFromStoreFile(final StoreFile f, |
| final GetClosestRowBeforeTracker state) |
| throws IOException { |
| StoreFile.Reader r = f.getReader(); |
| if (r == null) { |
| LOG.warn("StoreFile " + f + " has a null Reader"); |
| return; |
| } |
| // TODO: Cache these keys rather than make each time? |
| byte [] fk = r.getFirstKey(); |
| KeyValue firstKV = KeyValue.createKeyValueFromKey(fk, 0, fk.length); |
| byte [] lk = r.getLastKey(); |
| KeyValue lastKV = KeyValue.createKeyValueFromKey(lk, 0, lk.length); |
| KeyValue firstOnRow = state.getTargetKey(); |
| if (this.comparator.compareRows(lastKV, firstOnRow) < 0) { |
| // If last key in file is not of the target table, no candidates in this |
| // file. Return. |
| if (!state.isTargetTable(lastKV)) return; |
| // If the row we're looking for is past the end of file, set search key to |
| // last key. TODO: Cache last and first key rather than make each time. |
| firstOnRow = new KeyValue(lastKV.getRow(), HConstants.LATEST_TIMESTAMP); |
| } |
| // Get a scanner that caches blocks and that uses pread. |
| HFileScanner scanner = r.getScanner(true, true); |
| // Seek scanner. If can't seek it, return. |
| if (!seekToScanner(scanner, firstOnRow, firstKV)) return; |
    // If we found a candidate on firstOnRow, just return.  In practice this
    // will almost never happen: it's unlikely there'll be an instance of the
    // actual first row in the table.
| if (walkForwardInSingleRow(scanner, firstOnRow, state)) return; |
| // If here, need to start backing up. |
| while (scanner.seekBefore(firstOnRow.getBuffer(), firstOnRow.getKeyOffset(), |
| firstOnRow.getKeyLength())) { |
| KeyValue kv = scanner.getKeyValue(); |
| if (!state.isTargetTable(kv)) break; |
| if (!state.isBetterCandidate(kv)) break; |
| // Make new first on row. |
| firstOnRow = new KeyValue(kv.getRow(), HConstants.LATEST_TIMESTAMP); |
| // Seek scanner. If can't seek it, break. |
| if (!seekToScanner(scanner, firstOnRow, firstKV)) break; |
| // If we find something, break; |
| if (walkForwardInSingleRow(scanner, firstOnRow, state)) break; |
| } |
| } |
| |
| /* |
| * Seek the file scanner to firstOnRow or first entry in file. |
| * @param scanner |
| * @param firstOnRow |
| * @param firstKV |
| * @return True if we successfully seeked scanner. |
| * @throws IOException |
| */ |
| private boolean seekToScanner(final HFileScanner scanner, |
| final KeyValue firstOnRow, |
| final KeyValue firstKV) |
| throws IOException { |
| KeyValue kv = firstOnRow; |
    // If the file's first key is on the same row as firstOnRow, seek from
    // the file's first key instead.
| if (this.comparator.compareRows(firstKV, firstOnRow) == 0) kv = firstKV; |
| int result = scanner.seekTo(kv.getBuffer(), kv.getKeyOffset(), |
| kv.getKeyLength()); |
| return result >= 0; |
| } |
| |
| /* |
| * When we come in here, we are probably at the kv just before we break into |
| * the row that firstOnRow is on. Usually need to increment one time to get |
| * on to the row we are interested in. |
| * @param scanner |
| * @param firstOnRow |
| * @param state |
| * @return True we found a candidate. |
| * @throws IOException |
| */ |
| private boolean walkForwardInSingleRow(final HFileScanner scanner, |
| final KeyValue firstOnRow, |
| final GetClosestRowBeforeTracker state) |
| throws IOException { |
| boolean foundCandidate = false; |
| do { |
| KeyValue kv = scanner.getKeyValue(); |
| // If we are not in the row, skip. |
| if (this.comparator.compareRows(kv, firstOnRow) < 0) continue; |
| // Did we go beyond the target row? If so break. |
| if (state.isTooFar(kv, firstOnRow)) break; |
| if (state.isExpired(kv)) { |
| continue; |
| } |
| // If we added something, this row is a contender. break. |
| if (state.handle(kv)) { |
| foundCandidate = true; |
| break; |
| } |
| } while(scanner.next()); |
| return foundCandidate; |
| } |
| |
| /** |
| * Determines if HStore can be split |
| * @param force Whether to force a split or not. |
| * @return a StoreSize if store can be split, null otherwise. |
| */ |
| StoreSize checkSplit(final boolean force) { |
| this.lock.readLock().lock(); |
| try { |
| // Iterate through all store files |
| if (this.storefiles.isEmpty()) { |
| return null; |
| } |
| if (!force && (storeSize < this.desiredMaxFileSize)) { |
| return null; |
| } |
| |
| if (this.region.getRegionInfo().isMetaRegion()) { |
| if (force) { |
| LOG.warn("Cannot split meta regions in HBase 0.20"); |
| } |
| return null; |
| } |
| |
      // Not splittable if we find a reference store file present in the store.
      boolean splittable = true;
      long maxSize = 0L;
      StoreFile largestSf = null;
      for (StoreFile sf : storefiles) {
        if (splittable) {
          splittable = !sf.isReference();
          if (!splittable) {
            // RETURN IN MIDDLE OF FUNCTION!!! If not splittable, just return.
| if (LOG.isDebugEnabled()) { |
| LOG.debug(sf + " is not splittable"); |
| } |
| return null; |
| } |
| } |
| StoreFile.Reader r = sf.getReader(); |
| if (r == null) { |
| LOG.warn("Storefile " + sf + " Reader is null"); |
| continue; |
| } |
| long size = r.length(); |
| if (size > maxSize) { |
| // This is the largest one so far |
| maxSize = size; |
| largestSf = sf; |
| } |
| } |
| StoreFile.Reader r = largestSf.getReader(); |
| if (r == null) { |
| LOG.warn("Storefile " + largestSf + " Reader is null"); |
| return null; |
| } |
| // Get first, last, and mid keys. Midkey is the key that starts block |
| // in middle of hfile. Has column and timestamp. Need to return just |
| // the row we want to split on as midkey. |
| byte [] midkey = r.midkey(); |
| if (midkey != null) { |
| KeyValue mk = KeyValue.createKeyValueFromKey(midkey, 0, midkey.length); |
| byte [] fk = r.getFirstKey(); |
| KeyValue firstKey = KeyValue.createKeyValueFromKey(fk, 0, fk.length); |
| byte [] lk = r.getLastKey(); |
| KeyValue lastKey = KeyValue.createKeyValueFromKey(lk, 0, lk.length); |
| // if the midkey is the same as the first and last keys, then we cannot |
| // (ever) split this region. |
| if (this.comparator.compareRows(mk, firstKey) == 0 && |
| this.comparator.compareRows(mk, lastKey) == 0) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("cannot split because midkey is the same as first or " + |
| "last row"); |
| } |
| return null; |
| } |
| return new StoreSize(maxSize, mk.getRow()); |
| } |
| } catch(IOException e) { |
| LOG.warn("Failed getting store size for " + this.storeNameStr, e); |
| } finally { |
| this.lock.readLock().unlock(); |
| } |
| return null; |
| } |
| |
  /** @return aggregate size of the store files selected in the last compaction */
| public long getLastCompactSize() { |
| return this.lastCompactSize; |
| } |
| |
| /** @return aggregate size of HStore */ |
| public long getSize() { |
| return storeSize; |
| } |
| |
| ////////////////////////////////////////////////////////////////////////////// |
| // File administration |
| ////////////////////////////////////////////////////////////////////////////// |
| |
| /** |
| * Return a scanner for both the memstore and the HStore files |
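   *
   * <p>A minimal usage sketch (hypothetical column values):
   * <pre>
   *   NavigableSet&lt;byte []&gt; cols = new TreeSet&lt;byte []&gt;(Bytes.BYTES_COMPARATOR);
   *   cols.add(Bytes.toBytes("qual"));
   *   KeyValueScanner scanner = store.getScanner(new Scan(), cols);
   * </pre>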
| * @throws IOException |
| */ |
| public KeyValueScanner getScanner(Scan scan, |
| final NavigableSet<byte []> targetCols) throws IOException { |
| lock.readLock().lock(); |
| try { |
| return new StoreScanner(this, scan, targetCols); |
| } finally { |
| lock.readLock().unlock(); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return this.storeNameStr; |
| } |
| |
| /** |
| * @return Count of store files |
| */ |
| int getStorefilesCount() { |
| return this.storefiles.size(); |
| } |
| |
| /** |
| * @return The size of the store files, in bytes. |
| */ |
| long getStorefilesSize() { |
| long size = 0; |
| for (StoreFile s: storefiles) { |
| StoreFile.Reader r = s.getReader(); |
| if (r == null) { |
| LOG.warn("StoreFile " + s + " has a null Reader"); |
| continue; |
| } |
| size += r.length(); |
| } |
| return size; |
| } |
| |
| /** |
| * @return The size of the store file indexes, in bytes. |
| */ |
| long getStorefilesIndexSize() { |
| long size = 0; |
| for (StoreFile s: storefiles) { |
| StoreFile.Reader r = s.getReader(); |
| if (r == null) { |
| LOG.warn("StoreFile " + s + " has a null Reader"); |
| continue; |
| } |
| size += r.indexSize(); |
| } |
| return size; |
| } |
| |
| /** |
| * @return The priority that this store should have in the compaction queue |
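   * <p>For example, with hbase.hstore.blockingStoreFiles at its default of 7
   * and five store files on hand, the priority is 2; smaller values mean a
   * more urgent compaction.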
| */ |
| int getCompactPriority() { |
| return this.blockingStoreFileCount - this.storefiles.size(); |
| } |
| |
| /** |
   * Data structure that holds the size and the row to split a store around.
| * TODO: Take a KeyValue rather than row. |
| */ |
| static class StoreSize { |
| private final long size; |
| private final byte [] row; |
| |
| StoreSize(long size, byte [] row) { |
| this.size = size; |
| this.row = row; |
| } |
| /* @return the size */ |
| long getSize() { |
| return size; |
| } |
| |
| byte [] getSplitRow() { |
| return this.row; |
| } |
| } |
| |
| HRegion getHRegion() { |
| return this.region; |
| } |
| |
| HRegionInfo getHRegionInfo() { |
| return this.region.regionInfo; |
| } |
| |
| /** |
| * Increments the value for the given row/family/qualifier. |
| * |
   * This function will always be seen as atomic by other readers
   * because it only puts a single KV to memstore.  Thus no
   * read/write control is necessary.
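   *
   * <p>A minimal usage sketch (hypothetical arguments):
   * <pre>
   *   long delta = store.updateColumnValue(Bytes.toBytes("row1"),
   *     Bytes.toBytes("cf"), Bytes.toBytes("counter"), 42L);
   * </pre>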
| * |
| * @param row |
| * @param f |
| * @param qualifier |
| * @param newValue the new value to set into memstore |
| * @return memstore size delta |
| * @throws IOException |
| */ |
| public long updateColumnValue(byte [] row, byte [] f, |
| byte [] qualifier, long newValue) |
| throws IOException { |
| |
| this.lock.readLock().lock(); |
| try { |
| long now = EnvironmentEdgeManager.currentTimeMillis(); |
| |
| return this.memstore.updateColumnValue(row, |
| f, |
| qualifier, |
| newValue, |
| now); |
| |
| } finally { |
| this.lock.readLock().unlock(); |
| } |
| } |
| |
| /** |
| * Adds or replaces the specified KeyValues. |
| * <p> |
| * For each KeyValue specified, if a cell with the same row, family, and |
| * qualifier exists in MemStore, it will be replaced. Otherwise, it will just |
| * be inserted to MemStore. |
| * <p> |
| * This operation is atomic on each KeyValue (row/family/qualifier) but not |
| * necessarily atomic across all of them. |
| * @param kvs |
| * @return memstore size delta |
| * @throws IOException |
| */ |
| public long upsert(List<KeyValue> kvs) |
| throws IOException { |
| this.lock.readLock().lock(); |
| try { |
| // TODO: Make this operation atomic w/ RWCC |
| return this.memstore.upsert(kvs); |
| } finally { |
| this.lock.readLock().unlock(); |
| } |
| } |
| |
| public StoreFlusher getStoreFlusher(long cacheFlushId) { |
| return new StoreFlusherImpl(cacheFlushId); |
| } |
| |
| private class StoreFlusherImpl implements StoreFlusher { |
| |
| private long cacheFlushId; |
| private SortedSet<KeyValue> snapshot; |
| private StoreFile storeFile; |
| private TimeRangeTracker snapshotTimeRangeTracker; |
| |
| private StoreFlusherImpl(long cacheFlushId) { |
| this.cacheFlushId = cacheFlushId; |
| } |
| |
| @Override |
| public void prepare() { |
| memstore.snapshot(); |
| this.snapshot = memstore.getSnapshot(); |
| this.snapshotTimeRangeTracker = memstore.getSnapshotTimeRangeTracker(); |
| } |
| |
| @Override |
| public void flushCache() throws IOException { |
| storeFile = Store.this.flushCache(cacheFlushId, snapshot, snapshotTimeRangeTracker); |
| } |
| |
| @Override |
| public boolean commit() throws IOException { |
| if (storeFile == null) { |
| return false; |
| } |
| // Add new file to store files. Clear snapshot too while we have |
| // the Store write lock. |
| return Store.this.updateStorefiles(storeFile, snapshot); |
| } |
| } |
| |
| /** |
   * See if there are too many store files in this store
   * @return true if the number of store files is greater than
| * the number defined in compactionThreshold |
| */ |
| public boolean hasTooManyStoreFiles() { |
| return this.storefiles.size() > this.compactionThreshold; |
| } |
| |
| public static final long FIXED_OVERHEAD = ClassSize.align( |
| ClassSize.OBJECT + (15 * ClassSize.REFERENCE) + |
| (6 * Bytes.SIZEOF_LONG) + (1 * Bytes.SIZEOF_DOUBLE) + |
| (4 * Bytes.SIZEOF_INT) + (Bytes.SIZEOF_BOOLEAN * 2)); |
| |
| public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + |
| ClassSize.OBJECT + ClassSize.REENTRANT_LOCK + |
| ClassSize.CONCURRENT_SKIPLISTMAP + |
| ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT); |
| |
| @Override |
| public long heapSize() { |
| return DEEP_OVERHEAD + this.memstore.heapSize(); |
| } |
| } |