blob: 3d31d2a39e064b21f3d2e5bbc26cb8eb06d3d55a [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.htrace.Trace;
/**
* This is the scanner for any MemStore implementation, derived from MemStore.
* The MemStoreScanner combines SegmentScanner from different Segments and
* uses the key-value heap and the reversed key-value heap for the aggregated key-values set.
* It is assumed that only traversing forward or backward is used (without zigzagging in between)
*/
@InterfaceAudience.Private
public class MemStoreScanner extends NonLazyKeyValueScanner {
/**
* Types of cell MemStoreScanner
*/
static public enum Type {
UNDEFINED,
COMPACT_FORWARD,
USER_SCAN_FORWARD,
USER_SCAN_BACKWARD
}
// heap of scanners used for traversing forward
private KeyValueHeap forwardHeap;
// reversed scanners heap for traversing backward
private ReversedKeyValueHeap backwardHeap;
// The type of the scan is defined by constructor
// or according to the first usage
private Type type = Type.UNDEFINED;
private long readPoint;
// remember the initial version of the scanners list
List<KeyValueScanner> scanners;
// pointer back to the relevant MemStore
// is needed for shouldSeek() method
private AbstractMemStore backwardReferenceToMemStore;
/**
* If UNDEFINED type for MemStoreScanner is provided, the forward heap is used as default!
* After constructor only one heap is going to be initialized for entire lifespan
* of the MemStoreScanner. A specific scanner can only be one directional!
*
* @param ms Pointer back to the MemStore
* @param scanners List of scanners over the segments
* @param readPt Read point below which we can safely remove duplicate KVs
*/
public MemStoreScanner(AbstractMemStore ms, List<KeyValueScanner> scanners, long readPt)
throws IOException {
this(ms, scanners, readPt, Type.UNDEFINED);
}
/**
* If UNDEFINED type for MemStoreScanner is provided, the forward heap is used as default!
* After constructor only one heap is going to be initialized for entire lifespan
* of the MemStoreScanner. A specific scanner can only be one directional!
*
* @param ms Pointer back to the MemStore
* @param scanners List of scanners over the segments
* @param readPt Read point below which we can safely remove duplicate KVs
* @param type The scan type COMPACT_FORWARD should be used for compaction
*/
public MemStoreScanner(AbstractMemStore ms, List<KeyValueScanner> scanners, long readPt,
Type type) throws IOException {
super();
this.readPoint = readPt;
this.type = type;
switch (type) {
case UNDEFINED:
case USER_SCAN_FORWARD:
case COMPACT_FORWARD:
this.forwardHeap = new KeyValueHeap(scanners, ms.getComparator());
break;
case USER_SCAN_BACKWARD:
this.backwardHeap = new ReversedKeyValueHeap(scanners, ms.getComparator());
break;
default:
throw new IllegalArgumentException("Unknown scanner type in MemStoreScanner");
}
this.backwardReferenceToMemStore = ms;
this.scanners = scanners;
if (Trace.isTracing() && Trace.currentSpan() != null) {
Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner");
}
}
/**
* Returns the cell from the top-most scanner without advancing the iterator.
* The backward traversal is assumed, only if specified explicitly
*/
@Override
public synchronized Cell peek() {
if (type == Type.USER_SCAN_BACKWARD) {
return backwardHeap.peek();
}
return forwardHeap.peek();
}
/**
* Gets the next cell from the top-most scanner. Assumed forward scanning.
*/
@Override
public synchronized Cell next() throws IOException {
KeyValueHeap heap = (Type.USER_SCAN_BACKWARD == type) ? backwardHeap : forwardHeap;
// loop over till the next suitable value
// take next value from the heap
for (Cell currentCell = heap.next();
currentCell != null;
currentCell = heap.next()) {
// all the logic of presenting cells is inside the internal SegmentScanners
// located inside the heap
return currentCell;
}
return null;
}
/**
* Set the scanner at the seek key. Assumed forward scanning.
* Must be called only once: there is no thread safety between the scanner
* and the memStore.
*
* @param cell seek value
* @return false if the key is null or if there is no data
*/
@Override
public synchronized boolean seek(Cell cell) throws IOException {
assertForward();
if (cell == null) {
close();
return false;
}
return forwardHeap.seek(cell);
}
/**
* Move forward on the sub-lists set previously by seek. Assumed forward scanning.
*
* @param cell seek value (should be non-null)
* @return true if there is at least one KV to read, false otherwise
*/
@Override
public synchronized boolean reseek(Cell cell) throws IOException {
/*
* See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
* This code is executed concurrently with flush and puts, without locks.
* Two points must be known when working on this code:
* 1) It's not possible to use the 'kvTail' and 'snapshot'
* variables, as they are modified during a flush.
* 2) The ideal implementation for performance would use the sub skip list
* implicitly pointed by the iterators 'kvsetIt' and
* 'snapshotIt'. Unfortunately the Java API does not offer a method to
* get it. So we remember the last keys we iterated to and restore
* the reseeked set to at least that point.
*
* TODO: The above comment copied from the original MemStoreScanner
*/
assertForward();
return forwardHeap.reseek(cell);
}
/**
* MemStoreScanner returns Long.MAX_VALUE because it will always have the latest data among all
* scanners.
* @see KeyValueScanner#getScannerOrder()
*/
@Override
public long getScannerOrder() {
return Long.MAX_VALUE;
}
@Override
public synchronized void close() {
if (forwardHeap != null) {
assert ((type == Type.USER_SCAN_FORWARD) ||
(type == Type.COMPACT_FORWARD) || (type == Type.UNDEFINED));
forwardHeap.close();
forwardHeap = null;
if (backwardHeap != null) {
backwardHeap.close();
backwardHeap = null;
}
} else if (backwardHeap != null) {
assert (type == Type.USER_SCAN_BACKWARD);
backwardHeap.close();
backwardHeap = null;
}
}
/**
* Set the scanner at the seek key. Assumed backward scanning.
*
* @param cell seek value
* @return false if the key is null or if there is no data
*/
@Override
public synchronized boolean backwardSeek(Cell cell) throws IOException {
initBackwardHeapIfNeeded(cell, false);
return backwardHeap.backwardSeek(cell);
}
/**
* Assumed backward scanning.
*
* @param cell seek value
* @return false if the key is null or if there is no data
*/
@Override
public synchronized boolean seekToPreviousRow(Cell cell) throws IOException {
initBackwardHeapIfNeeded(cell, false);
if (backwardHeap.peek() == null) {
restartBackwardHeap(cell);
}
return backwardHeap.seekToPreviousRow(cell);
}
@Override
public synchronized boolean seekToLastRow() throws IOException {
// TODO: it looks like this is how it should be, however ReversedKeyValueHeap class doesn't
// implement seekToLastRow() method :(
// however seekToLastRow() was implemented in internal MemStoreScanner
// so I wonder whether we need to come with our own workaround, or to update
// ReversedKeyValueHeap
return initBackwardHeapIfNeeded(KeyValue.LOWESTKEY, true);
}
/**
* Check if this memstore may contain the required keys
* @return False if the key definitely does not exist in this Memstore
*/
@Override
public synchronized boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
if (type == Type.COMPACT_FORWARD) {
return true;
}
for (KeyValueScanner sc : scanners) {
if (sc.shouldUseScanner(scan, store, oldestUnexpiredTS)) {
return true;
}
}
return false;
}
// debug method
@Override
public String toString() {
StringBuffer buf = new StringBuffer();
int i = 1;
for (KeyValueScanner scanner : scanners) {
buf.append("scanner (" + i + ") " + scanner.toString() + " ||| ");
i++;
}
return buf.toString();
}
/****************** Private methods ******************/
/**
* Restructure the ended backward heap after rerunning a seekToPreviousRow()
* on each scanner
* @return false if given Cell does not exist in any scanner
*/
private boolean restartBackwardHeap(Cell cell) throws IOException {
boolean res = false;
for (KeyValueScanner scan : scanners) {
res |= scan.seekToPreviousRow(cell);
}
this.backwardHeap =
new ReversedKeyValueHeap(scanners, backwardReferenceToMemStore.getComparator());
return res;
}
/**
* Checks whether the type of the scan suits the assumption of moving backward
*/
private boolean initBackwardHeapIfNeeded(Cell cell, boolean toLast) throws IOException {
boolean res = false;
if (toLast && (type != Type.UNDEFINED)) {
throw new IllegalStateException(
"Wrong usage of initBackwardHeapIfNeeded in parameters. The type is:" + type.toString());
}
if (type == Type.UNDEFINED) {
// In case we started from peek, release the forward heap
// and build backward. Set the correct type. Thus this turn
// can happen only once
if ((backwardHeap == null) && (forwardHeap != null)) {
forwardHeap.close();
forwardHeap = null;
// before building the heap seek for the relevant key on the scanners,
// for the heap to be built from the scanners correctly
for (KeyValueScanner scan : scanners) {
if (toLast) {
res |= scan.seekToLastRow();
} else {
res |= scan.backwardSeek(cell);
}
}
this.backwardHeap =
new ReversedKeyValueHeap(scanners, backwardReferenceToMemStore.getComparator());
type = Type.USER_SCAN_BACKWARD;
}
}
if (type == Type.USER_SCAN_FORWARD) {
throw new IllegalStateException("Traversing backward with forward scan");
}
return res;
}
/**
* Checks whether the type of the scan suits the assumption of moving forward
*/
private void assertForward() throws IllegalStateException {
if (type == Type.UNDEFINED) {
type = Type.USER_SCAN_FORWARD;
}
if (type == Type.USER_SCAN_BACKWARD) {
throw new IllegalStateException("Traversing forward with backward scan");
}
}
}