blob: a0313444a53e89626ffb1a92a4ba8c6676a748bc [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The ongoing MemStore Compaction manager, dispatches a solo running compaction and interrupts
* the compaction if requested. The compaction is interrupted and stopped by CompactingMemStore,
* for example when another compaction needs to be started.
* Prior to compaction the MemStoreCompactor evaluates
* the compacting ratio and aborts the compaction if it is not worthy.
* The MemStoreScanner is used to traverse the compaction pipeline. The MemStoreScanner
* is included in internal store scanner, where all compaction logic is implemented.
* Threads safety: It is assumed that the compaction pipeline is immutable,
* therefore no special synchronization is required.
*/
@InterfaceAudience.Private
public class MemStoreCompactor {
public static final long DEEP_OVERHEAD = ClassSize
.align(ClassSize.OBJECT + 4 * ClassSize.REFERENCE
// compactingMemStore, versionedList, isInterrupted, strategy (the reference)
// "action" is an enum and thus it is a class with static final constants,
// so counting only the size of the reference to it and not the size of the internals
+ Bytes.SIZEOF_INT // compactionKVMax
+ ClassSize.ATOMIC_BOOLEAN // isInterrupted (the internals)
);
private static final Logger LOG = LoggerFactory.getLogger(MemStoreCompactor.class);
private CompactingMemStore compactingMemStore;
// a static version of the segment list from the pipeline
private VersionedSegmentsList versionedList;
// a flag raised when compaction is requested to stop
private final AtomicBoolean isInterrupted = new AtomicBoolean(false);
// the limit to the size of the groups to be later provided to MemStoreSegmentsIterator
private final int compactionKVMax;
private MemStoreCompactionStrategy strategy;
public MemStoreCompactor(CompactingMemStore compactingMemStore,
MemoryCompactionPolicy compactionPolicy) throws IllegalArgumentIOException {
this.compactingMemStore = compactingMemStore;
this.compactionKVMax = compactingMemStore.getConfiguration()
.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
initiateCompactionStrategy(compactionPolicy, compactingMemStore.getConfiguration(),
compactingMemStore.getFamilyName());
}
@Override
public String toString() {
return this.strategy + ", compactionCellMax=" + this.compactionKVMax;
}
/**----------------------------------------------------------------------
* The request to dispatch the compaction asynchronous task.
* The method returns true if compaction was successfully dispatched, or false if there
* is already an ongoing compaction or no segments to compact.
*/
public boolean start() throws IOException {
if (!compactingMemStore.hasImmutableSegments()) { // no compaction on empty pipeline
return false;
}
// get a snapshot of the list of the segments from the pipeline,
// this local copy of the list is marked with specific version
versionedList = compactingMemStore.getImmutableSegments();
LOG.trace("Speculative compaction starting on {}/{}",
compactingMemStore.getStore().getHRegion().getRegionInfo().getEncodedName(),
compactingMemStore.getStore().getColumnFamilyName());
HStore store = compactingMemStore.getStore();
RegionCoprocessorHost cpHost = store.getCoprocessorHost();
if (cpHost != null) {
cpHost.preMemStoreCompaction(store);
}
try {
doCompaction();
} finally {
if (cpHost != null) {
cpHost.postMemStoreCompaction(store);
}
}
return true;
}
/**----------------------------------------------------------------------
* The request to cancel the compaction asynchronous task
* The compaction may still happen if the request was sent too late
* Non-blocking request
*/
public void stop() {
isInterrupted.compareAndSet(false, true);
}
public void resetStats() {
strategy.resetStats();
}
/**----------------------------------------------------------------------
* Reset the interruption indicator and clear the pointers in order to allow good
* garbage collection
*/
private void releaseResources() {
isInterrupted.set(false);
versionedList = null;
}
/**----------------------------------------------------------------------
* The worker thread performs the compaction asynchronously.
* The solo (per compactor) thread only reads the compaction pipeline.
* There is at most one thread per memstore instance.
*/
private void doCompaction() {
ImmutableSegment result = null;
boolean resultSwapped = false;
MemStoreCompactionStrategy.Action nextStep = strategy.getAction(versionedList);
boolean merge = (nextStep == MemStoreCompactionStrategy.Action.MERGE ||
nextStep == MemStoreCompactionStrategy.Action.MERGE_COUNT_UNIQUE_KEYS);
try {
if (isInterrupted.get()) { // if the entire process is interrupted cancel flattening
return; // the compaction also doesn't start when interrupted
}
if (nextStep == MemStoreCompactionStrategy.Action.NOOP) {
return;
}
if (nextStep == MemStoreCompactionStrategy.Action.FLATTEN
|| nextStep == MemStoreCompactionStrategy.Action.FLATTEN_COUNT_UNIQUE_KEYS) {
// some Segment in the pipeline is with SkipList index, make it flat
compactingMemStore.flattenOneSegment(versionedList.getVersion(), nextStep);
return;
}
// Create one segment representing all segments in the compaction pipeline,
// either by compaction or by merge
if (!isInterrupted.get()) {
result = createSubstitution(nextStep);
}
// Substitute the pipeline with one segment
if (!isInterrupted.get()) {
resultSwapped = compactingMemStore.swapCompactedSegments(versionedList, result, merge);
if (resultSwapped) {
// update compaction strategy
strategy.updateStats(result);
// update the wal so it can be truncated and not get too long
compactingMemStore.updateLowestUnflushedSequenceIdInWAL(true); // only if greater
}
}
} catch (IOException e) {
LOG.trace("Interrupting in-memory compaction for store={}",
compactingMemStore.getFamilyName());
Thread.currentThread().interrupt();
} finally {
// For the MERGE case, if the result was created, but swap didn't happen,
// we DON'T need to close the result segment (meaning its MSLAB)!
// Because closing the result segment means closing the chunks of all segments
// in the compaction pipeline, which still have ongoing scans.
if (!merge && (result != null) && !resultSwapped) {
result.close();
}
releaseResources();
compactingMemStore.setInMemoryCompactionCompleted();
}
}
/**----------------------------------------------------------------------
* Creation of the ImmutableSegment either by merge or copy-compact of the segments of the
* pipeline, based on the Compactor Iterator. The new ImmutableSegment is returned.
*/
private ImmutableSegment createSubstitution(MemStoreCompactionStrategy.Action action) throws
IOException {
ImmutableSegment result = null;
MemStoreSegmentsIterator iterator = null;
List<ImmutableSegment> segments = versionedList.getStoreSegments();
for (ImmutableSegment s : segments) {
s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed
}
switch (action) {
case COMPACT:
iterator = new MemStoreCompactorSegmentsIterator(segments,
compactingMemStore.getComparator(),
compactionKVMax, compactingMemStore.getStore());
result = SegmentFactory.instance().createImmutableSegmentByCompaction(
compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
versionedList.getNumOfCells(), compactingMemStore.getIndexType(), action);
iterator.close();
break;
case MERGE:
case MERGE_COUNT_UNIQUE_KEYS:
iterator =
new MemStoreMergerSegmentsIterator(segments,
compactingMemStore.getComparator(), compactionKVMax);
result = SegmentFactory.instance().createImmutableSegmentByMerge(
compactingMemStore.getConfiguration(), compactingMemStore.getComparator(), iterator,
versionedList.getNumOfCells(), segments, compactingMemStore.getIndexType(), action);
iterator.close();
break;
default:
throw new RuntimeException("Unknown action " + action); // sanity check
}
return result;
}
void initiateCompactionStrategy(MemoryCompactionPolicy compType,
Configuration configuration, String cfName) throws IllegalArgumentIOException {
assert (compType !=MemoryCompactionPolicy.NONE);
switch (compType){
case BASIC: strategy = new BasicMemStoreCompactionStrategy(configuration, cfName);
break;
case EAGER: strategy = new EagerMemStoreCompactionStrategy(configuration, cfName);
break;
case ADAPTIVE: strategy = new AdaptiveMemStoreCompactionStrategy(configuration, cfName);
break;
default:
// sanity check
throw new IllegalArgumentIOException("Unknown memory compaction type " + compType);
}
}
}