blob: a9e86b32847406524a033fd92d62f45bb27d227d [file] [log] [blame]
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.EOFException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.io.BloomFilterMapFile;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.HBaseMapFile;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.io.MapFile;
import org.apache.hadoop.hbase.io.SequenceFile;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
/**
* HStore maintains a bunch of data files. It is responsible for maintaining
* the memory/file hierarchy and for periodic flushes to disk and compacting
* edits to the file.
*
* Locking and transactions are handled at a higher level. This API should not
* be called directly by any writer, but rather by an HRegion manager.
*/
public class HStore implements HConstants {
static final Log LOG = LogFactory.getLog(HStore.class);
/*
* Regex that will work for straight filenames and for reference names.
* If reference, then the regex has more than just one group. Group 1 is
* this files id. Group 2 the referenced region name, etc.
*/
private static final Pattern REF_NAME_PARSER =
Pattern.compile("^(\\d+)(?:\\.(.+))?$");
protected final Memcache memcache;
private final Path basedir;
private final HRegionInfo info;
private final HColumnDescriptor family;
private final SequenceFile.CompressionType compression;
final FileSystem fs;
private final HBaseConfiguration conf;
// ttl in milliseconds.
protected long ttl;
private long majorCompactionTime;
private int maxFilesToCompact;
private final long desiredMaxFileSize;
private volatile long storeSize;
private final Integer flushLock = new Integer(0);
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
final byte [] storeName;
private final String storeNameStr;
/*
* Sorted Map of readers keyed by sequence id (Most recent should be last in
* in list).
*/
private final SortedMap<Long, HStoreFile> storefiles =
Collections.synchronizedSortedMap(new TreeMap<Long, HStoreFile>());
/*
* Sorted Map of readers keyed by sequence id (Most recent is last in list).
*/
private final SortedMap<Long, MapFile.Reader> readers =
new TreeMap<Long, MapFile.Reader>();
// The most-recent log-seq-ID that's present. The most-recent such ID means
// we can ignore all log messages up to and including that ID (because they're
// already reflected in the TreeMaps).
private volatile long maxSeqId;
private final Path compactionDir;
private final Integer compactLock = new Integer(0);
private final int compactionThreshold;
// All access must be synchronized.
private final CopyOnWriteArraySet<ChangedReadersObserver> changedReaderObservers =
new CopyOnWriteArraySet<ChangedReadersObserver>();
/**
* An HStore is a set of zero or more MapFiles, which stretch backwards over
* time. A given HStore is responsible for a certain set of columns for a
* row in the HRegion.
*
* <p>The HRegion starts writing to its set of HStores when the HRegion's
* memcache is flushed. This results in a round of new MapFiles, one for
* each HStore.
*
* <p>There's no reason to consider append-logging at this level; all logging
* and locking is handled at the HRegion level. HStore just provides
* services to manage sets of MapFiles. One of the most important of those
* services is MapFile-compaction services.
*
* <p>The only thing having to do with logs that HStore needs to deal with is
* the reconstructionLog. This is a segment of an HRegion's log that might
* NOT be present upon startup. If the param is NULL, there's nothing to do.
* If the param is non-NULL, we need to process the log to reconstruct
* a TreeMap that might not have been written to disk before the process
* died.
*
* <p>It's assumed that after this constructor returns, the reconstructionLog
* file will be deleted (by whoever has instantiated the HStore).
*
* @param basedir qualified path under which the region directory lives
* @param info HRegionInfo for this region
* @param family HColumnDescriptor for this column
* @param fs file system object
* @param reconstructionLog existing log file to apply if any
* @param conf configuration object
* @param reporter Call on a period so hosting server can report we're
* making progress to master -- otherwise master might think region deploy
* failed. Can be null.
* @throws IOException
*/
protected HStore(Path basedir, HRegionInfo info, HColumnDescriptor family,
FileSystem fs, Path reconstructionLog, HBaseConfiguration conf,
final Progressable reporter)
throws IOException {
this.basedir = basedir;
this.info = info;
this.family = family;
this.fs = fs;
this.conf = conf;
// getTimeToLive returns ttl in seconds. Convert to milliseconds.
this.ttl = family.getTimeToLive();
if (ttl != HConstants.FOREVER) {
this.ttl *= 1000;
}
this.memcache = new Memcache(this.ttl, info);
this.compactionDir = HRegion.getCompactionDir(basedir);
this.storeName = Bytes.toBytes(this.info.getEncodedName() + "/" +
Bytes.toString(this.family.getName()));
this.storeNameStr = Bytes.toString(this.storeName);
// By default, we compact if an HStore has more than
// MIN_COMMITS_FOR_COMPACTION map files
this.compactionThreshold =
conf.getInt("hbase.hstore.compactionThreshold", 3);
// By default we split region if a file > DEFAULT_MAX_FILE_SIZE.
long maxFileSize = info.getTableDesc().getMaxFileSize();
if (maxFileSize == HConstants.DEFAULT_MAX_FILE_SIZE) {
maxFileSize = conf.getLong("hbase.hregion.max.filesize",
HConstants.DEFAULT_MAX_FILE_SIZE);
}
this.desiredMaxFileSize = maxFileSize;
this.majorCompactionTime =
conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 86400000);
if (family.getValue(HConstants.MAJOR_COMPACTION_PERIOD) != null) {
String strCompactionTime =
family.getValue(HConstants.MAJOR_COMPACTION_PERIOD);
this.majorCompactionTime = (new Long(strCompactionTime)).longValue();
}
this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
this.storeSize = 0L;
if (family.getCompression() == HColumnDescriptor.CompressionType.BLOCK) {
this.compression = SequenceFile.CompressionType.BLOCK;
} else if (family.getCompression() ==
HColumnDescriptor.CompressionType.RECORD) {
this.compression = SequenceFile.CompressionType.RECORD;
} else {
this.compression = SequenceFile.CompressionType.NONE;
}
Path mapdir = checkdir(HStoreFile.getMapDir(basedir, info.getEncodedName(),
family.getName()));
Path infodir = checkdir(HStoreFile.getInfoDir(basedir, info.getEncodedName(),
family.getName()));
// Go through the 'mapdir' and 'infodir' together, make sure that all
// MapFiles are in a reliable state. Every entry in 'mapdir' must have a
// corresponding one in 'loginfodir'. Without a corresponding log info
// file, the entry in 'mapdir' must be deleted.
// loadHStoreFiles also computes the max sequence id internally.
this.maxSeqId = -1L;
this.storefiles.putAll(loadHStoreFiles(infodir, mapdir));
if (LOG.isDebugEnabled() && this.storefiles.size() > 0) {
LOG.debug("Loaded " + this.storefiles.size() + " file(s) in hstore " +
Bytes.toString(this.storeName) + ", max sequence id " + this.maxSeqId);
}
// Do reconstruction log.
runReconstructionLog(reconstructionLog, this.maxSeqId, reporter);
// Finally, start up all the map readers!
setupReaders();
}
/*
* Setup the mapfile readers for this store. There could be more than one
* since we haven't compacted yet.
* @throws IOException
*/
private void setupReaders() throws IOException {
boolean first = true;
for(Map.Entry<Long, HStoreFile> e: this.storefiles.entrySet()) {
MapFile.Reader r = null;
if (first) {
// Use a block cache (if configured) for the first reader only
// so as to control memory usage.
r = e.getValue().getReader(this.fs, this.family.isBloomfilter(),
family.isBlockCacheEnabled());
first = false;
} else {
r = e.getValue().getReader(this.fs, this.family.isBloomfilter(),
false);
}
this.readers.put(e.getKey(), r);
}
}
/*
* @param dir If doesn't exist, create it.
* @return Passed <code>dir</code>.
* @throws IOException
*/
private Path checkdir(final Path dir) throws IOException {
if (!fs.exists(dir)) {
fs.mkdirs(dir);
}
return dir;
}
HColumnDescriptor getFamily() {
return this.family;
}
long getMaxSequenceId() {
return this.maxSeqId;
}
/*
* Run reconstuction log
* @param reconstructionLog
* @param msid
* @param reporter
* @throws IOException
*/
private void runReconstructionLog(final Path reconstructionLog,
final long msid, final Progressable reporter)
throws IOException {
try {
doReconstructionLog(reconstructionLog, msid, reporter);
} catch (EOFException e) {
// Presume we got here because of lack of HADOOP-1700; for now keep going
// but this is probably not what we want long term. If we got here there
// has been data-loss
LOG.warn("Exception processing reconstruction log " + reconstructionLog +
" opening " + this.storeName +
" -- continuing. Probably lack-of-HADOOP-1700 causing DATA LOSS!", e);
} catch (IOException e) {
// Presume we got here because of some HDFS issue. Don't just keep going.
// Fail to open the HStore. Probably means we'll fail over and over
// again until human intervention but alternative has us skipping logs
// and losing edits: HBASE-642.
LOG.warn("Exception processing reconstruction log " + reconstructionLog +
" opening " + this.storeName, e);
throw e;
}
}
/*
* Read the reconstructionLog to see whether we need to build a brand-new
* MapFile out of non-flushed log entries.
*
* We can ignore any log message that has a sequence ID that's equal to or
* lower than maxSeqID. (Because we know such log messages are already
* reflected in the MapFiles.)
*/
@SuppressWarnings("unchecked")
private void doReconstructionLog(final Path reconstructionLog,
final long maxSeqID, final Progressable reporter)
throws UnsupportedEncodingException, IOException {
if (reconstructionLog == null || !fs.exists(reconstructionLog)) {
// Nothing to do.
return;
}
// Check its not empty.
FileStatus[] stats = fs.listStatus(reconstructionLog);
if (stats == null || stats.length == 0) {
LOG.warn("Passed reconstruction log " + reconstructionLog + " is zero-length");
return;
}
long maxSeqIdInLog = -1;
TreeMap<HStoreKey, byte []> reconstructedCache =
new TreeMap<HStoreKey, byte []>(new HStoreKey.HStoreKeyWritableComparator(this.info));
SequenceFile.Reader logReader = new SequenceFile.Reader(this.fs,
reconstructionLog, this.conf);
try {
HLogKey key = new HLogKey();
HLogEdit val = new HLogEdit();
long skippedEdits = 0;
long editsCount = 0;
// How many edits to apply before we send a progress report.
int reportInterval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
while (logReader.next(key, val)) {
maxSeqIdInLog = Math.max(maxSeqIdInLog, key.getLogSeqNum());
if (key.getLogSeqNum() <= maxSeqID) {
skippedEdits++;
continue;
}
// Check this edit is for me. Also, guard against writing
// METACOLUMN info such as HBASE::CACHEFLUSH entries
byte [] column = val.getColumn();
if (val.isTransactionEntry() || Bytes.equals(column, HLog.METACOLUMN)
|| !Bytes.equals(key.getRegionName(), info.getRegionName())
|| !HStoreKey.matchingFamily(family.getName(), column)) {
continue;
}
HStoreKey k = new HStoreKey(key.getRow(), column, val.getTimestamp(),
this.info);
reconstructedCache.put(k, val.getVal());
editsCount++;
// Every 2k edits, tell the reporter we're making progress.
// Have seen 60k edits taking 3minutes to complete.
if (reporter != null && (editsCount % reportInterval) == 0) {
reporter.progress();
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits +
" because sequence id <= " + maxSeqID);
}
} finally {
logReader.close();
}
if (reconstructedCache.size() > 0) {
// We create a "virtual flush" at maxSeqIdInLog+1.
if (LOG.isDebugEnabled()) {
LOG.debug("flushing reconstructionCache");
}
internalFlushCache(reconstructedCache, maxSeqIdInLog + 1);
}
}
/*
* Creates a series of HStoreFiles loaded from the given directory.
* There must be a matching 'mapdir' and 'loginfo' pair of files.
* If only one exists, we'll delete it. Does other consistency tests
* checking files are not zero, etc.
*
* @param infodir qualified path for info file directory
* @param mapdir qualified path for map file directory
* @throws IOException
*/
private SortedMap<Long, HStoreFile> loadHStoreFiles(Path infodir, Path mapdir)
throws IOException {
// Look first at info files. If a reference, these contain info we need
// to create the HStoreFile.
FileStatus infofiles[] = fs.listStatus(infodir);
SortedMap<Long, HStoreFile> results = new TreeMap<Long, HStoreFile>();
ArrayList<Path> mapfiles = new ArrayList<Path>(infofiles.length);
for (int i = 0; i < infofiles.length; i++) {
Path p = infofiles[i].getPath();
// Check for empty info file. Should never be the case but can happen
// after data loss in hdfs for whatever reason (upgrade, etc.): HBASE-646
if (this.fs.getFileStatus(p).getLen() <= 0) {
LOG.warn("Skipping " + p + " because its empty. DATA LOSS? Can " +
"this scenario be repaired? HBASE-646");
continue;
}
Matcher m = REF_NAME_PARSER.matcher(p.getName());
/*
* * * * * N O T E * * * * *
*
* We call isReference(Path, Matcher) here because it calls
* Matcher.matches() which must be called before Matcher.group(int)
* and we don't want to call Matcher.matches() twice.
*
* * * * * N O T E * * * * *
*/
boolean isReference = isReference(p, m);
long fid = Long.parseLong(m.group(1));
HStoreFile curfile = null;
Reference reference = null;
if (isReference) {
reference = HStoreFile.readSplitInfo(p, fs);
}
curfile = new HStoreFile(conf, fs, basedir, this.info,
family.getName(), fid, reference);
long storeSeqId = -1;
try {
storeSeqId = curfile.loadInfo(fs);
if (storeSeqId > this.maxSeqId) {
this.maxSeqId = storeSeqId;
}
} catch (IOException e) {
// If the HSTORE_LOGINFOFILE doesn't contain a number, just ignore it.
// That means it was built prior to the previous run of HStore, and so
// it cannot contain any updates also contained in the log.
LOG.info("HSTORE_LOGINFOFILE " + curfile +
" does not contain a sequence number - ignoring");
}
Path mapfile = curfile.getMapFilePath();
if (!fs.exists(mapfile)) {
fs.delete(curfile.getInfoFilePath(), false);
LOG.warn("Mapfile " + mapfile.toString() + " does not exist. " +
"Cleaned up info file. Continuing...Probable DATA LOSS!!!");
continue;
}
// References don't have data or index components under mapfile.
if (!isReference && isEmptyDataFile(mapfile)) {
curfile.delete();
// We can have empty data file if data loss in hdfs.
LOG.warn("Mapfile " + mapfile.toString() + " has empty data. " +
"Deleting. Continuing...Probable DATA LOSS!!! See HBASE-646.");
continue;
}
if (!isReference && isEmptyIndexFile(mapfile)) {
try {
// Try fixing this file.. if we can. Use the hbase version of fix.
// Need to remove the old index file first else fix won't go ahead.
this.fs.delete(new Path(mapfile, MapFile.INDEX_FILE_NAME), false);
// TODO: This is going to fail if we are to rebuild a file from
// meta because it won't have right comparator: HBASE-848.
long count = MapFile.fix(this.fs, mapfile, HStoreKey.class,
HBaseMapFile.VALUE_CLASS, false, this.conf);
if (LOG.isDebugEnabled()) {
LOG.debug("Fixed index on " + mapfile.toString() + "; had " +
count + " entries");
}
} catch (Exception e) {
LOG.warn("Failed fix of " + mapfile.toString() +
"...continuing; Probable DATA LOSS!!!", e);
continue;
}
}
long length = curfile.length();
storeSize += length;
// TODO: Confirm referent exists.
// Found map and sympathetic info file. Add this hstorefile to result.
if (LOG.isDebugEnabled()) {
LOG.debug("loaded " + FSUtils.getPath(p) + ", isReference=" +
isReference + ", sequence id=" + storeSeqId +
", length=" + length + ", majorCompaction=" +
curfile.isMajorCompaction());
}
results.put(Long.valueOf(storeSeqId), curfile);
// Keep list of sympathetic data mapfiles for cleaning info dir in next
// section. Make sure path is fully qualified for compare.
mapfiles.add(this.fs.makeQualified(mapfile));
}
cleanDataFiles(mapfiles, mapdir);
return results;
}
/*
* If no info file delete the sympathetic data file.
* @param mapfiles List of mapfiles.
* @param mapdir Directory to check.
* @throws IOException
*/
private void cleanDataFiles(final List<Path> mapfiles, final Path mapdir)
throws IOException {
// List paths by experience returns fully qualified names -- at least when
// running on a mini hdfs cluster.
FileStatus [] datfiles = fs.listStatus(mapdir);
for (int i = 0; i < datfiles.length; i++) {
Path p = datfiles[i].getPath();
// If does not have sympathetic info file, delete.
Path qualifiedP = fs.makeQualified(p);
if (!mapfiles.contains(qualifiedP)) {
fs.delete(p, true);
}
}
}
/*
* @param mapfile
* @return True if the passed mapfile has a zero-length data component (its
* broken).
* @throws IOException
*/
private boolean isEmptyDataFile(final Path mapfile)
throws IOException {
// Mapfiles are made of 'data' and 'index' files. Confirm 'data' is
// non-null if it exists (may not have been written to yet).
return isEmptyFile(new Path(mapfile, MapFile.DATA_FILE_NAME));
}
/*
* @param mapfile
* @return True if the passed mapfile has a zero-length index component (its
* broken).
* @throws IOException
*/
private boolean isEmptyIndexFile(final Path mapfile)
throws IOException {
// Mapfiles are made of 'data' and 'index' files. Confirm 'data' is
// non-null if it exists (may not have been written to yet).
return isEmptyFile(new Path(mapfile, MapFile.INDEX_FILE_NAME));
}
/*
* @param f
* @return True if the passed file does not exist or is zero-length (its
* broken).
* @throws IOException
*/
private boolean isEmptyFile(final Path f)
throws IOException {
return !this.fs.exists(f) || this.fs.getFileStatus(f).getLen() == 0;
}
/**
* Adds a value to the memcache
*
* @param key
* @param value
* @return memcache size delta
*/
protected long add(HStoreKey key, byte[] value) {
lock.readLock().lock();
try {
return this.memcache.add(key, value);
} finally {
lock.readLock().unlock();
}
}
/**
* Close all the MapFile readers
*
* We don't need to worry about subsequent requests because the HRegion holds
* a write lock that will prevent any more reads or writes.
*
* @throws IOException
*/
List<HStoreFile> close() throws IOException {
ArrayList<HStoreFile> result = null;
this.lock.writeLock().lock();
try {
for (MapFile.Reader reader: this.readers.values()) {
reader.close();
}
synchronized (this.storefiles) {
result = new ArrayList<HStoreFile>(storefiles.values());
}
LOG.debug("closed " + this.storeNameStr);
return result;
} finally {
this.lock.writeLock().unlock();
}
}
//////////////////////////////////////////////////////////////////////////////
// Flush changes to disk
//////////////////////////////////////////////////////////////////////////////
/**
* Snapshot this stores memcache. Call before running
* {@link #flushCache(long)} so it has some work to do.
*/
void snapshot() {
this.memcache.snapshot();
}
/**
* Write out current snapshot. Presumes {@link #snapshot()} has been called
* previously.
* @param logCacheFlushId flush sequence number
* @return true if a compaction is needed
* @throws IOException
*/
boolean flushCache(final long logCacheFlushId) throws IOException {
// Get the snapshot to flush. Presumes that a call to
// this.memcache.snapshot() has happened earlier up in the chain.
SortedMap<HStoreKey, byte []> cache = this.memcache.getSnapshot();
boolean compactionNeeded = internalFlushCache(cache, logCacheFlushId);
// If an exception happens flushing, we let it out without clearing
// the memcache snapshot. The old snapshot will be returned when we say
// 'snapshot', the next time flush comes around.
this.memcache.clearSnapshot(cache);
return compactionNeeded;
}
private boolean internalFlushCache(final SortedMap<HStoreKey, byte []> cache,
final long logCacheFlushId)
throws IOException {
long flushed = 0;
// Don't flush if there are no entries.
if (cache.size() == 0) {
return false;
}
// TODO: We can fail in the below block before we complete adding this
// flush to list of store files. Add cleanup of anything put on filesystem
// if we fail.
synchronized(flushLock) {
long now = System.currentTimeMillis();
// A. Write the Maps out to the disk
HStoreFile flushedFile = new HStoreFile(conf, fs, basedir,
this.info, family.getName(), -1L, null);
MapFile.Writer out = flushedFile.getWriter(this.fs, this.compression,
this.family.isBloomfilter(), cache.size());
setIndexInterval(out);
// Here we tried picking up an existing HStoreFile from disk and
// interlacing the memcache flush compacting as we go. The notion was
// that interlacing would take as long as a pure flush with the added
// benefit of having one less file in the store. Experiments showed that
// it takes two to three times the amount of time flushing -- more column
// families makes it so the two timings come closer together -- but it
// also complicates the flush. The code was removed. Needed work picking
// which file to interlace (favor references first, etc.)
//
// Related, looks like 'merging compactions' in BigTable paper interlaces
// a memcache flush. We don't.
int entries = 0;
try {
for (Map.Entry<HStoreKey, byte []> es: cache.entrySet()) {
HStoreKey curkey = es.getKey();
byte[] bytes = es.getValue();
if (HStoreKey.matchingFamily(this.family.getName(), curkey.getColumn())) {
if (!isExpired(curkey, ttl, now)) {
entries++;
out.append(curkey, new ImmutableBytesWritable(bytes));
flushed += this.memcache.heapSize(curkey, bytes, null);
}
}
}
} finally {
out.close();
}
long newStoreSize = flushedFile.length();
storeSize += newStoreSize;
// B. Write out the log sequence number that corresponds to this output
// MapFile. The MapFile is current up to and including the log seq num.
flushedFile.writeInfo(fs, logCacheFlushId);
// C. Finally, make the new MapFile available.
updateReaders(logCacheFlushId, flushedFile);
if(LOG.isDebugEnabled()) {
LOG.debug("Added " + FSUtils.getPath(flushedFile.getMapFilePath()) +
" with " + entries +
" entries, sequence id " + logCacheFlushId + ", data size ~" +
StringUtils.humanReadableInt(flushed) + ", file size " +
StringUtils.humanReadableInt(newStoreSize) + " to " +
this.info.getRegionNameAsString());
}
}
return storefiles.size() >= compactionThreshold;
}
/*
* Change readers adding into place the Reader produced by this new flush.
* @param logCacheFlushId
* @param flushedFile
* @throws IOException
*/
private void updateReaders(final long logCacheFlushId,
final HStoreFile flushedFile)
throws IOException {
this.lock.writeLock().lock();
try {
Long flushid = Long.valueOf(logCacheFlushId);
// Open the map file reader.
this.readers.put(flushid,
flushedFile.getReader(this.fs, this.family.isBloomfilter(),
this.family.isBlockCacheEnabled()));
this.storefiles.put(flushid, flushedFile);
// Tell listeners of the change in readers.
notifyChangedReadersObservers();
} finally {
this.lock.writeLock().unlock();
}
}
/*
* Notify all observers that set of Readers has changed.
* @throws IOException
*/
private void notifyChangedReadersObservers() throws IOException {
for (ChangedReadersObserver o: this.changedReaderObservers) {
o.updateReaders();
}
}
/*
* @param o Observer who wants to know about changes in set of Readers
*/
void addChangedReaderObserver(ChangedReadersObserver o) {
this.changedReaderObservers.add(o);
}
/*
* @param o Observer no longer interested in changes in set of Readers.
*/
void deleteChangedReaderObserver(ChangedReadersObserver o) {
if (!this.changedReaderObservers.remove(o)) {
LOG.warn("Not in set" + o);
}
}
//////////////////////////////////////////////////////////////////////////////
// Compaction
//////////////////////////////////////////////////////////////////////////////
/*
* @param files
* @return True if any of the files in <code>files</code> are References.
*/
private boolean hasReferences(Collection<HStoreFile> files) {
if (files != null && files.size() > 0) {
for (HStoreFile hsf: files) {
if (hsf.isReference()) {
return true;
}
}
}
return false;
}
/*
* Gets lowest timestamp from files in a dir
*
* @param fs
* @param dir
* @throws IOException
*/
private static long getLowestTimestamp(FileSystem fs, Path dir)
throws IOException {
FileStatus[] stats = fs.listStatus(dir);
if (stats == null || stats.length == 0) {
return 0l;
}
long lowTimestamp = Long.MAX_VALUE;
for (int i = 0; i < stats.length; i++) {
long timestamp = stats[i].getModificationTime();
if (timestamp < lowTimestamp){
lowTimestamp = timestamp;
}
}
return lowTimestamp;
}
/**
* Compact the back-HStores. This method may take some time, so the calling
* thread must be able to block for long periods.
*
* <p>During this time, the HStore can work as usual, getting values from
* MapFiles and writing new MapFiles from the Memcache.
*
* Existing MapFiles are not destroyed until the new compacted TreeMap is
* completely written-out to disk.
*
* The compactLock prevents multiple simultaneous compactions.
* The structureLock prevents us from interfering with other write operations.
*
* We don't want to hold the structureLock for the whole time, as a compact()
* can be lengthy and we want to allow cache-flushes during this period.
*
* @param majorCompaction True to force a major compaction regardless of
* thresholds
* @return mid key if a split is needed, null otherwise
* @throws IOException
*/
StoreSize compact(final boolean majorCompaction) throws IOException {
boolean forceSplit = this.info.shouldSplit(false);
boolean doMajorCompaction = majorCompaction;
synchronized (compactLock) {
long maxId = -1;
List<HStoreFile> filesToCompact = null;
synchronized (storefiles) {
if (this.storefiles.size() <= 0) {
LOG.debug(this.storeNameStr + ": no store files to compact");
return null;
}
// filesToCompact are sorted oldest to newest.
filesToCompact = new ArrayList<HStoreFile>(this.storefiles.values());
// The max-sequenceID in any of the to-be-compacted TreeMaps is the
// last key of storefiles.
maxId = this.storefiles.lastKey().longValue();
}
// Check to see if we need to do a major compaction on this region.
// If so, change doMajorCompaction to true to skip the incremental
// compacting below. Only check if doMajorCompaction is not true.
if (!doMajorCompaction) {
doMajorCompaction = isMajorCompaction(filesToCompact);
}
boolean references = hasReferences(filesToCompact);
if (!doMajorCompaction && !references &&
(forceSplit || (filesToCompact.size() < compactionThreshold))) {
return checkSplit(forceSplit);
}
if (!fs.exists(compactionDir) && !fs.mkdirs(compactionDir)) {
LOG.warn("Mkdir on " + compactionDir.toString() + " failed");
return checkSplit(forceSplit);
}
// HBASE-745, preparing all store file sizes for incremental compacting
// selection.
int countOfFiles = filesToCompact.size();
long totalSize = 0;
long[] fileSizes = new long[countOfFiles];
long skipped = 0;
int point = 0;
for (int i = 0; i < countOfFiles; i++) {
HStoreFile file = filesToCompact.get(i);
Path path = file.getMapFilePath();
if (path == null) {
LOG.warn("Path is null for " + file);
return null;
}
int len = 0;
// listStatus can come back null.
FileStatus [] fss = this.fs.listStatus(path);
for (int ii = 0; fss != null && ii < fss.length; ii++) {
len += fss[ii].getLen();
}
fileSizes[i] = len;
totalSize += len;
}
if (!doMajorCompaction && !references) {
// Here we select files for incremental compaction.
// The rule is: if the largest(oldest) one is more than twice the
// size of the second, skip the largest, and continue to next...,
// until we meet the compactionThreshold limit.
for (point = 0; point < countOfFiles - 1; point++) {
if ((fileSizes[point] < fileSizes[point + 1] * 2) &&
(countOfFiles - point) <= maxFilesToCompact) {
break;
}
skipped += fileSizes[point];
}
filesToCompact = new ArrayList<HStoreFile>(filesToCompact.subList(point,
countOfFiles));
if (filesToCompact.size() <= 1) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipped compaction of 1 file; compaction size of " +
this.storeNameStr + ": " +
StringUtils.humanReadableInt(totalSize) + "; Skipped " + point +
" files, size: " + skipped);
}
return checkSplit(forceSplit);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Compaction size of " + this.storeNameStr + ": " +
StringUtils.humanReadableInt(totalSize) + "; Skipped " + point +
" file(s), size: " + skipped);
}
}
/*
* We create a new list of MapFile.Reader objects so we don't screw up
* the caching associated with the currently-loaded ones. Our iteration-
* based access pattern is practically designed to ruin the cache.
*/
List<MapFile.Reader> rdrs = new ArrayList<MapFile.Reader>();
int nrows = createReaders(rdrs, filesToCompact);
// Step through them, writing to the brand-new MapFile
HStoreFile compactedOutputFile = new HStoreFile(conf, fs,
this.compactionDir, this.info, family.getName(), -1L, null);
if (LOG.isDebugEnabled()) {
LOG.debug("Started compaction of " + rdrs.size() + " file(s)" +
(references? ", hasReferences=true,": " ") + " into " +
FSUtils.getPath(compactedOutputFile.getMapFilePath()));
}
MapFile.Writer writer = compactedOutputFile.getWriter(this.fs,
this.compression, this.family.isBloomfilter(), nrows);
setIndexInterval(writer);
try {
compact(writer, rdrs, doMajorCompaction);
} finally {
writer.close();
}
// Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
compactedOutputFile.writeInfo(fs, maxId, doMajorCompaction);
// Move the compaction into place.
completeCompaction(filesToCompact, compactedOutputFile);
if (LOG.isDebugEnabled()) {
LOG.debug("Completed " + (doMajorCompaction? "major": "") +
" compaction of " + this.storeNameStr +
" store size is " + StringUtils.humanReadableInt(storeSize));
}
}
return checkSplit(forceSplit);
}
/*
* Set the index interval for the mapfile. There are two sources for
* configuration information: the HCD, and the global HBase config.
* If a source returns the default value, it is ignored. Otherwise,
* the smallest non-default value is preferred.
*/
private void setIndexInterval(MapFile.Writer writer) {
int familyInterval = this.family.getMapFileIndexInterval();
int interval = this.conf.getInt("hbase.io.index.interval",
HColumnDescriptor.DEFAULT_MAPFILE_INDEX_INTERVAL);
if (familyInterval != HColumnDescriptor.DEFAULT_MAPFILE_INDEX_INTERVAL) {
if (interval != HColumnDescriptor.DEFAULT_MAPFILE_INDEX_INTERVAL) {
if (familyInterval < interval)
interval = familyInterval;
} else {
interval = familyInterval;
}
}
writer.setIndexInterval(interval);
}
/*
* @return True if we should run a major compaction.
*/
boolean isMajorCompaction() throws IOException {
List<HStoreFile> filesToCompact = null;
synchronized (storefiles) {
// filesToCompact are sorted oldest to newest.
filesToCompact = new ArrayList<HStoreFile>(this.storefiles.values());
}
return isMajorCompaction(filesToCompact);
}
/*
* @param filesToCompact Files to compact. Can be null.
* @return True if we should run a major compaction.
*/
private boolean isMajorCompaction(final List<HStoreFile> filesToCompact)
throws IOException {
boolean result = false;
Path mapdir = HStoreFile.getMapDir(this.basedir, this.info.getEncodedName(),
this.family.getName());
long lowTimestamp = getLowestTimestamp(fs, mapdir);
long now = System.currentTimeMillis();
if (lowTimestamp > 0l && lowTimestamp < (now - this.majorCompactionTime)) {
// Major compaction time has elapsed.
long elapsedTime = now - lowTimestamp;
if (filesToCompact != null && filesToCompact.size() == 1 &&
filesToCompact.get(0).isMajorCompaction() &&
(this.ttl == HConstants.FOREVER || elapsedTime < this.ttl)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping major compaction of " + this.storeNameStr +
" because one (major) compacted file only and elapsedTime " +
elapsedTime + "ms is < ttl=" + this.ttl);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Major compaction triggered on store " + this.storeNameStr +
"; time since last major compaction " + (now - lowTimestamp) + "ms");
}
result = true;
}
}
return result;
}
/*
* Create readers for the passed in list of HStoreFiles and add them to
* <code>readers</code> list. Used compacting.
* @param readers Add Readers here.
* @param files List of HSFs to make Readers for.
* @return Count of rows for bloom filter sizing. Returns -1 if no bloom
* filter wanted.
*/
private int createReaders(final List<MapFile.Reader> rs,
final List<HStoreFile> files)
throws IOException {
/* We create a new list of MapFile.Reader objects so we don't screw up
* the caching associated with the currently-loaded ones. Our iteration-
* based access pattern is practically designed to ruin the cache.
*/
int nrows = -1;
for (HStoreFile file: files) {
try {
// TODO: Readers are opened without block-cache enabled. Means we don't
// get the prefetch that makes the read faster. But we don't want to
// enable block-cache for these readers that are about to be closed.
// The compaction of soon-to-be closed readers will probably force out
// blocks that may be needed servicing real-time requests whereas
// compaction runs in background. TODO: We know we're going to read
// this file straight through. Leverage this fact. Use a big buffer
// client side to speed things up or read it all up into memory one file
// at a time or pull local and memory-map the file but leave the writer
// up in hdfs?
BloomFilterMapFile.Reader reader = file.getReader(fs, false, false);
rs.add(reader);
// Compute the size of the new bloomfilter if needed
if (this.family.isBloomfilter()) {
nrows += reader.getBloomFilterSize();
}
} catch (IOException e) {
// Add info about which file threw exception. It may not be in the
// exception message so output a message here where we know the
// culprit.
LOG.warn("Failed with " + e.toString() + ": " + file.toString());
closeCompactionReaders(rs);
throw e;
}
}
return nrows;
}
/*
* @param r List to reverse
* @return A reversed array of content of <code>readers</code>
*/
private MapFile.Reader [] reverse(final List<MapFile.Reader> r) {
List<MapFile.Reader> copy = new ArrayList<MapFile.Reader>(r);
Collections.reverse(copy);
return copy.toArray(new MapFile.Reader[0]);
}
/*
* @param rdrs List of readers
* @param keys Current keys
* @param done Which readers are done
* @return The lowest current key in passed <code>rdrs</code>
*/
private int getLowestKey(final MapFile.Reader [] rdrs,
final HStoreKey [] keys, final boolean [] done) {
int lowestKey = -1;
for (int i = 0; i < rdrs.length; i++) {
if (done[i]) {
continue;
}
if (lowestKey < 0) {
lowestKey = i;
} else {
if (keys[i].compareTo(keys[lowestKey]) < 0) {
lowestKey = i;
}
}
}
return lowestKey;
}
/*
* Compact a list of MapFile.Readers into MapFile.Writer.
*
* We work by iterating through the readers in parallel looking at newest
* store file first. We always increment the lowest-ranked one. Updates to a
* single row/column will appear ranked by timestamp.
* @param compactedOut Where to write compaction.
* @param pReaders List of readers sorted oldest to newest.
* @param majorCompaction True to force a major compaction regardless of
* thresholds
* @throws IOException
*/
private void compact(final MapFile.Writer compactedOut,
final List<MapFile.Reader> pReaders, final boolean majorCompaction)
throws IOException {
// Reverse order so newest store file is first.
MapFile.Reader[] rdrs = reverse(pReaders);
try {
HStoreKey [] keys = new HStoreKey[rdrs.length];
ImmutableBytesWritable [] vals = new ImmutableBytesWritable[rdrs.length];
boolean [] done = new boolean[rdrs.length];
for(int i = 0; i < rdrs.length; i++) {
keys[i] = new HStoreKey(HConstants.EMPTY_BYTE_ARRAY, this.info);
vals[i] = new ImmutableBytesWritable();
done[i] = false;
}
// Now, advance through the readers in order. This will have the
// effect of a run-time sort of the entire dataset.
int numDone = 0;
for (int i = 0; i < rdrs.length; i++) {
rdrs[i].reset();
done[i] = !rdrs[i].next(keys[i], vals[i]);
if (done[i]) {
numDone++;
}
}
long now = System.currentTimeMillis();
int timesSeen = 0;
HStoreKey lastSeen = new HStoreKey();
HStoreKey lastDelete = null;
while (numDone < done.length) {
// Get lowest key in all store files.
int lowestKey = getLowestKey(rdrs, keys, done);
HStoreKey sk = keys[lowestKey];
// If its same row and column as last key, increment times seen.
if (HStoreKey.equalsTwoRowKeys(info, lastSeen.getRow(), sk.getRow())
&& Bytes.equals(lastSeen.getColumn(), sk.getColumn())) {
timesSeen++;
// Reset last delete if not exact timestamp -- lastDelete only stops
// exactly the same key making it out to the compacted store file.
if (lastDelete != null &&
lastDelete.getTimestamp() != sk.getTimestamp()) {
lastDelete = null;
}
} else {
timesSeen = 1;
lastDelete = null;
}
// Don't write empty rows or columns. Only remove cells on major
// compaction. Remove if expired of > VERSIONS
if (sk.getRow().length != 0 && sk.getColumn().length != 0) {
ImmutableBytesWritable value = vals[lowestKey];
if (!majorCompaction) {
// Write out all values if not a major compaction.
compactedOut.append(sk, value);
} else {
boolean expired = false;
boolean deleted = false;
if (timesSeen <= family.getMaxVersions() &&
!(expired = isExpired(sk, ttl, now))) {
// If this value key is same as a deleted key, skip
if (lastDelete != null && sk.equals(lastDelete)) {
deleted = true;
} else if (HLogEdit.isDeleted(value.get())) {
// If a deleted value, skip
deleted = true;
lastDelete = new HStoreKey(sk);
} else {
compactedOut.append(sk, vals[lowestKey]);
}
}
if (expired || deleted) {
// HBASE-855 remove one from timesSeen because it did not make it
// past expired check -- don't count against max versions.
timesSeen--;
}
}
}
// Update last-seen items
lastSeen = new HStoreKey(sk);
// Advance the smallest key. If that reader's all finished, then
// mark it as done.
if (!rdrs[lowestKey].next(keys[lowestKey], vals[lowestKey])) {
done[lowestKey] = true;
rdrs[lowestKey].close();
rdrs[lowestKey] = null;
numDone++;
}
}
} finally {
closeCompactionReaders(Arrays.asList(rdrs));
}
}
private void closeCompactionReaders(final List<MapFile.Reader> rdrs) {
for (MapFile.Reader r: rdrs) {
try {
if (r != null) {
r.close();
}
} catch (IOException e) {
LOG.warn("Exception closing reader for " + this.storeNameStr, e);
}
}
}
/*
* It's assumed that the compactLock will be acquired prior to calling this
* method! Otherwise, it is not thread-safe!
*
* It works by processing a compaction that's been written to disk.
*
* <p>It is usually invoked at the end of a compaction, but might also be
* invoked at HStore startup, if the prior execution died midway through.
*
* <p>Moving the compacted TreeMap into place means:
* <pre>
* 1) Moving the new compacted MapFile into place
* 2) Unload all replaced MapFiles, close and collect list to delete.
* 3) Loading the new TreeMap.
* 4) Compute new store size
* </pre>
*
* @param compactedFiles list of files that were compacted
* @param compactedFile HStoreFile that is the result of the compaction
* @throws IOException
*/
private void completeCompaction(final List<HStoreFile> compactedFiles,
final HStoreFile compactedFile)
throws IOException {
this.lock.writeLock().lock();
try {
// 1. Moving the new MapFile into place.
HStoreFile finalCompactedFile = new HStoreFile(conf, fs, basedir,
this.info, family.getName(), -1, null,
compactedFile.isMajorCompaction());
if (LOG.isDebugEnabled()) {
LOG.debug("moving " + FSUtils.getPath(compactedFile.getMapFilePath()) +
" to " + FSUtils.getPath(finalCompactedFile.getMapFilePath()));
}
if (!compactedFile.rename(this.fs, finalCompactedFile)) {
LOG.error("Failed move of compacted file " +
finalCompactedFile.getMapFilePath().toString());
return;
}
// 2. Unload all replaced MapFiles, close and collect list to delete.
synchronized (storefiles) {
Map<Long, HStoreFile> toDelete = new HashMap<Long, HStoreFile>();
for (Map.Entry<Long, HStoreFile> e : this.storefiles.entrySet()) {
if (!compactedFiles.contains(e.getValue())) {
continue;
}
Long key = e.getKey();
MapFile.Reader reader = this.readers.remove(key);
if (reader != null) {
reader.close();
}
toDelete.put(key, e.getValue());
}
try {
// 3. Loading the new TreeMap.
// Change this.storefiles so it reflects new state but do not
// delete old store files until we have sent out notification of
// change in case old files are still being accessed by outstanding
// scanners.
for (Long key : toDelete.keySet()) {
this.storefiles.remove(key);
}
// Add new compacted Reader and store file.
Long orderVal = Long.valueOf(finalCompactedFile.loadInfo(fs));
this.readers.put(orderVal,
// Use a block cache (if configured) for this reader since
// it is the only one.
finalCompactedFile.getReader(this.fs,
this.family.isBloomfilter(),
this.family.isBlockCacheEnabled()));
this.storefiles.put(orderVal, finalCompactedFile);
// Tell observers that list of Readers has changed.
notifyChangedReadersObservers();
// Finally, delete old store files.
for (HStoreFile hsf : toDelete.values()) {
hsf.delete();
}
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
LOG.error("Failed replacing compacted files for " +
this.storeNameStr +
". Compacted file is " + finalCompactedFile.toString() +
". Files replaced are " + compactedFiles.toString() +
" some of which may have been already removed", e);
}
// 4. Compute new store size
storeSize = 0L;
for (HStoreFile hsf : storefiles.values()) {
storeSize += hsf.length();
}
}
} finally {
this.lock.writeLock().unlock();
}
}
// ////////////////////////////////////////////////////////////////////////////
// Accessors.
// (This is the only section that is directly useful!)
//////////////////////////////////////////////////////////////////////////////
/**
* Return all the available columns for the given key. The key indicates a
* row and timestamp, but not a column name.
*
* The returned object should map column names to Cells.
*/
void getFull(HStoreKey key, final Set<byte []> columns,
final int numVersions, Map<byte [], Cell> results)
throws IOException {
int versions = versionsToReturn(numVersions);
Map<byte [], Long> deletes =
new TreeMap<byte [], Long>(Bytes.BYTES_COMPARATOR);
// if the key is null, we're not even looking for anything. return.
if (key == null) {
return;
}
this.lock.readLock().lock();
// get from the memcache first.
memcache.getFull(key, columns, versions, deletes, results);
try {
MapFile.Reader[] maparray = getReaders();
// examine each mapfile
for (int i = maparray.length - 1; i >= 0; i--) {
MapFile.Reader map = maparray[i];
// synchronize on the map so that no one else iterates it at the same
// time
getFullFromMapFile(map, key, columns, versions, deletes, results);
}
} finally {
this.lock.readLock().unlock();
}
}
private void getFullFromMapFile(MapFile.Reader map, HStoreKey key,
Set<byte []> columns, int numVersions, Map<byte [], Long> deletes,
Map<byte [], Cell> results)
throws IOException {
synchronized(map) {
long now = System.currentTimeMillis();
// seek back to the beginning
map.reset();
// seek to the closest key that should match the row we're looking for
ImmutableBytesWritable readval = new ImmutableBytesWritable();
HStoreKey readkey = (HStoreKey)map.getClosest(key, readval);
if (readkey == null) {
return;
}
do {
byte [] readcol = readkey.getColumn();
// if we're looking for this column (or all of them), and there isn't
// already a value for this column in the results map or there is a value
// but we haven't collected enough versions yet, and the key we
// just read matches, then we'll consider it
if ((columns == null || columns.contains(readcol))
&& (!results.containsKey(readcol)
|| results.get(readcol).getNumValues() < numVersions)
&& key.matchesWithoutColumn(readkey)) {
// if the value of the cell we're looking at right now is a delete,
// we need to treat it differently
if(HLogEdit.isDeleted(readval.get())) {
// if it's not already recorded as a delete or recorded with a more
// recent delete timestamp, record it for later
if (!deletes.containsKey(readcol)
|| deletes.get(readcol).longValue() < readkey.getTimestamp()) {
deletes.put(readcol, Long.valueOf(readkey.getTimestamp()));
}
} else if (!(deletes.containsKey(readcol)
&& deletes.get(readcol).longValue() >= readkey.getTimestamp()) ) {
// So the cell itself isn't a delete, but there may be a delete
// pending from earlier in our search. Only record this result if
// there aren't any pending deletes.
if (!(deletes.containsKey(readcol) &&
deletes.get(readcol).longValue() >= readkey.getTimestamp())) {
if (!isExpired(readkey, ttl, now)) {
if (!results.containsKey(readcol)) {
results.put(readcol,
new Cell(readval.get(), readkey.getTimestamp()));
} else {
results.get(readcol).add(readval.get(),
readkey.getTimestamp());
}
// need to reinstantiate the readval so we can reuse it,
// otherwise next iteration will destroy our result
readval = new ImmutableBytesWritable();
}
}
}
} else if (HStoreKey.compareTwoRowKeys(info,key.getRow(), readkey.getRow()) < 0) {
// if we've crossed into the next row, then we can just stop
// iterating
break;
}
} while(map.next(readkey, readval));
}
}
/**
* @return Array of readers ordered oldest to newest.
*/
public MapFile.Reader [] getReaders() {
return this.readers.values().
toArray(new MapFile.Reader[this.readers.size()]);
}
/*
* @param wantedVersions How many versions were asked for.
* @return wantedVersions or this families' MAX_VERSIONS.
*/
private int versionsToReturn(final int wantedVersions) {
if (wantedVersions <= 0) {
throw new IllegalArgumentException("Number of versions must be > 0");
}
// Make sure we do not return more than maximum versions for this store.
return wantedVersions > this.family.getMaxVersions()?
this.family.getMaxVersions(): wantedVersions;
}
/**
* Get the value for the indicated HStoreKey. Grab the target value and the
* previous <code>numVersions - 1</code> values, as well.
*
* Use {@link HConstants.ALL_VERSIONS} to retrieve all versions.
* @param key
* @param numVersions Number of versions to fetch. Must be > 0.
* @return values for the specified versions
* @throws IOException
*/
Cell[] get(final HStoreKey key, final int numVersions) throws IOException {
// This code below is very close to the body of the getKeys method. Any
// changes in the flow below should also probably be done in getKeys.
// TODO: Refactor so same code used.
long now = System.currentTimeMillis();
int versions = versionsToReturn(numVersions);
// Keep a list of deleted cell keys. We need this because as we go through
// the memcache and store files, the cell with the delete marker may be
// in one store and the old non-delete cell value in a later store.
// If we don't keep around the fact that the cell was deleted in a newer
// record, we end up returning the old value if user is asking for more
// than one version. This List of deletes should not be large since we
// are only keeping rows and columns that match those set on the get and
// which have delete values. If memory usage becomes an issue, could
// redo as bloom filter.
Set<HStoreKey> deletes = new HashSet<HStoreKey>();
this.lock.readLock().lock();
try {
// Check the memcache
List<Cell> results = this.memcache.get(key, versions, deletes, now);
// If we got sufficient versions from memcache, return.
if (results.size() == versions) {
return results.toArray(new Cell[results.size()]);
}
MapFile.Reader[] maparray = getReaders();
// Returned array is sorted with the most recent addition last.
for(int i = maparray.length - 1;
i >= 0 && !hasEnoughVersions(versions, results); i--) {
MapFile.Reader r = maparray[i];
synchronized (r) {
// Do the priming read
ImmutableBytesWritable readval = new ImmutableBytesWritable();
HStoreKey readkey = (HStoreKey)r.getClosest(key, readval);
if (readkey == null) {
// map.getClosest returns null if the passed key is > than the
// last key in the map file. getClosest is a bit of a misnomer
// since it returns exact match or the next closest key AFTER not
// BEFORE. We use getClosest because we're usually passed a
// key that has a timestamp of maximum long to indicate we want
// most recent update.
continue;
}
if (!readkey.matchesRowCol(key)) {
continue;
}
if (get(readkey, readval.get(), versions, results, deletes, now)) {
break;
}
for (readval = new ImmutableBytesWritable();
r.next(readkey, readval) && readkey.matchesRowCol(key);
readval = new ImmutableBytesWritable()) {
if (get(readkey, readval.get(), versions, results, deletes, now)) {
break;
}
}
}
}
return results.size() == 0 ?
null : results.toArray(new Cell[results.size()]);
} finally {
this.lock.readLock().unlock();
}
}
/*
* Look at one key/value.
* @param key
* @param value
* @param versions
* @param results
* @param deletes
* @param now
* @return True if we have enough versions.
*/
private boolean get(final HStoreKey key, final byte [] value,
final int versions, final List<Cell> results,
final Set<HStoreKey> deletes, final long now) {
if (!HLogEdit.isDeleted(value)) {
if (notExpiredAndNotInDeletes(this.ttl, key, now, deletes)) {
results.add(new Cell(value, key.getTimestamp()));
}
// Perhaps only one version is wanted. I could let this
// test happen later in the for loop test but it would cost
// the allocation of an ImmutableBytesWritable.
if (hasEnoughVersions(versions, results)) {
return true;
}
} else {
// Is this copy necessary?
deletes.add(new HStoreKey(key));
}
return false;
}
/*
* Small method to check if we are over the max number of versions
* or we acheived this family max versions.
* The later happens when we have the situation described in HBASE-621.
* @param versions
* @param c
* @return
*/
private boolean hasEnoughVersions(final int versions, final List<Cell> c) {
return c.size() >= versions;
}
/**
* Get <code>versions</code> of keys matching the origin key's
* row/column/timestamp and those of an older vintage.
* @param origin Where to start searching.
* @param versions How many versions to return. Pass
* {@link HConstants#ALL_VERSIONS} to retrieve all.
* @param now
* @param columnPattern regex pattern for column matching. if columnPattern
* is not null, we use column pattern to match columns. And the columnPattern
* only works when origin's column is null or its length is zero.
* @return Matching keys.
* @throws IOException
*/
public List<HStoreKey> getKeys(final HStoreKey origin, final int versions,
final long now, final Pattern columnPattern)
throws IOException {
// This code below is very close to the body of the get method. Any
// changes in the flow below should also probably be done in get. TODO:
// Refactor so same code used.
Set<HStoreKey> deletes = new HashSet<HStoreKey>();
this.lock.readLock().lock();
try {
// Check the memcache
List<HStoreKey> keys =
this.memcache.getKeys(origin, versions, deletes, now, columnPattern);
// If we got sufficient versions from memcache, return.
if (keys.size() >= versions) {
return keys;
}
MapFile.Reader[] maparray = getReaders();
// Returned array is sorted with the most recent addition last.
for (int i = maparray.length - 1;
i >= 0 && keys.size() < versions; i--) {
MapFile.Reader map = maparray[i];
synchronized(map) {
map.reset();
// Do the priming read
ImmutableBytesWritable readval = new ImmutableBytesWritable();
HStoreKey readkey = (HStoreKey)map.getClosest(origin, readval);
if (readkey == null) {
// map.getClosest returns null if the passed key is > than the
// last key in the map file. getClosest is a bit of a misnomer
// since it returns exact match or the next closest key AFTER not
// BEFORE.
continue;
}
do {
// if the row matches, we might want this one.
if (rowMatches(origin, readkey)) {
// if the column pattern is not null, we use it for column matching.
// we will skip the keys whose column doesn't match the pattern.
if (columnPattern != null) {
if (!(columnPattern.matcher(Bytes.toString(readkey.getColumn())).matches())) {
continue;
}
}
// if the cell address matches, then we definitely want this key.
if (cellMatches(origin, readkey)) {
// Store key if isn't deleted or superceded by memcache
if (!HLogEdit.isDeleted(readval.get())) {
if (notExpiredAndNotInDeletes(this.ttl, readkey, now, deletes)) {
keys.add(new HStoreKey(readkey));
}
if (keys.size() >= versions) {
break;
}
} else {
deletes.add(new HStoreKey(readkey));
}
} else {
// the cell doesn't match, but there might be more with different
// timestamps, so move to the next key
continue;
}
} else {
// the row doesn't match, so we've gone too far.
break;
}
} while (map.next(readkey, readval)); // advance to the next key
}
}
return keys;
} finally {
this.lock.readLock().unlock();
}
}
/**
* Find the key that matches <i>row</i> exactly, or the one that immediately
* preceeds it. WARNING: Only use this method on a table where writes occur
* with stricly increasing timestamps. This method assumes this pattern of
* writes in order to make it reasonably performant.
* @param row
* @return Found row
* @throws IOException
*/
@SuppressWarnings("unchecked")
byte [] getRowKeyAtOrBefore(final byte [] row)
throws IOException{
// Map of HStoreKeys that are candidates for holding the row key that
// most closely matches what we're looking for. We'll have to update it as
// deletes are found all over the place as we go along before finally
// reading the best key out of it at the end.
SortedMap<HStoreKey, Long> candidateKeys = new TreeMap<HStoreKey, Long>(
new HStoreKey.HStoreKeyWritableComparator(info));
// Keep a list of deleted cell keys. We need this because as we go through
// the store files, the cell with the delete marker may be in one file and
// the old non-delete cell value in a later store file. If we don't keep
// around the fact that the cell was deleted in a newer record, we end up
// returning the old value if user is asking for more than one version.
// This List of deletes should not be large since we are only keeping rows
// and columns that match those set on the scanner and which have delete
// values. If memory usage becomes an issue, could redo as bloom filter.
Set<HStoreKey> deletes = new HashSet<HStoreKey>();
this.lock.readLock().lock();
try {
// First go to the memcache. Pick up deletes and candidates.
this.memcache.getRowKeyAtOrBefore(row, candidateKeys, deletes);
// Process each store file. Run through from newest to oldest.
// This code below is very close to the body of the getKeys method.
MapFile.Reader[] maparray = getReaders();
for (int i = maparray.length - 1; i >= 0; i--) {
// Update the candidate keys from the current map file
rowAtOrBeforeFromMapFile(maparray[i], row, candidateKeys, deletes);
}
// Return the best key from candidateKeys
byte [] result =
candidateKeys.isEmpty()? null: candidateKeys.lastKey().getRow();
return result;
} finally {
this.lock.readLock().unlock();
}
}
/*
* Check an individual MapFile for the row at or before a given key
* and timestamp
* @param map
* @param row
* @param candidateKeys
* @throws IOException
*/
private void rowAtOrBeforeFromMapFile(final MapFile.Reader map,
final byte [] row, final SortedMap<HStoreKey, Long> candidateKeys,
final Set<HStoreKey> deletes)
throws IOException {
HStoreKey startKey = new HStoreKey();
ImmutableBytesWritable startValue = new ImmutableBytesWritable();
synchronized(map) {
// Don't bother with the rest of this if the file is empty
map.reset();
if (!map.next(startKey, startValue)) {
return;
}
startKey.setHRegionInfo(this.info);
// If start row for this file is beyond passed in row, return; nothing
// in here is of use to us.
if (HStoreKey.compareTwoRowKeys(this.info, startKey.getRow(), row) > 0) {
return;
}
long now = System.currentTimeMillis();
// if there aren't any candidate keys yet, we'll do some things different
if (candidateKeys.isEmpty()) {
rowAtOrBeforeCandidate(startKey, map, row, candidateKeys, deletes, now);
} else {
rowAtOrBeforeWithCandidates(startKey, map, row, candidateKeys, deletes,
now);
}
}
}
/* Find a candidate for row that is at or before passed row in passed
* mapfile.
* @param startKey First key in the mapfile.
* @param map
* @param row
* @param candidateKeys
* @param now
* @throws IOException
*/
private void rowAtOrBeforeCandidate(final HStoreKey startKey,
final MapFile.Reader map, final byte[] row,
final SortedMap<HStoreKey, Long> candidateKeys,
final Set<HStoreKey> deletes, final long now)
throws IOException {
// if the row we're looking for is past the end of this mapfile, set the
// search key to be the last key. If its a deleted key, then we'll back
// up to the row before and return that.
HStoreKey finalKey = getFinalKey(map);
HStoreKey searchKey = null;
if (HStoreKey.compareTwoRowKeys(info,finalKey.getRow(), row) < 0) {
searchKey = finalKey;
} else {
searchKey = new HStoreKey(row, this.info);
if (searchKey.compareTo(startKey) < 0) {
searchKey = startKey;
}
}
rowAtOrBeforeCandidate(map, searchKey, candidateKeys, deletes, now);
}
/*
* @param ttlSetting
* @param hsk
* @param now
* @param deletes
* @return True if key has not expired and is not in passed set of deletes.
*/
static boolean notExpiredAndNotInDeletes(final long ttl,
final HStoreKey hsk, final long now, final Set<HStoreKey> deletes) {
return !isExpired(hsk, ttl, now) &&
(deletes == null || !deletes.contains(hsk));
}
static boolean isExpired(final HStoreKey hsk, final long ttl,
final long now) {
return ttl != HConstants.FOREVER && now > hsk.getTimestamp() + ttl;
}
/* Find a candidate for row that is at or before passed key, sk, in mapfile.
* @param map
* @param sk Key to go search the mapfile with.
* @param candidateKeys
* @param now
* @throws IOException
* @see {@link #rowAtOrBeforeCandidate(HStoreKey, org.apache.hadoop.io.MapFile.Reader, byte[], SortedMap, long)}
*/
private void rowAtOrBeforeCandidate(final MapFile.Reader map,
final HStoreKey sk, final SortedMap<HStoreKey, Long> candidateKeys,
final Set<HStoreKey> deletes, final long now)
throws IOException {
HStoreKey searchKey = sk;
if (searchKey.getHRegionInfo() == null) {
searchKey.setHRegionInfo(this.info);
}
HStoreKey readkey = null;
ImmutableBytesWritable readval = new ImmutableBytesWritable();
HStoreKey knownNoGoodKey = null;
for (boolean foundCandidate = false; !foundCandidate;) {
// Seek to the exact row, or the one that would be immediately before it
readkey = (HStoreKey)map.getClosest(searchKey, readval, true);
if (readkey == null) {
// If null, we are at the start or end of the file.
break;
}
HStoreKey deletedOrExpiredRow = null;
do {
// Set this region into the readkey.
readkey.setHRegionInfo(this.info);
// If we have an exact match on row, and it's not a delete, save this
// as a candidate key
if (HStoreKey.equalsTwoRowKeys(this.info, readkey.getRow(),
searchKey.getRow())) {
if (!HLogEdit.isDeleted(readval.get())) {
if (handleNonDelete(readkey, now, deletes, candidateKeys)) {
foundCandidate = true;
// NOTE! Continue.
continue;
}
}
HStoreKey copy = addCopyToDeletes(readkey, deletes);
if (deletedOrExpiredRow == null) {
deletedOrExpiredRow = copy;
}
} else if (HStoreKey.compareTwoRowKeys(this.info, readkey.getRow(),
searchKey.getRow()) > 0) {
// if the row key we just read is beyond the key we're searching for,
// then we're done.
break;
} else {
// So, the row key doesn't match, but we haven't gone past the row
// we're seeking yet, so this row is a candidate for closest
// (assuming that it isn't a delete).
if (!HLogEdit.isDeleted(readval.get())) {
if (handleNonDelete(readkey, now, deletes, candidateKeys)) {
foundCandidate = true;
// NOTE: Continue
continue;
}
}
HStoreKey copy = addCopyToDeletes(readkey, deletes);
if (deletedOrExpiredRow == null) {
deletedOrExpiredRow = copy;
}
}
} while(map.next(readkey, readval) && (knownNoGoodKey == null ||
readkey.compareTo(knownNoGoodKey) < 0));
// If we get here and have no candidates but we did find a deleted or
// expired candidate, we need to look at the key before that
if (!foundCandidate && deletedOrExpiredRow != null) {
knownNoGoodKey = deletedOrExpiredRow;
searchKey = new HStoreKey.BeforeThisStoreKey(deletedOrExpiredRow);
} else {
// No candidates and no deleted or expired candidates. Give up.
break;
}
}
// Arriving here just means that we consumed the whole rest of the map
// without going "past" the key we're searching for. we can just fall
// through here.
}
/*
* @param key Key to copy and add to <code>deletes</code>
* @param deletes
* @return Instance of the copy added to <code>deletes</code>
*/
private HStoreKey addCopyToDeletes(final HStoreKey key,
final Set<HStoreKey> deletes) {
HStoreKey copy = new HStoreKey(key);
deletes.add(copy);
return copy;
}
private void rowAtOrBeforeWithCandidates(final HStoreKey startKey,
final MapFile.Reader map, final byte[] row,
final SortedMap<HStoreKey, Long> candidateKeys,
final Set<HStoreKey> deletes, final long now)
throws IOException {
HStoreKey readkey = null;
ImmutableBytesWritable readval = new ImmutableBytesWritable();
// if there are already candidate keys, we need to start our search
// at the earliest possible key so that we can discover any possible
// deletes for keys between the start and the search key. Back up to start
// of the row in case there are deletes for this candidate in this mapfile
// BUT do not backup before the first key in the mapfile else getClosest
// will return null
HStoreKey searchKey = new HStoreKey(candidateKeys.firstKey().getRow(), this.info);
if (searchKey.compareTo(startKey) < 0) {
searchKey = startKey;
}
// Seek to the exact row, or the one that would be immediately before it
readkey = (HStoreKey)map.getClosest(searchKey, readval, true);
if (readkey == null) {
// If null, we are at the start or end of the file.
// Didn't find anything that would match, so return
return;
}
do {
// if we have an exact match on row, and it's not a delete, save this
// as a candidate key
if (Bytes.equals(readkey.getRow(), row)) {
handleKey(readkey, readval.get(), now, deletes, candidateKeys);
} else if (HStoreKey.compareTwoRowKeys(info,
readkey.getRow(), row) > 0 ) {
// if the row key we just read is beyond the key we're searching for,
// then we're done.
break;
} else {
// So, the row key doesn't match, but we haven't gone past the row
// we're seeking yet, so this row is a candidate for closest
// (assuming that it isn't a delete).
handleKey(readkey, readval.get(), now, deletes, candidateKeys);
}
} while(map.next(readkey, readval));
}
/*
* @param readkey
* @param now
* @param deletes
* @param candidateKeys
*/
private void handleKey(final HStoreKey readkey, final byte [] value,
final long now, final Set<HStoreKey> deletes,
final SortedMap<HStoreKey, Long> candidateKeys) {
if (!HLogEdit.isDeleted(value)) {
handleNonDelete(readkey, now, deletes, candidateKeys);
} else {
// Pass copy because readkey will change next time next is called.
handleDeleted(new HStoreKey(readkey), candidateKeys, deletes);
}
}
/*
* @param readkey
* @param now
* @param deletes
* @param candidateKeys
* @return True if we added a candidate.
*/
private boolean handleNonDelete(final HStoreKey readkey, final long now,
final Set<HStoreKey> deletes, final Map<HStoreKey, Long> candidateKeys) {
if (notExpiredAndNotInDeletes(this.ttl, readkey, now, deletes)) {
candidateKeys.put(stripTimestamp(readkey),
Long.valueOf(readkey.getTimestamp()));
return true;
}
return false;
}
/* Handle keys whose values hold deletes.
* Add to the set of deletes and then if the candidate keys contain any that
* might match by timestamp, then check for a match and remove it if it's too
* young to survive the delete
* @param k Be careful; if key was gotten from a Mapfile, pass in a copy.
* Values gotten by 'nexting' out of Mapfiles will change in each invocation.
* @param candidateKeys
* @param deletes
*/
static void handleDeleted(final HStoreKey k,
final SortedMap<HStoreKey, Long> candidateKeys,
final Set<HStoreKey> deletes) {
deletes.add(k);
HStoreKey strippedKey = stripTimestamp(k);
if (candidateKeys.containsKey(strippedKey)) {
long bestCandidateTs =
candidateKeys.get(strippedKey).longValue();
if (bestCandidateTs <= k.getTimestamp()) {
candidateKeys.remove(strippedKey);
}
}
}
/*
* @param mf MapFile to dig in.
* @return Final key from passed <code>mf</code>
* @throws IOException
*/
private HStoreKey getFinalKey(final MapFile.Reader mf) throws IOException {
HStoreKey finalKey = new HStoreKey();
mf.finalKey(finalKey);
finalKey.setHRegionInfo(this.info);
return finalKey;
}
static HStoreKey stripTimestamp(HStoreKey key) {
return new HStoreKey(key.getRow(), key.getColumn(), key.getHRegionInfo());
}
/*
* Test that the <i>target</i> matches the <i>origin</i> cell address. If the
* <i>origin</i> has an empty column, then it's assumed to mean any column
* matches and only match on row and timestamp. Otherwise, it compares the
* keys with HStoreKey.matchesRowCol().
* @param origin The key we're testing against
* @param target The key we're testing
*/
private boolean cellMatches(HStoreKey origin, HStoreKey target){
// if the origin's column is empty, then we're matching any column
if (Bytes.equals(origin.getColumn(), HConstants.EMPTY_BYTE_ARRAY)) {
// if the row matches, then...
if (HStoreKey.equalsTwoRowKeys(info, target.getRow(), origin.getRow())) {
// check the timestamp
return target.getTimestamp() <= origin.getTimestamp();
}
return false;
}
// otherwise, we want to match on row and column
return target.matchesRowCol(origin);
}
/*
* Test that the <i>target</i> matches the <i>origin</i>. If the <i>origin</i>
* has an empty column, then it just tests row equivalence. Otherwise, it uses
* HStoreKey.matchesRowCol().
* @param origin Key we're testing against
* @param target Key we're testing
*/
private boolean rowMatches(HStoreKey origin, HStoreKey target){
// if the origin's column is empty, then we're matching any column
if (Bytes.equals(origin.getColumn(), HConstants.EMPTY_BYTE_ARRAY)) {
// if the row matches, then...
return HStoreKey.equalsTwoRowKeys(info, target.getRow(), origin.getRow());
}
// otherwise, we want to match on row and column
return target.matchesRowCol(origin);
}
/**
* Determines if HStore can be split
* @param force Whether to force a split or not.
* @return a StoreSize if store can be split, null otherwise
*/
StoreSize checkSplit(final boolean force) {
if (this.storefiles.size() <= 0) {
return null;
}
if (!force && (storeSize < this.desiredMaxFileSize)) {
return null;
}
this.lock.readLock().lock();
try {
// Not splitable if we find a reference store file present in the store.
boolean splitable = true;
long maxSize = 0L;
Long mapIndex = Long.valueOf(0L);
// Iterate through all the MapFiles
synchronized (storefiles) {
for (Map.Entry<Long, HStoreFile> e: storefiles.entrySet()) {
HStoreFile curHSF = e.getValue();
if (splitable) {
splitable = !curHSF.isReference();
if (!splitable) {
// RETURN IN MIDDLE OF FUNCTION!!! If not splitable, just return.
if (LOG.isDebugEnabled()) {
LOG.debug(curHSF + " is not splittable");
}
return null;
}
}
long size = curHSF.length();
if (size > maxSize) {
// This is the largest one so far
maxSize = size;
mapIndex = e.getKey();
}
}
}
// Cast to HbaseReader.
HBaseMapFile.HBaseReader r =
(HBaseMapFile.HBaseReader)this.readers.get(mapIndex);
// Get first, last, and mid keys.
r.reset();
HStoreKey firstKey = new HStoreKey();
HStoreKey lastKey = new HStoreKey();
r.next(firstKey, new ImmutableBytesWritable());
r.finalKey(lastKey);
HStoreKey mk = (HStoreKey)r.midKey();
if (mk != null) {
// if the midkey is the same as the first and last keys, then we cannot
// (ever) split this region.
if (HStoreKey.equalsTwoRowKeys(info, mk.getRow(), firstKey.getRow()) &&
HStoreKey.equalsTwoRowKeys(info, mk.getRow(), lastKey.getRow())) {
if (LOG.isDebugEnabled()) {
LOG.debug("cannot split because midkey is the same as first or last row");
}
return null;
}
return new StoreSize(maxSize, mk.getRow());
}
} catch(IOException e) {
LOG.warn("Failed getting store size for " + this.storeNameStr, e);
} finally {
this.lock.readLock().unlock();
}
return null;
}
/** @return aggregate size of HStore */
public long getSize() {
return storeSize;
}
//////////////////////////////////////////////////////////////////////////////
// File administration
//////////////////////////////////////////////////////////////////////////////
/**
* Return a scanner for both the memcache and the HStore files
*/
protected InternalScanner getScanner(long timestamp, byte [][] targetCols,
byte [] firstRow, RowFilterInterface filter)
throws IOException {
lock.readLock().lock();
try {
return new HStoreScanner(this, targetCols, firstRow, timestamp, filter);
} finally {
lock.readLock().unlock();
}
}
@Override
public String toString() {
return this.storeNameStr;
}
/**
* @param p Path to check.
* @return True if the path has format of a HStoreFile reference.
*/
public static boolean isReference(final Path p) {
return isReference(p, REF_NAME_PARSER.matcher(p.getName()));
}
private static boolean isReference(final Path p, final Matcher m) {
if (m == null || !m.matches()) {
LOG.warn("Failed match of store file name " + p.toString());
throw new RuntimeException("Failed match of store file name " +
p.toString());
}
return m.groupCount() > 1 && m.group(2) != null;
}
/**
* @return Current list of store files.
*/
SortedMap<Long, HStoreFile> getStorefiles() {
synchronized (this.storefiles) {
SortedMap<Long, HStoreFile> copy =
new TreeMap<Long, HStoreFile>(this.storefiles);
return copy;
}
}
/**
* @return Count of store files
*/
int getStorefilesCount() {
return this.storefiles.size();
}
/**
* @return The size of the store file indexes, in bytes.
* @throws IOException if there was a problem getting file sizes from the
* filesystem
*/
long getStorefilesIndexSize() throws IOException {
long size = 0;
for (HStoreFile s: storefiles.values())
size += s.indexLength();
return size;
}
/*
* Datastructure that holds size and key.
*/
class StoreSize {
private final long size;
private final byte[] key;
StoreSize(long size, byte[] key) {
this.size = size;
this.key = new byte[key.length];
System.arraycopy(key, 0, this.key, 0, key.length);
}
/* @return the size */
long getSize() {
return size;
}
/* @return the key */
byte[] getKey() {
return key;
}
}
HRegionInfo getHRegionInfo() {
return this.info;
}
}