// blob: abe9cf8c23ccd06138de4ee3d1085ca0c0e96693 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A memstore implementation which supports in-memory compaction.
* A compaction pipeline is added between the active set and the snapshot data structures;
* it consists of a list of segments that are subject to compaction.
* Like the snapshot, all pipeline segments are read-only; updates only affect the active set.
* To ensure this property we take advantage of the existing blocking mechanism -- the active set
* is pushed to the pipeline while holding the region's updatesLock in exclusive mode.
* Periodically, a compaction is applied in the background to all pipeline segments resulting
* in a single read-only component. The ``old'' segments are discarded when no scanner is reading
* them.
*/
@InterfaceAudience.Private
public class CompactingMemStore extends AbstractMemStore {
// The external setting of the compacting MemStore behaviour
// (configuration key and its default; neither is read inside this class)
public static final String COMPACTING_MEMSTORE_TYPE_KEY =
"hbase.hregion.compacting.memstore.type";
public static final String COMPACTING_MEMSTORE_TYPE_DEFAULT =
String.valueOf(MemoryCompactionPolicy.NONE);
// Configuration key for the fraction of the flush-to-disk size that is used as the
// in-memory flush threshold. When the key is unset or 0, initInmemoryFlushSize() falls back
// to a threshold derived from the MSLAB chunk size.
public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY =
"hbase.memstore.inmemoryflush.threshold.factor";
// Multiplier applied to the MSLAB chunk size in the fallback computation of the threshold
private static final int IN_MEMORY_FLUSH_MULTIPLIER = 1;
// In-Memory compaction pool size (the "CONPACTION" spelling is part of the public constant
// names; neither constant is read inside this class)
public static final String IN_MEMORY_CONPACTION_POOL_SIZE_KEY =
"hbase.regionserver.inmemory.compaction.pool.size";
public static final int IN_MEMORY_CONPACTION_POOL_SIZE_DEFAULT = 10;
private static final Logger LOG = LoggerFactory.getLogger(CompactingMemStore.class);
// the store handed to the constructor; source of the family name and the smallest read point
private HStore store;
// read-only segments located between the active segment and the snapshot
private CompactionPipeline pipeline;
// created through createMemStoreCompactor(); started from inMemoryCompaction()
protected MemStoreCompactor compactor;
private long inmemoryFlushSize; // the threshold on active size for in-memory flush
// raised via compareAndSet before a compaction task is dispatched, cleared by
// setInMemoryCompactionCompleted()
private final AtomicBoolean inMemoryCompactionInProgress = new AtomicBoolean(false);
// inWalReplay is true while we are synchronously replaying the edits from WAL
private boolean inWalReplay = false;
// when false, inMemoryCompaction() returns without starting the compactor (used by tests)
protected final AtomicBoolean allowCompaction = new AtomicBoolean(true);
// when true, snapshot() moves the whole pipeline into the snapshot; otherwise only its tail
private boolean compositeSnapshot = true;
/**
* Types of indexes (part of immutable segments) to be used after flattening,
* compaction, or merge are applied.
*/
public enum IndexType {
CSLM_MAP, // ConcurrentSkipListMap
ARRAY_MAP, // CellArrayMap
CHUNK_MAP // CellChunkMap
}
// Index type handed to the pipeline when a segment is flattened. The constructor overwrites
// this initial value according to the MSLAB setting.
private IndexType indexType = IndexType.ARRAY_MAP; // default implementation
// Aligned sum of the parent's DEEP_OVERHEAD, the per-field terms itemized below, and the
// DEEP_OVERHEAD constants of the pipeline and the compactor.
public static final long DEEP_OVERHEAD = ClassSize.align( AbstractMemStore.DEEP_OVERHEAD
+ 6 * ClassSize.REFERENCE // Store, CompactionPipeline,
// MemStoreCompactor, inMemoryCompactionInProgress,
// allowCompaction, indexType
+ Bytes.SIZEOF_LONG // inmemoryFlushSize
+ 2 * Bytes.SIZEOF_BOOLEAN // compositeSnapshot and inWalReplay
+ 2 * ClassSize.ATOMIC_BOOLEAN// inMemoryCompactionInProgress and allowCompaction
+ CompactionPipeline.DEEP_OVERHEAD + MemStoreCompactor.DEEP_OVERHEAD);
public CompactingMemStore(Configuration conf, CellComparator c,
HStore store, RegionServicesForStores regionServices,
MemoryCompactionPolicy compactionPolicy) throws IOException {
super(conf, c, regionServices);
this.store = store;
this.regionServices = regionServices;
this.pipeline = new CompactionPipeline(getRegionServices());
this.compactor = createMemStoreCompactor(compactionPolicy);
if (conf.getBoolean(MemStoreLAB.USEMSLAB_KEY, MemStoreLAB.USEMSLAB_DEFAULT)) {
// if user requested to work with MSLABs (whether on- or off-heap), then the
// immutable segments are going to use CellChunkMap as their index
indexType = IndexType.CHUNK_MAP;
} else {
indexType = IndexType.ARRAY_MAP;
}
// initialization of the flush size should happen after initialization of the index type
// so do not transfer the following method
initInmemoryFlushSize(conf);
LOG.info("Store={}, in-memory flush size threshold={}, immutable segments index type={}, " +
"compactor={}", this.store.getColumnFamilyName(),
StringUtils.byteDesc(this.inmemoryFlushSize), this.indexType,
(this.compactor == null? "NULL": this.compactor.toString()));
}
/**
* Factory method for the compactor of this memstore; called from the constructor.
* It is protected, so a subclass may return a different compactor.
* @param compactionPolicy the policy handed to the new compactor
* @return a new {@link MemStoreCompactor} bound to this memstore
* @throws IllegalArgumentIOException declared for the compactor construction
*/
protected MemStoreCompactor createMemStoreCompactor(MemoryCompactionPolicy compactionPolicy)
throws IllegalArgumentIOException {
return new MemStoreCompactor(this, compactionPolicy);
}
/**
 * Computes {@code inmemoryFlushSize}, the threshold on the active segment data size above
 * which an in-memory flush is requested.
 * If {@link #IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY} is configured to a non-zero value the
 * threshold is that fraction of the region flush size divided by the number of stores;
 * otherwise it is the MSLAB chunk size (times {@code IN_MEMORY_FLUSH_MULTIPLIER}) minus the
 * chunk header size.
 * @param conf configuration to read the factor and the chunk size from
 */
private void initInmemoryFlushSize(Configuration conf) {
  long flushToDiskSize = getRegionServices().getMemStoreFlushSize();
  // Family number might also be zero in some of our unit test case
  int storeCount = Math.max(1, getRegionServices().getNumStores());
  double thresholdFactor = conf.getDouble(IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY, 0.0);
  if (thresholdFactor == 0.0) {
    // no factor configured: derive the threshold from the chunk size
    long chunkSize = conf.getLong(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT);
    inmemoryFlushSize =
        IN_MEMORY_FLUSH_MULTIPLIER * chunkSize - ChunkCreator.SIZEOF_CHUNK_HEADER;
    return;
  }
  // multiply by a factor (the same factor for all index types)
  inmemoryFlushSize = (long) (thresholdFactor * flushToDiskSize) / storeCount;
}
/**
 * Sums the sizes of the active segment and of every pipeline segment; the snapshot is not
 * included (it is assumed to be cleared soon).
 * This is not thread safe: the memstore may change while the size is computed and it is the
 * caller's responsibility to make sure this doesn't happen.
 * @return total memory occupied by this MemStore, excluding the snapshot
 */
@Override
public MemStoreSize size() {
  MemStoreSizing total = new NonThreadSafeMemStoreSizing();
  total.incMemStoreSize(getActive().getMemStoreSize());
  pipeline.getSegments().forEach(segment -> total.incMemStoreSize(segment.getMemStoreSize()));
  return total.getMemStoreSize();
}
/**
 * Called before the flush is executed.
 * @return an estimation (lower bound) of the unflushed sequence id remaining in the memstore
 *   after the flush: the minimal sequence id of the last segment (pipeline tail, or the
 *   active segment when the pipeline has no tail). Returns {@code HConstants.NO_SEQNUM}
 *   when a composite snapshot is used, i.e. the memstore will be cleared, or when there
 *   is no such segment.
 */
@Override
public long preFlushSeqIDEstimation() {
  if (compositeSnapshot) {
    return HConstants.NO_SEQNUM;
  }
  Segment last = getLastSegment();
  return last == null ? HConstants.NO_SEQNUM : last.getMinSequenceId();
}
/**
* @return always {@code true} for this MemStore implementation
*/
@Override
public boolean isSloppy() {
return true;
}
/**
 * Pushes the current active segment into the pipeline and then moves pipeline content into
 * the snapshot: the whole pipeline when {@code compositeSnapshot} is set, only its tail
 * otherwise. If the previous snapshot has not been cleared yet nothing is moved and a
 * warning is logged instead.
 * The snapshot must be cleared by a call to {@link #clearSnapshot(long)}.
 * @return {@link MemStoreSnapshot} built from the snapshot id and the snapshot segment
 */
@Override
public MemStoreSnapshot snapshot() {
  if (!this.snapshot.isEmpty()) {
    // Entries left in the snapshot mean the flusher failed or didn't call cleanup.
    LOG.warn("Snapshot called again without clearing previous. " +
        "Doing nothing. Another ongoing flush or did we fail last attempt?");
    return new MemStoreSnapshot(snapshotId, this.snapshot);
  }

  LOG.debug("FLUSHING TO DISK {}, store={}",
      getRegionServices().getRegionInfo().getEncodedName(), getFamilyName());
  stopCompaction();
  // The region level lock ensures pushing active to pipeline is done in isolation:
  // no concurrent update operation is trying to flush the active segment.
  pushActiveToPipeline(getActive());
  resetTimeOfOldestEdit();
  snapshotId = EnvironmentEdgeManager.currentTime();
  // Either way, whatever ends up in the snapshot is removed from the pipeline.
  if (!compositeSnapshot) {
    pushTailToSnapshot();
  } else {
    pushPipelineToSnapshot();
  }
  compactor.resetStats();
  return new MemStoreSnapshot(snapshotId, this.snapshot);
}
/**
 * @return the size of what a flush would write: the snapshot size if the snapshot holds
 *   data; otherwise the pipeline size plus a non-empty active segment (composite
 *   snapshot) or the pipeline tail size (non-composite). If that is still empty of data
 *   the size of the active segment is returned.
 */
@Override
public MemStoreSize getFlushableSize() {
  MemStoreSize flushable = getSnapshotSize();
  if (flushable.getDataSize() == 0) {
    // empty snapshot: the pipeline tail (or everything in the memstore) is flushed
    if (!compositeSnapshot) {
      flushable = pipeline.getTailSize();
    } else {
      MemStoreSizing sizing = new NonThreadSafeMemStoreSizing(pipeline.getPipelineSize());
      MutableSegment active = getActive();
      if (!active.isEmpty()) {
        sizing.incMemStoreSize(active.getMemStoreSize());
      }
      flushable = sizing.getMemStoreSize();
    }
  }
  if (flushable.getDataSize() > 0) {
    return flushable;
  }
  return getActive().getMemStoreSize();
}
/**
* Clears the in-memory-compaction-in-progress flag, so that a later call to
* {@link #setInMemoryCompactionFlag()} can succeed again.
*/
public void setInMemoryCompactionCompleted() {
inMemoryCompactionInProgress.set(false);
}
/**
* Atomically raises the in-memory-compaction-in-progress flag.
* @return true if this call changed the flag from false to true, false if it was already set
*/
protected boolean setInMemoryCompactionFlag() {
return inMemoryCompactionInProgress.compareAndSet(false, true);
}
/**
 * @return the data size of the active segment plus the data size of every segment in the
 *   pipeline
 */
@Override
protected long keySize() {
  long total = getActive().getDataSize();
  List<? extends Segment> pipelineSegments = this.pipeline.getSegments();
  for (int i = 0; i < pipelineSegments.size(); i++) {
    total += pipelineSegments.get(i).getDataSize();
  }
  return total;
}
/**
 * @return the heap size of the active segment plus the heap size of every segment in the
 *   pipeline
 */
@Override
protected long heapSize() {
  long total = getActive().getHeapSize();
  List<? extends Segment> pipelineSegments = this.pipeline.getSegments();
  for (int i = 0; i < pipelineSegments.size(); i++) {
    total += pipelineSegments.get(i).getHeapSize();
  }
  return total;
}
/**
 * Reports the minimal sequence id found in the pipeline to the WAL for this region/family.
 * Nothing is reported when the pipeline returns {@code Long.MAX_VALUE} as its minimum or
 * when the region has no WAL.
 * @param onlyIfGreater passed through to {@code WAL.updateStore}
 */
@Override
public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfGreater) {
  long pipelineMinSeqId = pipeline.getMinSequenceId();
  if (pipelineMinSeqId == Long.MAX_VALUE) {
    return;
  }
  byte[] regionName = getRegionServices().getRegionInfo().getEncodedNameAsBytes();
  byte[] family = getFamilyNameInBytes();
  WAL wal = getRegionServices().getWAL();
  if (wal == null) {
    return;
  }
  wal.updateStore(regionName, family, pipelineMinSeqId, onlyIfGreater);
}
/**
* This message intends to inform the MemStore that next coming updates
* are going to be part of the replaying edits from WAL.
* While the flag set here is true, {@code shouldFlushInMemory} does not request an
* in-memory flush regardless of the active segment size.
*/
@Override
public void startReplayingFromWAL() {
inWalReplay = true;
}
/**
* This message intends to inform the MemStore that the replaying edits from WAL
* are done; the size-based in-memory flush check applies again afterwards.
*/
@Override
public void stopReplayingFromWAL() {
inWalReplay = false;
}
/**
 * Issues the synchronization and the test needed before applying an update.
 * For the compacting memstore this means taking the shared lock of the segment and checking
 * that the update can increase the segment size without overflowing the in-memory flush
 * threshold. When true is returned the shared lock is still held (it is released in
 * {@code postUpdate}); when false is returned the lock is not held.
 * @param currentActive the segment to be updated
 * @param cell the cell to be added
 * @param memstoreSizing object to accumulate region size changes
 * @return true iff can proceed with applying the update
 */
