// blob: e27acced1bb24d631c9f47a02c28d7674a7e60ce [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
/**
* A memstore implementation which supports in-memory compaction.
* A compaction pipeline is added between the active set and the snapshot data structures;
* it consists of a list of kv-sets that are subject to compaction.
* Like the snapshot, all pipeline components are read-only; updates only affect the active set.
* To ensure this property we take advantage of the existing blocking mechanism -- the active set
* is pushed to the pipeline while holding the region's updatesLock in exclusive mode.
* Periodically, a compaction is applied in the background to all pipeline components resulting
* in a single read-only component. The ``old'' components are discarded when no scanner is reading
* them.
*/
@InterfaceAudience.Private
public class CompactingMemStore extends AbstractMemStore {
public final static long DEEP_OVERHEAD_PER_PIPELINE_ITEM = ClassSize.align(
ClassSize.TIMERANGE_TRACKER + ClassSize.TIMERANGE +
ClassSize.CELL_SKIPLIST_SET + ClassSize.CONCURRENT_SKIPLISTMAP);
// Default fraction of in-memory-flush size w.r.t. flush-to-disk size
public static final String IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY =
"hbase.memstore.inmemoryflush.threshold.factor";
private static final double IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT = 0.25;
private static final Log LOG = LogFactory.getLog(CompactingMemStore.class);
private Store store;
private RegionServicesForStores regionServices;
private CompactionPipeline pipeline;
private MemStoreCompactor compactor;
// the threshold on active size for in-memory flush
private long inmemoryFlushSize;
private final AtomicBoolean inMemoryFlushInProgress = new AtomicBoolean(false);
private final AtomicBoolean allowCompaction = new AtomicBoolean(true);
public CompactingMemStore(Configuration conf, CellComparator c,
HStore store, RegionServicesForStores regionServices) throws IOException {
super(conf, c);
this.store = store;
this.regionServices = regionServices;
this.pipeline = new CompactionPipeline(getRegionServices());
this.compactor = new MemStoreCompactor(this);
initInmemoryFlushSize(conf);
}
private void initInmemoryFlushSize(Configuration conf) {
long memstoreFlushSize = getRegionServices().getMemstoreFlushSize();
int numStores = getRegionServices().getNumStores();
if (numStores <= 1) {
// Family number might also be zero in some of our unit test case
numStores = 1;
}
inmemoryFlushSize = memstoreFlushSize / numStores;
// multiply by a factor
double factor = conf.getDouble(IN_MEMORY_FLUSH_THRESHOLD_FACTOR_KEY,
IN_MEMORY_FLUSH_THRESHOLD_FACTOR_DEFAULT);
inmemoryFlushSize *= factor;
LOG.info("Setting in-memory flush size threshold to " + inmemoryFlushSize);
}
public static long getSegmentSize(Segment segment) {
return segment.getSize() - DEEP_OVERHEAD_PER_PIPELINE_ITEM;
}
public static long getSegmentsSize(List<? extends Segment> list) {
long res = 0;
for (Segment segment : list) {
res += getSegmentSize(segment);
}
return res;
}
/**
* @return Total memory occupied by this MemStore.
* This is not thread safe and the memstore may be changed while computing its size.
* It is the responsibility of the caller to make sure this doesn't happen.
*/
@Override
public long size() {
long res = 0;
for (Segment item : getSegments()) {
res += item.getSize();
}
return res;
}
/**
* This method is called when it is clear that the flush to disk is completed.
* The store may do any post-flush actions at this point.
* One example is to update the WAL with sequence number that is known only at the store level.
*/
@Override public void finalizeFlush() {
updateLowestUnflushedSequenceIdInWAL(false);
}
@Override public boolean isSloppy() {
return true;
}
/**
* Push the current active memstore segment into the pipeline
* and create a snapshot of the tail of current compaction pipeline
* Snapshot must be cleared by call to {@link #clearSnapshot}.
* {@link #clearSnapshot(long)}.
* @return {@link MemStoreSnapshot}
*/
@Override
public MemStoreSnapshot snapshot() {
MutableSegment active = getActive();
// If snapshot currently has entries, then flusher failed or didn't call
// cleanup. Log a warning.
if (!getSnapshot().isEmpty()) {
LOG.warn("Snapshot called again without clearing previous. " +
"Doing nothing. Another ongoing flush or did we fail last attempt?");
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("FLUSHING TO DISK: region "
+ getRegionServices().getRegionInfo().getRegionNameAsString() + "store: "
+ getFamilyName());
}
stopCompaction();
pushActiveToPipeline(active);
snapshotId = EnvironmentEdgeManager.currentTime();
pushTailToSnapshot();
}
return new MemStoreSnapshot(snapshotId, getSnapshot());
}
/**
* On flush, how much memory we will clear.
* @return size of data that is going to be flushed
*/
@Override public long getFlushableSize() {
long snapshotSize = getSnapshot().getSize();
if(snapshotSize == 0) {
//if snapshot is empty the tail of the pipeline is flushed
snapshotSize = pipeline.getTailSize();
}
return snapshotSize > 0 ? snapshotSize : keySize();
}
@Override
public void updateLowestUnflushedSequenceIdInWAL(boolean onlyIfGreater) {
long minSequenceId = pipeline.getMinSequenceId();
if(minSequenceId != Long.MAX_VALUE) {
byte[] encodedRegionName = getRegionServices().getRegionInfo().getEncodedNameAsBytes();
byte[] familyName = getFamilyNameInByte();
WAL WAL = getRegionServices().getWAL();
if (WAL != null) {
WAL.updateStore(encodedRegionName, familyName, minSequenceId, onlyIfGreater);
}
}
}
@Override
public List<Segment> getSegments() {
List<Segment> pipelineList = pipeline.getSegments();
List<Segment> list = new LinkedList<Segment>();
list.add(getActive());
list.addAll(pipelineList);
list.add(getSnapshot());
return list;
}
public boolean swapCompactedSegments(VersionedSegmentsList versionedList,
ImmutableSegment result) {
return pipeline.swap(versionedList, result);
}
public boolean hasCompactibleSegments() {
return !pipeline.isEmpty();
}
public VersionedSegmentsList getCompactibleSegments() {
return pipeline.getVersionedList();
}
public long getSmallestReadPoint() {
return store.getSmallestReadPoint();
}
public Store getStore() {
return store;
}
public String getFamilyName() {
return Bytes.toString(getFamilyNameInByte());
}
@Override
/*
* Scanners are ordered from 0 (oldest) to newest in increasing order.
*/
public List<KeyValueScanner> getScanners(long readPt) throws IOException {
List<Segment> pipelineList = pipeline.getSegments();
long order = pipelineList.size();
// The list of elements in pipeline + the active element + the snapshot segment
// TODO : This will change when the snapshot is made of more than one element
List<KeyValueScanner> list = new ArrayList<KeyValueScanner>(pipelineList.size() + 2);
list.add(getActive().getSegmentScanner(readPt, order + 1));
for (Segment item : pipelineList) {
list.add(item.getSegmentScanner(readPt, order));
order--;
}
list.add(getSnapshot().getSegmentScanner(readPt, order));
return Collections.<KeyValueScanner> singletonList(
new MemStoreScanner((AbstractMemStore) this, list, readPt));
}
/**
* Check whether anything need to be done based on the current active set size.
* The method is invoked upon every addition to the active set.
* For CompactingMemStore, flush the active set to the read-only memory if it's
* size is above threshold
*/
@Override
protected void checkActiveSize() {
if (shouldFlushInMemory()) {
/* The thread is dispatched to flush-in-memory. This cannot be done
* on the same thread, because for flush-in-memory we require updatesLock
* in exclusive mode while this method (checkActiveSize) is invoked holding updatesLock
* in the shared mode. */
InMemoryFlushRunnable runnable = new InMemoryFlushRunnable();
if (LOG.isTraceEnabled()) {
LOG.trace(
"Dispatching the MemStore in-memory flush for store " + store.getColumnFamilyName());
}
getPool().execute(runnable);
}
}
// internally used method, externally visible only for tests
// when invoked directly from tests it must be verified that the caller doesn't hold updatesLock,
// otherwise there is a deadlock
@VisibleForTesting
void flushInMemory() throws IOException {
// setting the inMemoryFlushInProgress flag again for the case this method is invoked
// directly (only in tests) in the common path setting from true to true is idempotent
// Speculative compaction execution, may be interrupted if flush is forced while
// compaction is in progress
inMemoryFlushInProgress.set(true);
try {
// Phase I: Update the pipeline
getRegionServices().blockUpdates();
try {
MutableSegment active = getActive();
if (LOG.isDebugEnabled()) {
LOG.debug("IN-MEMORY FLUSH: Pushing active segment into compaction pipeline, "
+ "and initiating compaction.");
}
pushActiveToPipeline(active);
} finally {
getRegionServices().unblockUpdates();
}
// Used by tests
if (!allowCompaction.get()) {
return;
}
// Phase II: Compact the pipeline
try {
compactor.startCompaction();
} catch (IOException e) {
LOG.warn("Unable to run memstore compaction. region "
+ getRegionServices().getRegionInfo().getRegionNameAsString() + "store: "
+ getFamilyName(), e);
}
} finally {
inMemoryFlushInProgress.set(false);
}
}
private byte[] getFamilyNameInByte() {
return store.getFamily().getName();
}
private ThreadPoolExecutor getPool() {
return getRegionServices().getInMemoryCompactionPool();
}
private boolean shouldFlushInMemory() {
if (getActive().getSize() > inmemoryFlushSize) {
// size above flush threshold
return inMemoryFlushInProgress.compareAndSet(false, true);
}
return false;
}
/**
* The request to cancel the compaction asynchronous task (caused by in-memory flush)
* The compaction may still happen if the request was sent too late
* Non-blocking request
*/
private void stopCompaction() {
if (inMemoryFlushInProgress.get()) {
compactor.stopCompact();
inMemoryFlushInProgress.set(false);
}
}
private void pushActiveToPipeline(MutableSegment active) {
if (!active.isEmpty()) {
long delta = DEEP_OVERHEAD_PER_PIPELINE_ITEM - DEEP_OVERHEAD;
active.setSize(active.getSize() + delta);
pipeline.pushHead(active);
resetCellSet();
}
}
private void pushTailToSnapshot() {
ImmutableSegment tail = pipeline.pullTail();
if (!tail.isEmpty()) {
setSnapshot(tail);
long size = getSegmentSize(tail);
setSnapshotSize(size);
}
}
private RegionServicesForStores getRegionServices() {
return regionServices;
}
/**
* The in-memory-flusher thread performs the flush asynchronously.
* There is at most one thread per memstore instance.
* It takes the updatesLock exclusively, pushes active into the pipeline, releases updatesLock
* and compacts the pipeline.
*/
private class InMemoryFlushRunnable implements Runnable {
@Override
public void run() {
try {
flushInMemory();
} catch (IOException e) {
LOG.warn("Unable to run memstore compaction. region "
+ getRegionServices().getRegionInfo().getRegionNameAsString()
+ "store: "+ getFamilyName(), e);
}
}
}
//----------------------------------------------------------------------
//methods for tests
//----------------------------------------------------------------------
@VisibleForTesting
boolean isMemStoreFlushingInMemory() {
return inMemoryFlushInProgress.get();
}
@VisibleForTesting
void disableCompaction() {
allowCompaction.set(false);
}
@VisibleForTesting
void enableCompaction() {
allowCompaction.set(true);
}
/**
* @param cell Find the row that comes after this one. If null, we return the
* first.
* @return Next row or null if none found.
*/
Cell getNextRow(final Cell cell) {
Cell lowest = null;
List<Segment> segments = getSegments();
for (Segment segment : segments) {
if (lowest == null) {
lowest = getNextRow(cell, segment.getCellSet());
} else {
lowest = getLowest(lowest, getNextRow(cell, segment.getCellSet()));
}
}
return lowest;
}
}