blob: c6a9435c165d3b92e5752a4b5e889495da21d70d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * The compaction pipeline of a {@link CompactingMemStore} is a FIFO queue of segments. It supports
* pushing a segment at the head of the pipeline and removing a segment from the tail when it is
* flushed to disk. It also supports swap method to allow the in-memory compaction swap a subset of
* the segments at the tail of the pipeline with a new (compacted) one. This swap succeeds only if
* the version number passed with the list of segments to swap is the same as the current version of
* the pipeline. Essentially, there are two methods which can change the structure of the pipeline:
 * pushHead() and swap(), the latter is used both by a flush to disk and by an in-memory compaction.
* The pipeline version is updated by swap(); it allows to identify conflicting operations at the
* suffix of the pipeline. The synchronization model is copy-on-write. Methods which change the
* structure of the pipeline (pushHead(), flattenOneSegment() and swap()) apply their changes in the
* context of a lock. They also make a read-only copy of the pipeline's list. Read methods read from
* a read-only copy. If a read method accesses the read-only copy more than once it makes a local
* copy of it to ensure it accesses the same copy. The methods getVersionedList(),
* getVersionedTail(), and flattenOneSegment() are also protected by a lock since they need to have
* a consistent (atomic) view of the pipeline list and version number.
*/
@InterfaceAudience.Private
public class CompactionPipeline {
  private static final Logger LOG = LoggerFactory.getLogger(CompactionPipeline.class);

  public final static long FIXED_OVERHEAD =
    ClassSize.align(ClassSize.OBJECT + (3 * ClassSize.REFERENCE) + Bytes.SIZEOF_LONG);
  public final static long DEEP_OVERHEAD = FIXED_OVERHEAD + (2 * ClassSize.LINKEDLIST);

  /** Region services used to update the global memstore size counters; may be null in tests. */
  private final RegionServicesForStores region;

  /** The authoritative pipeline; every structural change happens under {@code synchronized (pipeline)}. */
  private final LinkedList<ImmutableSegment> pipeline = new LinkedList<>();

  // The list is volatile to avoid reading a new allocated reference before the c'tor is executed
  private volatile LinkedList<ImmutableSegment> readOnlyCopy = new LinkedList<>();

  /**
   * <pre>
   * Version is volatile to ensure it is atomically read when not using a lock.
   * To indicate whether the suffix of pipeline changes:
   * 1.for {@link CompactionPipeline#pushHead(MutableSegment)},new {@link ImmutableSegment} only
   * added at Head, {@link #version} not change.
   * 2.for {@link CompactionPipeline#swap},{@link #version} increase.
   * 3.for {@link CompactionPipeline#replaceAtIndex},{@link #version} increase.
   * </pre>
   */
  private volatile long version = 0;

  public CompactionPipeline(RegionServicesForStores region) {
    this.region = region;
  }

  /**
   * Turns the given mutable segment into an immutable one and pushes it at the head of the
   * pipeline. The region's memstore size counters are incremented by the overhead of the newly
   * created immutable segment. The pipeline {@link #version} is NOT changed, since the suffix of
   * the pipeline is untouched.
   * @param segment the active (mutable) segment being in-memory flushed
   * @return always true
   */
  public boolean pushHead(MutableSegment segment) {
    // Record the ImmutableSegment's heap overhead when initializing
    MemStoreSizing memstoreAccounting = new NonThreadSafeMemStoreSizing();
    ImmutableSegment immutableSegment =
      SegmentFactory.instance().createImmutableSegment(segment, memstoreAccounting);
    if (region != null) {
      region.addMemStoreSize(memstoreAccounting.getDataSize(), memstoreAccounting.getHeapSize(),
        memstoreAccounting.getOffHeapSize(), memstoreAccounting.getCellsCount());
    }
    synchronized (pipeline) {
      boolean res = addFirst(immutableSegment);
      readOnlyCopy = new LinkedList<>(pipeline);
      return res;
    }
  }

  /**
   * Returns an atomic (consistent) snapshot of the whole pipeline together with its version.
   * Must take the lock so that the list and the version number are read together.
   */
  public VersionedSegmentsList getVersionedList() {
    synchronized (pipeline) {
      return new VersionedSegmentsList(readOnlyCopy, version);
    }
  }

