blob: 0d6d2be5e2820a820db4fd168a08dc323880be43 [file] [log] [blame]
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.NavigableSet;
/**
* Scanner scans both the memstore and the HStore. Coalesce KeyValue stream
* into List<KeyValue> for a single row.
*/
class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
static final Log LOG = LogFactory.getLog(StoreScanner.class);
private Store store;
private ScanQueryMatcher matcher;
private KeyValueHeap heap;
private boolean cacheBlocks;
// Used to indicate that the scanner has closed (see HBASE-1107)
// Doesnt need to be volatile because it's always accessed via synchronized methods
private boolean closing = false;
private final boolean isGet;
// if heap == null and lastTop != null, you need to reseek given the key below
private KeyValue lastTop = null;
/**
* Opens a scanner across memstore, snapshot, and all StoreFiles.
*
* @param store who we scan
* @param scan the spec
* @param columns which columns we are scanning
* @throws IOException
*/
StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns)
throws IOException {
this.store = store;
this.cacheBlocks = scan.getCacheBlocks();
matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
columns, store.ttl, store.comparator.getRawComparator(),
store.versionsToReturn(scan.getMaxVersions()),
false);
this.isGet = scan.isGetScan();
// pass columns = try to filter out unnecessary ScanFiles
List<KeyValueScanner> scanners = getScanners(scan, columns);
// Seek all scanners to the start of the Row (or if the exact maching row key does not
// exist, then to the start of the next matching Row).
for(KeyValueScanner scanner : scanners) {
scanner.seek(matcher.getStartKey());
}
// Combine all seeked scanners with a heap
heap = new KeyValueHeap(scanners, store.comparator);
this.store.addChangedReaderObserver(this);
}
/**
* Used for major compactions.<p>
*
* Opens a scanner across specified StoreFiles.
* @param store who we scan
* @param scan the spec
* @param scanners ancilliary scanners
*/
StoreScanner(Store store, Scan scan, List<? extends KeyValueScanner> scanners,
boolean retainDeletesInOutput)
throws IOException {
this.store = store;
this.cacheBlocks = false;
this.isGet = false;
matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
null, store.ttl, store.comparator.getRawComparator(),
store.versionsToReturn(scan.getMaxVersions()), retainDeletesInOutput);
// Seek all scanners to the initial key
for(KeyValueScanner scanner : scanners) {
scanner.seek(matcher.getStartKey());
}
// Combine all seeked scanners with a heap
heap = new KeyValueHeap(scanners, store.comparator);
}
// Constructor for testing.
StoreScanner(final Scan scan, final byte [] colFamily, final long ttl,
final KeyValue.KVComparator comparator,
final NavigableSet<byte[]> columns,
final List<KeyValueScanner> scanners)
throws IOException {
this.store = null;
this.isGet = false;
this.cacheBlocks = scan.getCacheBlocks();
this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl,
comparator.getRawComparator(), scan.getMaxVersions(), false);
// Seek all scanners to the initial key
for(KeyValueScanner scanner : scanners) {
scanner.seek(matcher.getStartKey());
}
heap = new KeyValueHeap(scanners, comparator);
}
/*
* @return List of scanners ordered properly.
*/
private List<KeyValueScanner> getScanners() throws IOException {
// First the store file scanners
// TODO this used to get the store files in descending order,
// but now we get them in ascending order, which I think is
// actually more correct, since memstore get put at the end.
List<StoreFileScanner> sfScanners = StoreFileScanner
.getScannersForStoreFiles(store.getStorefiles(), cacheBlocks, isGet);
List<KeyValueScanner> scanners =
new ArrayList<KeyValueScanner>(sfScanners.size()+1);
scanners.addAll(sfScanners);
// Then the memstore scanners
scanners.addAll(this.store.memstore.getScanners());
return scanners;
}
/*
* @return List of scanners to seek, possibly filtered by StoreFile.
*/
private List<KeyValueScanner> getScanners(Scan scan,
final NavigableSet<byte[]> columns) throws IOException {
boolean memOnly;
boolean filesOnly;
if (scan instanceof InternalScan) {
InternalScan iscan = (InternalScan)scan;
memOnly = iscan.isCheckOnlyMemStore();
filesOnly = iscan.isCheckOnlyStoreFiles();
} else {
memOnly = false;
filesOnly = false;
}
List<KeyValueScanner> scanners = new LinkedList<KeyValueScanner>();
// First the store file scanners
if (memOnly == false) {
List<StoreFileScanner> sfScanners = StoreFileScanner
.getScannersForStoreFiles(store.getStorefiles(), cacheBlocks, isGet);
// include only those scan files which pass all filters
for (StoreFileScanner sfs : sfScanners) {
if (sfs.shouldSeek(scan, columns)) {
scanners.add(sfs);
}
}
}
// Then the memstore scanners
if ((filesOnly == false) && (this.store.memstore.shouldSeek(scan))) {
scanners.addAll(this.store.memstore.getScanners());
}
return scanners;
}
public synchronized KeyValue peek() {
if (this.heap == null) {
return this.lastTop;
}
return this.heap.peek();
}
public KeyValue next() {
// throw runtime exception perhaps?
throw new RuntimeException("Never call StoreScanner.next()");
}
public synchronized void close() {
if (this.closing) return;
this.closing = true;
// under test, we dont have a this.store
if (this.store != null)
this.store.deleteChangedReaderObserver(this);
if (this.heap != null)
this.heap.close();
this.heap = null; // CLOSED!
this.lastTop = null; // If both are null, we are closed.
}
public synchronized boolean seek(KeyValue key) throws IOException {
if (this.heap == null) {
List<KeyValueScanner> scanners = getScanners();
heap = new KeyValueHeap(scanners, store.comparator);
}
return this.heap.seek(key);
}
/**
* Get the next row of values from this Store.
* @param outResult
* @param limit
* @return true if there are more rows, false if scanner is done
*/
public synchronized boolean next(List<KeyValue> outResult, int limit) throws IOException {
//DebugPrint.println("SS.next");
checkReseek();
// if the heap was left null, then the scanners had previously run out anyways, close and
// return.
if (this.heap == null) {
close();
return false;
}
KeyValue peeked = this.heap.peek();
if (peeked == null) {
close();
return false;
}
// only call setRow if the row changes; avoids confusing the query matcher
// if scanning intra-row
if ((matcher.row == null) || !peeked.matchingRow(matcher.row)) {
matcher.setRow(peeked.getRow());
}
KeyValue kv;
List<KeyValue> results = new ArrayList<KeyValue>();
LOOP: while((kv = this.heap.peek()) != null) {
// kv is no longer immutable due to KeyOnlyFilter! use copy for safety
KeyValue copyKv = new KeyValue(kv.getBuffer(), kv.getOffset(), kv.getLength());
ScanQueryMatcher.MatchCode qcode = matcher.match(copyKv);
//DebugPrint.println("SS peek kv = " + kv + " with qcode = " + qcode);
switch(qcode) {
case INCLUDE:
results.add(copyKv);
this.heap.next();
if (limit > 0 && (results.size() == limit)) {
break LOOP;
}
continue;
case DONE:
// copy jazz
outResult.addAll(results);
return true;
case DONE_SCAN:
close();
// copy jazz
outResult.addAll(results);
return false;
case SEEK_NEXT_ROW:
// This is just a relatively simple end of scan fix, to short-cut end us if there is a
// endKey in the scan.
if (!matcher.moreRowsMayExistAfter(kv)) {
outResult.addAll(results);
return false;
}
reseek(matcher.getKeyForNextRow(kv));
break;
case SEEK_NEXT_COL:
reseek(matcher.getKeyForNextColumn(kv));
break;
case SKIP:
this.heap.next();
break;
case SEEK_NEXT_USING_HINT:
KeyValue nextKV = matcher.getNextKeyHint(kv);
if (nextKV != null) {
reseek(nextKV);
} else {
heap.next();
}
break;
default:
throw new RuntimeException("UNEXPECTED");
}
}
if (!results.isEmpty()) {
// copy jazz
outResult.addAll(results);
return true;
}
// No more keys
close();
return false;
}
public synchronized boolean next(List<KeyValue> outResult) throws IOException {
return next(outResult, -1);
}
// Implementation of ChangedReadersObserver
public synchronized void updateReaders() throws IOException {
if (this.closing) return;
// All public synchronized API calls will call 'checkReseek' which will cause
// the scanner stack to reseek if this.heap==null && this.lastTop != null.
// But if two calls to updateReaders() happen without a 'next' or 'peek' then we
// will end up calling this.peek() which would cause a reseek in the middle of a updateReaders
// which is NOT what we want, not to mention could cause an NPE. So we early out here.
if (this.heap == null) return;
// this could be null.
this.lastTop = this.peek();
//DebugPrint.println("SS updateReaders, topKey = " + lastTop);
// close scanners to old obsolete Store files
this.heap.close(); // bubble thru and close all scanners.
this.heap = null; // the re-seeks could be slow (access HDFS) free up memory ASAP
// Let the next() call handle re-creating and seeking
}
private void checkReseek() throws IOException {
if (this.heap == null && this.lastTop != null) {
resetScannerStack(this.lastTop);
this.lastTop = null; // gone!
}
// else dont need to reseek
}
private void resetScannerStack(KeyValue lastTopKey) throws IOException {
if (heap != null) {
throw new RuntimeException("StoreScanner.reseek run on an existing heap!");
}
/* When we have the scan object, should we not pass it to getScanners()
* to get a limited set of scanners? We did so in the constructor and we
* could have done it now by storing the scan object from the constructor */
List<KeyValueScanner> scanners = getScanners();
for(KeyValueScanner scanner : scanners) {
scanner.seek(lastTopKey);
}
// Combine all seeked scanners with a heap
heap = new KeyValueHeap(scanners, store.comparator);
// Reset the state of the Query Matcher and set to top row
matcher.reset();
KeyValue kv = heap.peek();
matcher.setRow((kv == null ? lastTopKey : kv).getRow());
}
@Override
public synchronized boolean reseek(KeyValue kv) throws IOException {
//Heap cannot be null, because this is only called from next() which
//guarantees that heap will never be null before this call.
return this.heap.reseek(kv);
}
@Override
public long getSequenceID() {
return 0;
}
}