@Override
protected boolean preUpdate(MutableSegment currentActive, Cell cell,
    MemStoreSizing memstoreSizing) {
  if (!currentActive.sharedLock()) {
    return false;
  }
  boolean mayProceed = checkAndAddToActiveSize(currentActive, cell, memstoreSizing);
  if (!mayProceed) {
    currentActive.sharedUnlock();
  }
  return mayProceed;
}
/**
* Releases the shared lock of the segment. {@code preUpdate} of this class returns true
* only while still holding that lock.
* @param currentActive the segment that was updated
*/
@Override protected void postUpdate(MutableSegment currentActive) {
currentActive.sharedUnlock();
}
/**
* @return always true. In this class the cell length is added to the data size of the
* active segment inside {@code shouldFlushInMemory}, which runs as part of
* {@code preUpdate}.
*/
@Override protected boolean sizeAddedPreOperation() {
return true;
}
/**
 * Used for tests only.
 * @return a new list holding the active segment, followed by the pipeline segments,
 *   followed by the segments of the snapshot
 */
@Override
protected List<Segment> getSegments() {
  List<? extends Segment> immutableSegments = pipeline.getSegments();
  // +2: presumably room for the active segment and a single snapshot segment
  int initialCapacity = immutableSegments.size() + 2;
  List<Segment> all = new ArrayList<>(initialCapacity);
  all.add(getActive());
  all.addAll(immutableSegments);
  all.addAll(snapshot.getAllSegments());
  return all;
}
/**
* Changes the composite snapshot setting.
* @param useCompositeSnapshot true to have {@code snapshot()} move the whole pipeline into
* the snapshot, false to have it move only the pipeline tail
*/
public void setCompositeSnapshot(boolean useCompositeSnapshot) {
this.compositeSnapshot = useCompositeSnapshot;
}
/**
 * Asks the pipeline to replace the segments of the given versioned list with the result of
 * a compaction or merge, updating the region size.
 * @param versionedList the pipeline segments (with their version) that were processed
 * @param result the segment that replaces them
 * @param merge true if the result was produced by a merge; the old segments are closed by
 *   the swap only when this is false
 * @return the value returned by the pipeline swap
 */