  /**
   * Returns an atomic (consistent) snapshot of only the tail (oldest) segment of the pipeline
   * together with the current version. The returned list is empty if the pipeline is empty.
   */
  public VersionedSegmentsList getVersionedTail() {
    synchronized (pipeline) {
      List<ImmutableSegment> segmentList = new ArrayList<>();
      if (!pipeline.isEmpty()) {
        segmentList.add(0, pipeline.getLast());
      }
      return new VersionedSegmentsList(segmentList, version);
    }
  }

  /**
   * Swaps the versioned list at the tail of the pipeline with a new segment. Swapping only if there
   * were no changes to the suffix of the list since the version list was created.
   * @param versionedList suffix of the pipeline to be replaced can be tail or all the pipeline
   * @param segment new segment to replace the suffix. Can be null if the suffix just needs
   * to be removed.
   * @param closeSuffix whether to close the suffix (to release memory), as part of swapping it
   * out During index merge op this will be false and for compaction it will
   * be true.
   * @param updateRegionSize whether to update the region size. Update the region size, when the
   * pipeline is swapped as part of in-memory-flush and further
   * merge/compaction. Don't update the region size when the swap is result
   * of the snapshot (flush-to-disk).
   * @return true iff swapped tail with new segment
   */
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
      justification = "Increment is done under a synchronize block so safe")
  public boolean swap(VersionedSegmentsList versionedList, ImmutableSegment segment,
      boolean closeSuffix, boolean updateRegionSize) {
    // Cheap optimistic check outside the lock; re-checked under the lock below.
    if (versionedList.getVersion() != version) {
      return false;
    }
    List<ImmutableSegment> suffix;
    synchronized (pipeline) {
      if (versionedList.getVersion() != version) {
        return false;
      }
      suffix = versionedList.getStoreSegments();
      LOG.debug("Swapping pipeline suffix; before={}, new segment={}",
        versionedList.getStoreSegments().size(), segment);
      swapSuffix(suffix, segment, closeSuffix);
      readOnlyCopy = new LinkedList<>(pipeline);
      version++;
    }
    // The counter update is done outside the lock; the suffix list is already detached from the
    // pipeline so no other thread can mutate it.
    if (updateRegionSize && region != null) {
      // update the global memstore size counter
      long suffixDataSize = getSegmentsKeySize(suffix);
      long suffixHeapSize = getSegmentsHeapSize(suffix);
      long suffixOffHeapSize = getSegmentsOffHeapSize(suffix);
      int suffixCellsCount = getSegmentsCellsCount(suffix);
      long newDataSize = 0;
      long newHeapSize = 0;
      long newOffHeapSize = 0;
      int newCellsCount = 0;
      if (segment != null) {
        newDataSize = segment.getDataSize();
        newHeapSize = segment.getHeapSize();
        newOffHeapSize = segment.getOffHeapSize();
        newCellsCount = segment.getCellsCount();
      }
      long dataSizeDelta = suffixDataSize - newDataSize;
      long heapSizeDelta = suffixHeapSize - newHeapSize;
      long offHeapSizeDelta = suffixOffHeapSize - newOffHeapSize;
      int cellsCountDelta = suffixCellsCount - newCellsCount;
      region.addMemStoreSize(-dataSizeDelta, -heapSizeDelta, -offHeapSizeDelta, -cellsCountDelta);
      LOG.debug(
        "Suffix data size={}, new segment data size={}, suffix heap size={},new segment heap "
          + "size={}  suffix off heap size={}, new segment off heap size={}, suffix cells "
          + "count={}, new segment cells count={}",
        suffixDataSize, newDataSize, suffixHeapSize, newHeapSize, suffixOffHeapSize, newOffHeapSize,
        suffixCellsCount, newCellsCount);
    }
    return true;
  }

  /** Sums the heap size of all segments in the given list. */
  private static long getSegmentsHeapSize(List<? extends Segment> list) {
    long res = 0;
    for (Segment segment : list) {
      res += segment.getHeapSize();
    }
    return res;
  }

  /** Sums the off-heap size of all segments in the given list. */
  private static long getSegmentsOffHeapSize(List<? extends Segment> list) {
    long res = 0;
    for (Segment segment : list) {
      res += segment.getOffHeapSize();
    }
    return res;
  }

  /** Sums the data (key) size of all segments in the given list. */
  private static long getSegmentsKeySize(List<? extends Segment> list) {
    long res = 0;
    for (Segment segment : list) {
      res += segment.getDataSize();
    }
    return res;
  }

  /** Sums the cell count of all segments in the given list. */
  private static int getSegmentsCellsCount(List<? extends Segment> list) {
    int res = 0;
    for (Segment segment : list) {
      res += segment.getCellsCount();
    }
    return res;
  }

  /**
   * If the caller holds the current version, go over the pipeline and try to flatten each
   * segment. Flattening is replacing the ConcurrentSkipListMap based CellSet to CellArrayMap based.
   * Flattening of the segment that initially is not based on ConcurrentSkipListMap has no effect.
   * Return after one segment is successfully flattened.
   * @param requesterVersion the pipeline version the caller observed; flattening is skipped if the
   *          pipeline has changed since then
   * @param idxType the index type to flatten to
   * @param action the compaction strategy action driving this flattening
   * @return true iff a segment was successfully flattened
   */
  public boolean flattenOneSegment(long requesterVersion, CompactingMemStore.IndexType idxType,
      MemStoreCompactionStrategy.Action action) {
    if (requesterVersion != version) {
      LOG.warn("Segment flattening failed, because versions do not match. Requester version: "
        + requesterVersion + ", actual version: " + version);
      return false;
    }
    synchronized (pipeline) {
      if (requesterVersion != version) {
        LOG.warn("Segment flattening failed, because versions do not match");
        return false;
      }
      int i = -1;
      for (ImmutableSegment s : pipeline) {
        i++;
        if (s.canBeFlattened()) {
          s.waitForUpdates(); // to ensure all updates preceding s in-memory flush have completed
          if (s.isEmpty()) {
            // after s.waitForUpdates() is called, there is no updates pending,if no cells in s,
            // we can skip it.
            continue;
          }
          // size to be updated
          MemStoreSizing newMemstoreAccounting = new NonThreadSafeMemStoreSizing();
          ImmutableSegment newS = SegmentFactory.instance().createImmutableSegmentByFlattening(
            (CSLMImmutableSegment) s, idxType, newMemstoreAccounting, action);
          replaceAtIndex(i, newS);
          if (region != null) {
            // Update the global memstore size counter upon flattening there is no change in the
            // data size
            MemStoreSize mss = newMemstoreAccounting.getMemStoreSize();
            region.addMemStoreSize(mss.getDataSize(), mss.getHeapSize(), mss.getOffHeapSize(),
              mss.getCellsCount());
          }
          LOG.debug("Compaction pipeline segment {} flattened", s);
          return true;
        }
      }
    }
    // do not update the global memstore size counter and do not increase the version,
    // because all the cells remain in place
    return false;
  }

  /** Returns whether the pipeline has no segments; reads the lock-free read-only copy. */
  public boolean isEmpty() {
    return readOnlyCopy.isEmpty();
  }

  /** Returns the read-only snapshot of the pipeline's segments (newest first). */
  public List<? extends Segment> getSegments() {
    return readOnlyCopy;
  }

  /** Returns the number of segments currently in the pipeline. */
  public long size() {
    return readOnlyCopy.size();
  }

  /**
   * Returns the minimum sequence id over the pipeline, i.e. that of the tail (oldest) segment,
   * or {@link Long#MAX_VALUE} if the pipeline is empty.
   */
  public long getMinSequenceId() {
    long minSequenceId = Long.MAX_VALUE;
    // take a local copy so both the isEmpty() check and getLast() see the same snapshot
    LinkedList<? extends Segment> localCopy = readOnlyCopy;
    if (!localCopy.isEmpty()) {
      minSequenceId = localCopy.getLast().getMinSequenceId();
    }
    return minSequenceId;
  }

  /** Returns the memstore size of the tail (oldest) segment, or an empty size if none. */
  public MemStoreSize getTailSize() {
    LinkedList<? extends Segment> localCopy = readOnlyCopy;
    return localCopy.isEmpty() ? new MemStoreSize() : localCopy.peekLast().getMemStoreSize();
  }

  /** Returns the accumulated memstore size of all segments in the pipeline. */
  public MemStoreSize getPipelineSize() {
    MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
    LinkedList<? extends Segment> localCopy = readOnlyCopy;
    for (Segment segment : localCopy) {
      memStoreSizing.incMemStoreSize(segment.getMemStoreSize());
    }
    return memStoreSizing.getMemStoreSize();
  }

  /**
   * Must be called under the {@link CompactionPipeline#pipeline} Lock.
   */
  private void swapSuffix(List<? extends Segment> suffix, ImmutableSegment segment,
      boolean closeSegmentsInSuffix) {
    matchAndRemoveSuffixFromPipeline(suffix);
    if (segment != null) {
      pipeline.addLast(segment);
    }
    // During index merge we won't be closing the segments undergoing the merge. Segment#close()
    // will release the MSLAB chunks to pool. But in case of index merge there wont be any data copy
    // from old MSLABs. So the new cells in new segment also refers to same chunks. In case of data
    // compaction, we would have copied the cells data from old MSLAB chunks into a new chunk
    // created for the result segment. So we can release the chunks associated with the compacted
    // segments.
    if (closeSegmentsInSuffix) {
      for (Segment itemInSuffix : suffix) {
        itemInSuffix.close();
      }
    }
  }

  /**
   * Checking that the {@link Segment}s in suffix input parameter is same as the {@link Segment}s in
   * {@link CompactionPipeline#pipeline} one by one from the last element to the first element of
   * suffix. If matched, remove suffix from {@link CompactionPipeline#pipeline}. <br/>
   * Must be called under the {@link CompactionPipeline#pipeline} Lock.
   */
  private void matchAndRemoveSuffixFromPipeline(List<? extends Segment> suffix) {
    if (suffix.isEmpty()) {
      return;
    }
    if (pipeline.size() < suffix.size()) {
      throw new IllegalStateException(
        "CODE-BUG:pipeline size:[" + pipeline.size() + "],suffix size:[" + suffix.size()
          + "],pipeline size must be greater than or equal to suffix size");
    }
    // Walk both lists backwards; identity comparison is intentional, the suffix must be the very
    // same segment objects that are in the pipeline.
    ListIterator<? extends Segment> suffixIterator = suffix.listIterator(suffix.size());
    ListIterator<? extends Segment> pipelineIterator = pipeline.listIterator(pipeline.size());
    int count = 0;
    while (suffixIterator.hasPrevious()) {
      Segment suffixSegment = suffixIterator.previous();
      Segment pipelineSegment = pipelineIterator.previous();
      if (suffixSegment != pipelineSegment) {
        throw new IllegalStateException("CODE-BUG:suffix last:[" + count + "]" + suffixSegment
          + " is not pipeline segment:[" + pipelineSegment + "]");
      }
      count++;
    }
    for (int index = 1; index <= count; index++) {
      pipeline.pollLast();
    }
  }

  // replacing one segment in the pipeline with a new one exactly at the same index
  // need to be called only within synchronized block
  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "VO_VOLATILE_INCREMENT",
      justification = "replaceAtIndex is invoked under a synchronize block so safe")
  private void replaceAtIndex(int idx, ImmutableSegment newSegment) {
    pipeline.set(idx, newSegment);
    readOnlyCopy = new LinkedList<>(pipeline);
    // the version increment is indeed needed, because the swap uses removeAll() method of the
    // linked-list that compares the objects to find what to remove.
    // The flattening changes the segment object completely (creation pattern) and so
    // swap will not proceed correctly after concurrent flattening.
    version++;
  }

  /** Returns the tail (oldest) segment of the pipeline, or null if the pipeline is empty. */
  public Segment getTail() {
    List<? extends Segment> localCopy = getSegments();
    if (localCopy.isEmpty()) {
      return null;
    }
    return localCopy.get(localCopy.size() - 1);
  }

  /** Adds the segment at the head of the pipeline. Must be called under the pipeline lock. */
  private boolean addFirst(ImmutableSegment segment) {
    pipeline.addFirst(segment);
    return true;
  }

  // debug method
  private boolean validateSuffixList(LinkedList<ImmutableSegment> suffix) {
    if (suffix.isEmpty()) {
      // empty suffix is always valid
      return true;
    }
    Iterator<ImmutableSegment> pipelineBackwardIterator = pipeline.descendingIterator();
    Iterator<ImmutableSegment> suffixBackwardIterator = suffix.descendingIterator();
    ImmutableSegment suffixCurrent;
    ImmutableSegment pipelineCurrent;
    for (; suffixBackwardIterator.hasNext();) {
      if (!pipelineBackwardIterator.hasNext()) {
        // a suffix longer than pipeline is invalid
        return false;
      }
      suffixCurrent = suffixBackwardIterator.next();
      pipelineCurrent = pipelineBackwardIterator.next();
      if (suffixCurrent != pipelineCurrent) {
        // non-matching suffix
        return false;
      }
    }
    // suffix matches pipeline suffix
    return true;
  }
}