/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The compaction pipeline of a {@link CompactingMemStore} is a FIFO queue of segments.
* It supports pushing a segment at the head of the pipeline and removing a segment from the
* tail when it is flushed to disk.
* It also supports a swap method that allows the in-memory compaction to swap a subset of the
* segments at the tail of the pipeline with a new (compacted) one. The swap succeeds only if
* the version number passed with the list of segments to swap is the same as the current
* version of the pipeline.
* Essentially, there are two methods which can change the structure of the pipeline: pushHead()
* and swap(); the latter is used both by a flush to disk and by an in-memory compaction.
* The pipeline version is updated by swap(); it allows conflicting operations on the suffix of
* the pipeline to be detected.
*
* The synchronization model is copy-on-write. Methods which change the structure of the
* pipeline (pushHead(), flattenOneSegment() and swap()) apply their changes while holding a
* lock and then publish a fresh read-only copy of the pipeline's list. Read methods read from
* the read-only copy. If a read method accesses the read-only copy more than once, it takes a
* local reference to it first, to ensure all accesses see the same copy.
*
* The methods getVersionedList(), getVersionedTail(), and flattenOneSegment() are also
* protected by a lock since they need to have a consistent (atomic) view of the pipeline list
* and version number.
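*
* A minimal sketch of the read pattern described above, assuming a {@code CompactionPipeline}
* variable named {@code pipeline} (the variable name is hypothetical):
* <pre>
*   // Take one local reference so both accesses observe the same snapshot, even if a
*   // writer concurrently publishes a new read-only copy
*   List&lt;? extends Segment&gt; snapshot = pipeline.getSegments();
*   Segment tail = snapshot.isEmpty() ? null : snapshot.get(snapshot.size() - 1);
* </pre>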
*/
@InterfaceAudience.Private
public class CompactionPipeline {
private static final Logger LOG = LoggerFactory.getLogger(CompactionPipeline.class);
public final static long FIXED_OVERHEAD = ClassSize
.align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
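// DEEP_OVERHEAD also accounts for the two LinkedList fields: pipeline and readOnlyCopy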
public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + (2 * ClassSize.LINKEDLIST);
private final RegionServicesForStores region;
private final LinkedList<ImmutableSegment> pipeline = new LinkedList<>();
// The list is volatile so a reader never observes a newly allocated copy before its
// construction has completed (safe publication)
private volatile LinkedList<ImmutableSegment> readOnlyCopy = new LinkedList<>();
// Version is volatile to ensure it is atomically read when not using a lock
private volatile long version = 0;
public CompactionPipeline(RegionServicesForStores region) {
this.region = region;
}
public boolean pushHead(MutableSegment segment) {
// Record the ImmutableSegment's heap overhead when initializing
MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
ImmutableSegment immutableSegment =
SegmentFactory.instance().createImmutableSegment(segment, memstoreAccounting);
if (region != null) {
region.addMemStoreSize(memstoreAccounting.getDataSize(), memstoreAccounting.getHeapSize(),
memstoreAccounting.getOffHeapSize(), memstoreAccounting.getCellsCount());
}
synchronized (pipeline){
boolean res = addFirst(immutableSegment);
readOnlyCopy = new LinkedList<>(pipeline);
return res;
}
}
public VersionedSegmentsList getVersionedList() {
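// Locked so the returned (list, version) pair forms one consistent snapshot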
synchronized (pipeline){
return new VersionedSegmentsList(readOnlyCopy, version);
}
}
public VersionedSegmentsList getVersionedTail() {
synchronized (pipeline){
List<ImmutableSegment> segmentList = new ArrayList<>();
if(!pipeline.isEmpty()) {
segmentList.add(0, pipeline.getLast());
}
return new VersionedSegmentsList(segmentList, version);
}
}
/**
* Swaps the versioned list at the tail of the pipeline with a new segment.
* The swap succeeds only if there have been no changes to the suffix of the list since the
* versioned list was created.
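*
* A hedged caller sketch (the {@code compact} helper is hypothetical): take a versioned
* snapshot, build the new segment outside the lock, then attempt the swap; a false return
* means a concurrent swap or flattening bumped the version and the result must be discarded.
* <pre>
*   VersionedSegmentsList snapshot = pipeline.getVersionedList();
*   ImmutableSegment compacted = compact(snapshot.getStoreSegments()); // hypothetical helper
*   boolean swapped = pipeline.swap(snapshot, compacted, true, true);
* </pre>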
* @param versionedList suffix of the pipeline to be replaced; can be the tail or the whole
*        pipeline
* @param segment new segment to replace the suffix. Can be null if the suffix just needs to
*        be removed.
* @param closeSuffix whether to close the suffix (to release memory) as part of swapping it
*        out. During an index merge this is false; for a data compaction it is true.
* @param updateRegionSize whether to update the region size. Update the region size when the
*        pipeline is swapped as part of an in-memory flush and further merge/compaction.
*        Don't update the region size when the swap is the result of a snapshot
*        (flush-to-disk).
* @return true iff the suffix was swapped with the new segment
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="VO_VOLATILE_INCREMENT",
justification="Increment is done under a synchronize block so safe")
public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment,
boolean closeSuffix, boolean updateRegionSize) {
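// Optimistic check outside the lock: fail fast if the version has already moved on.
// The version is re-checked under the lock below before the swap is applied.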
if (versionedList.getVersion() != version) {
return false;
}
List<ImmutableSegment> suffix;
synchronized (pipeline){
if(versionedList.getVersion() != version) {
return false;
}
suffix = versionedList.getStoreSegments();
LOG.debug("Swapping pipeline suffix; before={}, new segment={}",
versionedList.getStoreSegments().size(), segment);
swapSuffix(suffix, segment, closeSuffix);
readOnlyCopy = new LinkedList<>(pipeline);
version++;
}
if (updateRegionSize && region != null) {
// update the global memstore size counter
long suffixDataSize = getSegmentsKeySize(suffix);
long suffixHeapSize = getSegmentsHeapSize(suffix);
long suffixOffHeapSize = getSegmentsOffHeapSize(suffix);
int suffixCellsCount = getSegmentsCellsCount(suffix);
long newDataSize = 0;
long newHeapSize = 0;
long newOffHeapSize = 0;
int newCellsCount = 0;
if (segment != null) {
newDataSize = segment.getDataSize();
newHeapSize = segment.getHeapSize();
newOffHeapSize = segment.getOffHeapSize();
newCellsCount = segment.getCellsCount();
}
long dataSizeDelta = suffixDataSize - newDataSize;
long heapSizeDelta = suffixHeapSize - newHeapSize;
long offHeapSizeDelta = suffixOffHeapSize - newOffHeapSize;
int cellsCountDelta = suffixCellsCount - newCellsCount;
region.addMemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta, -cellsCountDelta);
LOG.debug(
"Suffix data size={}, new segment data size={}, suffix heap size={}, new segment heap "
+ "size={}, suffix off heap size={}, new segment off heap size={}, suffix cells "
+ "count={}, new segment cells count={}",
suffixDataSize, newDataSize, suffixHeapSize, newHeapSize, suffixOffHeapSize,
newOffHeapSize, suffixCellsCount, newCellsCount);
}
return true;
}
private static long getSegmentsHeapSize(List<? extends Segment> list) {
long res = 0;
for (Segment segment : list) {
res += segment.getHeapSize();
}
return res;
}
private static long getSegmentsOffHeapSize(List<? extends Segment> list) {
long res = 0;
for (Segment segment : list) {
res += segment.getOffHeapSize();
}
return res;
}
private static long getSegmentsKeySize(List<? extends Segment> list) {
long res = 0;
for (Segment segment : list) {
res += segment.getDataSize();
}
return res;
}
private static int getSegmentsCellsCount(List<? extends Segment> list) {
int res = 0;
for (Segment segment : list) {
res += segment.getCellsCount();
}
return res;
}
/**
* If the caller holds the current version, go over the pipeline and try to flatten each
* segment. Flattening replaces the ConcurrentSkipListMap-based CellSet with a
* CellArrayMap-based one. Flattening a segment that was not initially based on a
* ConcurrentSkipListMap has no effect. Returns after one segment is successfully flattened.
*
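* A minimal requester sketch ({@code pipeline} is a hypothetical variable; the index type and
* action values are illustrative):
* <pre>
*   VersionedSegmentsList snapshot = pipeline.getVersionedList();
*   boolean flattened = pipeline.flattenOneSegment(snapshot.getVersion(),
*       CompactingMemStore.IndexType.ARRAY_MAP, MemStoreCompactionStrategy.Action.FLATTEN);
* </pre>
*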
* @return true iff a segment was successfully flattened
*/
public boolean flattenOneSegment(long requesterVersion,
CompactingMemStore.IndexType idxType,
MemStoreCompactionStrategy.Action action) {
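// Optimistic fail-fast outside the lock; the version is re-checked under the lock below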
if (requesterVersion != version) {
LOG.warn("Segment flattening failed, because versions do not match. Requester version: {}, "
+ "actual version: {}", requesterVersion, version);
return false;
}
synchronized (pipeline){
if(requesterVersion != version) {
LOG.warn("Segment flattening failed, because versions do not match");
return false;
}
int i = 0;
for (ImmutableSegment s : pipeline) {
if (s.canBeFlattened()) {
// Ensure all updates that preceded the in-memory flush of s have completed, so the
// segment is stable before its index is replaced
s.waitForUpdates();
// Account for the size change caused by flattening
MemStoreSizing newMemstoreAccounting = new NonThreadSafeMemStoreSizing();
ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening(
(CSLMImmutableSegment) s, idxType, newMemstoreAccounting, action);
replaceAtIndex(i, newS);
if (region != null) {
// Update the global memstore size counter; upon flattening there is no change in the
// data size
MemStoreSize mss = newMemstoreAccounting.getMemStoreSize();
region.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(),
mss.getCellsCount());
}
LOG.debug("Compaction pipeline segment {} flattened", s);
return true;
}
i++;
}
}
// do not update the global memstore size counter and do not increase the version,
// because all the cells remain in place
return false;
}
public boolean isEmpty() {
return readOnlyCopy.isEmpty();
}
public List<? extends Segment> getSegments() {
return readOnlyCopy;
}
public long size() {
return readOnlyCopy.size();
}
public long getMinSequenceId() {
long minSequenceId = Long.MAX_VALUE;
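// Take one local reference so isEmpty() and getLast() read the same snapshot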
LinkedList<? extends Segment> localCopy = readOnlyCopy;
if (!localCopy.isEmpty()) {
minSequenceId = localCopy.getLast().getMinSequenceId();
}
return minSequenceId;
}
public MemStoreSize getTailSize() {
LinkedList<? extends Segment> localCopy = readOnlyCopy;
return localCopy.isEmpty()? new MemStoreSize(): localCopy.peekLast().getMemStoreSize();
}
public MemStoreSize getPipelineSize() {
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
LinkedList<? extends Segment> localCopy = readOnlyCopy;
for (Segment segment : localCopy) {
memStoreSizing.incMemStoreSize(segment.getMemStoreSize());
}
return memStoreSizing.getMemStoreSize();
}
private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
boolean closeSegmentsInSuffix) {
pipeline.removeAll(suffix);
if (segment != null) {
pipeline.addLast(segment);
}
// During index merge we won't be closing the segments undergoing the merge. Segment#close()
// will release the MSLAB chunks to the pool. But in case of an index merge there won't be any
// data copy from the old MSLABs, so the new cells in the new segment also refer to the same
// chunks. In case of data compaction, we would have copied the cells' data from the old MSLAB
// chunks into a new chunk created for the result segment, so we can release the chunks
// associated with the compacted segments.
if (closeSegmentsInSuffix) {
for (Segment itemInSuffix : suffix) {
itemInSuffix.close();
}
}
}
// Replaces one segment in the pipeline with a new one at exactly the same index.
// Must be called only within a synchronized block.
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="VO_VOLATILE_INCREMENT",
justification="replaceAtIndex is invoked under a synchronize block so safe")
private void replaceAtIndex(int idx, ImmutableSegment newSegment) {
pipeline.set(idx, newSegment);
readOnlyCopy = new LinkedList<>(pipeline);
// The version increment is indeed needed, because swap() uses the linked list's removeAll()
// method, which compares objects to find what to remove.
// Flattening replaces the segment object completely (creation pattern), so a swap would not
// proceed correctly after a concurrent flattening without the version bump.
version++;
}
public Segment getTail() {
List<? extends Segment> localCopy = getSegments();
if(localCopy.isEmpty()) {
return null;
}
return localCopy.get(localCopy.size() - 1);
}
private boolean addFirst(ImmutableSegment segment) {
pipeline.addFirst(segment);
return true;
}
// Debug helper: checks that the given list is a genuine suffix of the pipeline
private boolean validateSuffixList(LinkedList<ImmutableSegment> suffix) {
if(suffix.isEmpty()) {
// empty suffix is always valid
return true;
}
Iterator<ImmutableSegment> pipelineBackwardIterator = pipeline.descendingIterator();
Iterator<ImmutableSegment> suffixBackwardIterator = suffix.descendingIterator();
ImmutableSegment suffixCurrent;
ImmutableSegment pipelineCurrent;
while (suffixBackwardIterator.hasNext()) {
if(!pipelineBackwardIterator.hasNext()) {
// a suffix longer than pipeline is invalid
return false;
}
suffixCurrent = suffixBackwardIterator.next();
pipelineCurrent = pipelineBackwardIterator.next();
if(suffixCurrent != pipelineCurrent) {
// non-matching suffix
return false;
}
}
// suffix matches pipeline suffix
return true;
}
}