public boolean swapCompactedSegments(VersionedSegmentsList versionedList, ImmutableSegment result,
    boolean merge) {
  boolean closeSwappedSegments = !merge;
  boolean updateRegionSize = true;
  return pipeline.swap(versionedList, result, closeSwappedSegments, updateRegionSize);
}
/**
* Asks the pipeline to flatten one segment, using the index type of this memstore.
* @param requesterVersion The caller must hold the VersionedList of the pipeline
* with version taken earlier. This version must be passed as a parameter here.
* The flattening happens only if versions match.
* @param action the compaction strategy action, passed through to the pipeline
*/
public void flattenOneSegment(long requesterVersion, MemStoreCompactionStrategy.Action action) {
pipeline.flattenOneSegment(requesterVersion, indexType, action);
}
/**
* Overrides the index type used for immutable segments. Used only for testability.
* @param type the index type to use from now on
*/
void setIndexType(IndexType type) {
indexType = type;
// Because this functionality is for testing only and tests are setting in-memory flush size
// according to their need, there is no setting of in-memory flush size, here.
// If it is needed, please change in-memory flush size explicitly
}
/**
* @return the index type currently used for immutable segments
*/
public IndexType getIndexType() {
return indexType;
}
/**
* @return true if the compaction pipeline is not empty
*/
public boolean hasImmutableSegments() {
return !pipeline.isEmpty();
}
/**
* @return the versioned list of segments obtained from the compaction pipeline
*/
public VersionedSegmentsList getImmutableSegments() {
return pipeline.getVersionedList();
}
/**
* @return the smallest read point as reported by the store of this memstore
*/
public long getSmallestReadPoint() {
return store.getSmallestReadPoint();
}
/**
* @return the store that was handed to the constructor of this memstore
*/
public HStore getStore() {
return store;
}
/**
* @return the column family name of the store, converted from bytes to a String
*/
public String getFamilyName() {
return Bytes.toString(getFamilyNameInBytes());
}
/**
 * Creates scanners over the active segment, the pipeline segments and the snapshot
 * segments, added to the result in that order.
 * @param readPt the read point handed to every scanner
 * @return list (created through {@link #createList(int)}) of the scanners
 * @throws IOException declared by the overridden method
 */
@Override
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
  // Take the three views in this order: active first, then pipeline, then snapshot.
  MutableSegment active = getActive();
  List<? extends Segment> pipelineSegments = pipeline.getSegments();
  List<? extends Segment> snapshotSegments = snapshot.getAllSegments();
  // one slot for the active segment plus one per pipeline and snapshot segment
  long scannerCount = 1L + pipelineSegments.size() + snapshotSegments.size();
  List<KeyValueScanner> scanners = createList((int) scannerCount);
  addToScanners(active, readPt, scanners);
  addToScanners(pipelineSegments, readPt, scanners);
  addToScanners(snapshotSegments, readPt, scanners);
  return scanners;
}
/**
* Creates the list that {@code getScanners} fills. It is protected, so a subclass may
* supply a different list implementation.
* @param capacity initial capacity of the list
* @return a new, empty {@link ArrayList}
*/
protected List<KeyValueScanner> createList(int capacity) {
return new ArrayList<>(capacity);
}
/**
 * Checks whether anything needs to be done based on the current active set size.
 * The method is invoked upon every addition to the active set.
 * For CompactingMemStore: if the cell does not fit below the in-memory flush threshold,
 * the active set is flushed to the read-only memory (by the one thread that wins
 * {@code setInMemoryFlushed}) and an in-memory compaction is dispatched to the pool,
 * unless one is already marked as in progress.
 * @param currActive intended segment to update
 * @param cellToAdd cell to be added to the segment
 * @param memstoreSizing object to accumulate changed size
 * @return true if the cell size was accounted for in the given segment and the cell can be
 *   added to it; false if an in-memory flush of that segment was required instead
 */
private boolean checkAndAddToActiveSize(MutableSegment currActive, Cell cellToAdd,
    MemStoreSizing memstoreSizing) {
  if (!shouldFlushInMemory(currActive, cellToAdd, memstoreSizing)) {
    return true;
  }
  if (!currActive.setInMemoryFlushed()) {
    // another caller has already marked this segment as flushed in memory
    return false;
  }
  flushInMemory(currActive);
  if (setInMemoryCompactionFlag()) {
    // The in-memory compaction itself runs in the background, on a pool thread
    InMemoryCompactionRunnable compactionTask = new InMemoryCompactionRunnable();
    if (LOG.isTraceEnabled()) {
      LOG.trace("Dispatching the MemStore in-memory flush for store " + store
          .getColumnFamilyName());
    }
    getPool().execute(compactionTask);
  }
  return false;
}
/**
 * Pushes the active segment into the pipeline (if this caller is the one marking it as
 * flushed in memory) and then runs the in-memory compaction in the calling thread.
 * Externally visible only for tests. When invoked directly from tests it must be verified
 * that the caller doesn't hold updatesLock, otherwise there is a deadlock.
 */
void flushInMemory() {
  MutableSegment active = getActive();
  boolean firstToFlush = active.setInMemoryFlushed();
  if (firstToFlush) {
    flushInMemory(active);
  }
  inMemoryCompaction();
}
/**
* Logs (at trace level) and pushes the given segment into the compaction pipeline.
* @param currActive the active segment to be pushed
*/
private void flushInMemory(MutableSegment currActive) {
LOG.trace("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline");
pushActiveToPipeline(currActive);
}
/**
 * Marks the in-memory compaction as in progress and starts the compactor, unless
 * compaction is disallowed through {@code allowCompaction}. If the compactor reports that
 * it did not start, the in-progress flag is cleared again right away.
 * An {@link IOException} thrown by the compactor is logged, together with its stack trace,
 * and not propagated.
 */
void inMemoryCompaction() {
  // setting the inMemoryCompactionInProgress flag again for the case this method is invoked
  // directly (only in tests) in the common path setting from true to true is idempotent
  inMemoryCompactionInProgress.set(true);
  // Used by tests
  if (!allowCompaction.get()) {
    return;
  }
  try {
    // Speculative compaction execution, may be interrupted if flush is forced while
    // compaction is in progress
    if (!compactor.start()) {
      setInMemoryCompactionCompleted();
    }
  } catch (IOException e) {
    // The exception is passed as the trailing argument WITHOUT a placeholder of its own, so
    // that SLF4J logs it as the throwable (with stack trace) and no "{}" is left dangling.
    // NOTE(review): the in-progress flag is not cleared on this path -- confirm that the
    // compactor clears it itself when it fails.
    LOG.warn("Unable to run in-memory compaction on {}/{}",
        getRegionServices().getRegionInfo().getEncodedName(), getFamilyName(), e);
  }
}
/**
 * @return the tail of the pipeline or, when the pipeline has no tail, the active segment
 *   (which is read before the pipeline tail is looked up)
 */
private Segment getLastSegment() {
  Segment activeBeforeLookup = getActive();
  Segment pipelineTail = pipeline.getTail();
  if (pipelineTail != null) {
    return pipelineTail;
  }
  return activeBeforeLookup;
}
/**
* @return the name of the column family, taken from the column family descriptor of the store
*/
private byte[] getFamilyNameInBytes() {
return store.getColumnFamilyDescriptor().getName();
}
/**
* @return the in-memory compaction thread pool provided by the region services
*/
private ThreadPoolExecutor getPool() {
return getRegionServices().getInMemoryCompactionPool();
}
/**
 * Tries to reserve room for the cell in the given segment: as long as the segment data size
 * plus the cell length stays below the in-memory flush threshold (or edits are being
 * replayed from the WAL, in which case the size does not matter), the data size is advanced
 * by the cell length with a compare-and-set, retrying with a freshly read size if that fails.
 * @param currActive the segment the cell is about to be added to
 * @param cellToAdd the cell to be added
 * @param memstoreSizing if not null, incremented by the cell length on success
 * @return false if the room was reserved (no need to flush); true if the size would reach
 *   the flush threshold
 */
protected boolean shouldFlushInMemory(MutableSegment currActive, Cell cellToAdd,
    MemStoreSizing memstoreSizing) {
  long cellLength = MutableSegment.getCellLength(cellToAdd);
  for (long observedSize = currActive.getDataSize();
      observedSize + cellLength < inmemoryFlushSize || inWalReplay;
      observedSize = currActive.getDataSize()) {
    // when replaying edits from WAL there is no need in in-memory flush regardless the size
    // otherwise size below flush threshold try to update atomically
    if (!currActive.compareAndSetDataSize(observedSize, observedSize + cellLength)) {
      // lost the race against a concurrent update: re-read the size and re-check
      continue;
    }
    if (memstoreSizing != null) {
      memstoreSizing.incMemStoreSize(cellLength, 0, 0, 0);
    }
    // enough space for cell - no need to flush
    return false;
  }
  // size above flush threshold
  return true;
}
/**
 * Requests cancellation of the asynchronous compaction task (the one caused by an
 * in-memory flush); a no-op when no compaction is marked as in progress.
 * The compaction may still happen if the request was sent too late.
 * Non-blocking request.
 */
private void stopCompaction() {
  boolean compactionMarkedRunning = inMemoryCompactionInProgress.get();
  if (!compactionMarkedRunning) {
    return;
  }
  compactor.stop();
}
/**
 * Moves the given segment to the head of the pipeline and installs a fresh active segment.
 * Does nothing when the given segment is empty.
 * @param currActive the segment to push
 */
protected void pushActiveToPipeline(MutableSegment currActive) {
  if (currActive.isEmpty()) {
    return;
  }
  pipeline.pushHead(currActive);
  resetActive();
}
/**
 * Makes the tail of the pipeline the snapshot and removes it from the pipeline.
 */
private void pushTailToSnapshot() {
  VersionedSegmentsList tail = pipeline.getVersionedTail();
  pushToSnapshot(tail.getStoreSegments());
  // In Swap: don't close segments (they are in snapshot now) and don't update the region size
  boolean closeSegments = false;
  boolean updateRegionSize = false;
  pipeline.swap(tail, null, closeSegments, updateRegionSize);
}
/**
 * Makes the whole pipeline content the snapshot and removes it from the pipeline.
 * The swap that empties the pipeline can fail when the pipeline version changed in between
 * (an ongoing compaction updated it); in that case the attempt is repeated. After three
 * FAILED attempts the method gives up: it logs a warning and installs an empty snapshot.
 * A successful swap always keeps the snapshot that was just built, whichever attempt it
 * happened on.
 */
private void pushPipelineToSnapshot() {
  int iterationsCnt = 0;
  boolean done = false;
  while (!done) {
    iterationsCnt++;
    VersionedSegmentsList segments = pipeline.getVersionedList();
    pushToSnapshot(segments.getStoreSegments());
    // swap can return false in case the pipeline was updated by ongoing compaction
    // and the version increase, the chance of it happening is very low
    // In Swap: don't close segments (they are in snapshot now) and don't update the region size
    done = pipeline.swap(segments, null, false, false);
    // Only bail out when the swap actually failed. Without the !done check a swap that
    // succeeds on the third attempt would have its snapshot replaced by an empty segment
    // although the segments were already removed from the pipeline.
    if (!done && iterationsCnt > 2) {
      // practically it is impossible that this loop iterates more than two times
      // (because the compaction is stopped and none restarts it while in snapshot request),
      // however stopping here for the case of the infinite loop causing by any error
      LOG.warn("Multiple unsuccessful attempts to push the compaction pipeline to snapshot," +
          " while flushing to disk.");
      this.snapshot = SegmentFactory.instance().createImmutableSegment(getComparator());
      break;
    }
  }
}
/**
 * Installs the given segments as the snapshot: a single non-empty segment becomes the
 * snapshot itself, anything else is wrapped into a composite immutable segment.
 * The snapshot is left untouched when the list is empty.
 * @param segments the segments to become the snapshot
 */
private void pushToSnapshot(List<ImmutableSegment> segments) {
  if (segments.isEmpty()) {
    return;
  }
  boolean singleNonEmptySegment = segments.size() == 1 && !segments.get(0).isEmpty();
  if (singleNonEmptySegment) {
    this.snapshot = segments.get(0);
  } else {
    // create composite snapshot
    this.snapshot =
        SegmentFactory.instance().createCompositeImmutableSegment(getComparator(), segments);
  }
}
/**
* @return the region services held in the {@code regionServices} field
*/
private RegionServicesForStores getRegionServices() {
return regionServices;
}
/**
* The task that performs the in-memory compaction asynchronously.
* It is handed to the in-memory compaction pool by {@code checkAndAddToActiveSize}, and only
* by the caller that managed to raise the in-progress flag through
* {@code setInMemoryCompactionFlag()}. The active segment has already been pushed into the
* pipeline by the dispatching thread; this task only calls {@code inMemoryCompaction()}.
*/
private class InMemoryCompactionRunnable implements Runnable {
@Override
public void run() {
inMemoryCompaction();
}
}
/**
* @return the current value of the in-memory-compaction-in-progress flag
*/
boolean isMemStoreFlushingInMemory() {
return inMemoryCompactionInProgress.get();
}
/**
 * Looks the next row up in every segment returned by {@code getSegments()} and combines the
 * per-segment results with {@code getLowest}.
 * @param cell Find the row that comes after this one. If null, we return the
 *   first.
 * @return Next row or null if none found.
 */
Cell getNextRow(final Cell cell) {
  Cell lowestSoFar = null;
  for (Segment current : getSegments()) {
    Cell candidate = getNextRow(cell, current.getCellSet());
    lowestSoFar = (lowestSoFar == null) ? candidate : getLowest(lowestSoFar, candidate);
  }
  return lowestSoFar;
}
/**
* @return the threshold on the active segment size for an in-memory flush
*/
long getInmemoryFlushSize() {
return inmemoryFlushSize;
}
/**
 * Debug method: logs, at debug level, the data size of the active segment and the current
 * values of the {@code allowCompaction} and {@code inMemoryCompactionInProgress} flags.
 */
public void debug() {
  StringBuilder msg = new StringBuilder("active size=").append(getActive().getDataSize());
  msg.append(" allow compaction is ").append(allowCompaction.get());
  msg.append(" inMemoryCompactionInProgress is ").append(inMemoryCompactionInProgress.get());
  LOG.debug(msg.toString());
}
}