/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hdfs.MiniDFSCluster;
/**
* Test of a long-lived scanner validating as we go.
*/
public class TestScanner extends HBaseTestCase {
private final Log LOG = LogFactory.getLog(this.getClass());
private static final byte [] FIRST_ROW = HConstants.EMPTY_START_ROW;
private static final byte [][] COLS = { HConstants.CATALOG_FAMILY };
private static final byte [][] EXPLICIT_COLS = {
HConstants.REGIONINFO_QUALIFIER, HConstants.SERVER_QUALIFIER,
// TODO ryan
//HConstants.STARTCODE_QUALIFIER
};
static final HTableDescriptor TESTTABLEDESC =
new HTableDescriptor("testscanner");
static {
TESTTABLEDESC.addFamily(new HColumnDescriptor(HConstants.CATALOG_FAMILY,
10, // Ten is an arbitrary number. Keep versions to help debugging.
Compression.Algorithm.NONE.getName(), false, true, 8 * 1024,
HConstants.FOREVER, StoreFile.BloomType.NONE.toString(),
HConstants.REPLICATION_SCOPE_LOCAL));
}
/** HRegionInfo for the test table's single region */
public static final HRegionInfo REGION_INFO =
new HRegionInfo(TESTTABLEDESC, HConstants.EMPTY_BYTE_ARRAY,
HConstants.EMPTY_BYTE_ARRAY);
private static final byte [] ROW_KEY = REGION_INFO.getRegionName();
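// Expected startcode; currently unused since the startcode column is never
// written (see the TODO above and the disabled checks in scan()).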
private static final long START_CODE = Long.MAX_VALUE;
private MiniDFSCluster cluster = null;
private HRegion r;
private HRegionIncommon region;
@Override
public void setUp() throws Exception {
cluster = new MiniDFSCluster(conf, 2, true, (String[])null);
// Set the hbase.rootdir to be the home directory in mini dfs.
this.conf.set(HConstants.HBASE_DIR,
this.cluster.getFileSystem().getHomeDirectory().toString());
super.setUp();
}
/**
* Test that a scan honors its stop row.
* @throws Exception
*/
public void testStopRow() throws Exception {
byte [] startrow = Bytes.toBytes("bbb");
byte [] stoprow = Bytes.toBytes("ccc");
try {
this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
addContent(this.r, HConstants.CATALOG_FAMILY);
List<KeyValue> results = new ArrayList<KeyValue>();
// Do a simple test of getting one row only first.
Scan scan = new Scan(Bytes.toBytes("abc"), Bytes.toBytes("abd"));
scan.addFamily(HConstants.CATALOG_FAMILY);
InternalScanner s = r.getScanner(scan);
int count = 0;
while (s.next(results)) {
count++;
}
s.close();
assertEquals(0, count);
// Now do something a bit more involved.
scan = new Scan(startrow, stoprow);
scan.addFamily(HConstants.CATALOG_FAMILY);
s = r.getScanner(scan);
count = 0;
KeyValue kv = null;
results = new ArrayList<KeyValue>();
for (boolean first = true; s.next(results);) {
kv = results.get(0);
if (first) {
assertTrue(Bytes.BYTES_COMPARATOR.compare(startrow, kv.getRow()) == 0);
first = false;
}
count++;
// Clear between rows so kv tracks the current row; without this the list
// accumulates and kv would always point at the first row returned.
results.clear();
}
// The last row seen must sort before the (exclusive) stop row.
assertTrue(Bytes.BYTES_COMPARATOR.compare(stoprow, kv.getRow()) > 0);
// We got something back.
assertTrue(count > 10);
s.close();
} finally {
this.r.close();
this.r.getLog().closeAndDelete();
shutdownDfs(this.cluster);
}
}
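/** Runs the given scan and asserts every returned row key starts with "ab". */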
void rowPrefixFilter(Scan scan) throws IOException {
List<KeyValue> results = new ArrayList<KeyValue>();
scan.addFamily(HConstants.CATALOG_FAMILY);
InternalScanner s = r.getScanner(scan);
boolean hasMore = true;
while (hasMore) {
hasMore = s.next(results);
for (KeyValue kv : results) {
assertEquals((byte)'a', kv.getRow()[0]);
assertEquals((byte)'b', kv.getRow()[1]);
}
results.clear();
}
s.close();
}
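/** Runs the given scan and asserts no returned row key sorts after stopRow. */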
void rowInclusiveStopFilter(Scan scan, byte[] stopRow) throws IOException {
List<KeyValue> results = new ArrayList<KeyValue>();
scan.addFamily(HConstants.CATALOG_FAMILY);
InternalScanner s = r.getScanner(scan);
boolean hasMore = true;
while (hasMore) {
hasMore = s.next(results);
for (KeyValue kv : results) {
assertTrue(Bytes.compareTo(kv.getRow(), stopRow) <= 0);
}
results.clear();
}
s.close();
}
public void testFilters() throws IOException {
try {
this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
addContent(this.r, HConstants.CATALOG_FAMILY);
byte [] prefix = Bytes.toBytes("ab");
Filter newFilter = new PrefixFilter(prefix);
Scan scan = new Scan();
scan.setFilter(newFilter);
rowPrefixFilter(scan);
byte[] stopRow = Bytes.toBytes("bbc");
newFilter = new WhileMatchFilter(new InclusiveStopFilter(stopRow));
scan = new Scan();
scan.setFilter(newFilter);
rowInclusiveStopFilter(scan, stopRow);
} finally {
this.r.close();
this.r.getLog().closeAndDelete();
shutdownDfs(this.cluster);
}
}
/**
* Test that closing a scanner while a client is using it doesn't throw
* NPEs, but instead an UnknownScannerException. HBASE-2503
* @throws Exception
*/
public void testRaceBetweenClientAndTimeout() throws Exception {
try {
this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
addContent(this.r, HConstants.CATALOG_FAMILY);
Scan scan = new Scan();
InternalScanner s = r.getScanner(scan);
List<KeyValue> results = new ArrayList<KeyValue>();
try {
s.next(results);
s.close();
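// Calling next() on a closed scanner should surface UnknownScannerException,
// not an NPE (HBASE-2503).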
s.next(results);
fail("We don't want anything more, we should be failing");
} catch (UnknownScannerException ex) {
// ok!
return;
}
} finally {
this.r.close();
this.r.getLog().closeAndDelete();
shutdownDfs(this.cluster);
}
}
/** The test! Writes catalog-style rows, then validates them via both scan
 * and get across memstore flushes and region reopens.
 * @throws IOException
 */
public void testScanner() throws IOException {
try {
r = createNewHRegion(TESTTABLEDESC, null, null);
region = new HRegionIncommon(r);
// Write region info to the table, mimicking the catalog layout
Put put = new Put(ROW_KEY, System.currentTimeMillis(), null);
ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
DataOutputStream s = new DataOutputStream(byteStream);
REGION_INFO.write(s);
put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
byteStream.toByteArray());
region.put(put);
// What we just committed is in the memstore. Verify that we can get
// it back both with a scan and a get.
scan(false, null);
getRegionInfo();
// Close and re-open
r.close();
r = openClosedRegion(r);
region = new HRegionIncommon(r);
// Verify we can get the data back now that it is on disk.
scan(false, null);
getRegionInfo();
// Store some new information
HServerAddress address = new HServerAddress("foo.bar.com:1234");
put = new Put(ROW_KEY, System.currentTimeMillis(), null);
put.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
Bytes.toBytes(address.toString()));
// put.add(HConstants.COL_STARTCODE, Bytes.toBytes(START_CODE));
region.put(put);
// Validate that we can still get the HRegionInfo, even though it is in
// an older row on disk and there is a newer row in the memstore
scan(true, address.toString());
getRegionInfo();
// flush cache
region.flushcache();
// Validate again
scan(true, address.toString());
getRegionInfo();
// Close and reopen
r.close();
r = openClosedRegion(r);
region = new HRegionIncommon(r);
// Validate again
scan(true, address.toString());
getRegionInfo();
// Now update the information again
address = new HServerAddress("bar.foo.com:4321");
put = new Put(ROW_KEY, System.currentTimeMillis(), null);
put.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
Bytes.toBytes(address.toString()));
region.put(put);
// Validate again
scan(true, address.toString());
getRegionInfo();
// flush cache
region.flushcache();
// Validate again
scan(true, address.toString());
getRegionInfo();
// Close and reopen
r.close();
r = openClosedRegion(r);
region = new HRegionIncommon(r);
// Validate again
scan(true, address.toString());
getRegionInfo();
// clean up
r.close();
r.getLog().closeAndDelete();
} finally {
shutdownDfs(cluster);
}
}
/** Compare the HRegionInfo we read from HBase to what we stored */
private void validateRegionInfo(byte [] regionBytes) throws IOException {
HRegionInfo info =
(HRegionInfo) Writables.getWritable(regionBytes, new HRegionInfo());
assertEquals(REGION_INFO.getRegionId(), info.getRegionId());
assertEquals(0, info.getStartKey().length);
assertEquals(0, info.getEndKey().length);
assertEquals(0, Bytes.compareTo(info.getRegionName(), REGION_INFO.getRegionName()));
assertEquals(0, info.getTableDesc().compareTo(REGION_INFO.getTableDesc()));
}
/** Use a scanner to get the region info and then validate the results */
private void scan(boolean validateStartcode, String serverName)
throws IOException {
InternalScanner scanner = null;
Scan scan = null;
List<KeyValue> results = new ArrayList<KeyValue>();
byte [][][] scanColumns = {
COLS,
EXPLICIT_COLS
};
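// Each pass builds the same explicit-column scan; scanColumns only controls
// how many times the scan/validate loop below runs.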
for(int i = 0; i < scanColumns.length; i++) {
try {
scan = new Scan(FIRST_ROW);
for (int ii = 0; ii < EXPLICIT_COLS.length; ii++) {
scan.addColumn(COLS[0], EXPLICIT_COLS[ii]);
}
scanner = r.getScanner(scan);
while (scanner.next(results)) {
assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
HConstants.REGIONINFO_QUALIFIER));
byte [] val = getColumn(results, HConstants.CATALOG_FAMILY,
HConstants.REGIONINFO_QUALIFIER).getValue();
validateRegionInfo(val);
if(validateStartcode) {
// Startcode checks are disabled: the startcode column is never written
// (see the commented-out put in testScanner and the TODO at the top).
// assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
// HConstants.STARTCODE_QUALIFIER));
// val = getColumn(results, HConstants.CATALOG_FAMILY,
// HConstants.STARTCODE_QUALIFIER).getValue();
// assertNotNull(val);
// assertFalse(val.length == 0);
// long startCode = Bytes.toLong(val);
// assertEquals(START_CODE, startCode);
}
if(serverName != null) {
assertTrue(hasColumn(results, HConstants.CATALOG_FAMILY,
HConstants.SERVER_QUALIFIER));
val = getColumn(results, HConstants.CATALOG_FAMILY,
HConstants.SERVER_QUALIFIER).getValue();
assertNotNull(val);
assertFalse(val.length == 0);
String server = Bytes.toString(val);
assertEquals(0, server.compareTo(serverName));
}
}
} finally {
InternalScanner s = scanner;
scanner = null;
if(s != null) {
s.close();
}
}
}
}
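/** @return true if kvs contains a KeyValue matching the given family and qualifier */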
private boolean hasColumn(final List<KeyValue> kvs, final byte [] family,
final byte [] qualifier) {
for (KeyValue kv: kvs) {
if (kv.matchingFamily(family) && kv.matchingQualifier(qualifier)) {
return true;
}
}
return false;
}
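/** @return the first KeyValue in kvs matching the given family and qualifier, or null if none */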
private KeyValue getColumn(final List<KeyValue> kvs, final byte [] family,
final byte [] qualifier) {
for (KeyValue kv: kvs) {
if (kv.matchingFamily(family) && kv.matchingQualifier(qualifier)) {
return kv;
}
}
return null;
}
/** Use get to retrieve the HRegionInfo and validate it */
private void getRegionInfo() throws IOException {
Get get = new Get(ROW_KEY);
get.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
Result result = region.get(get, null);
byte [] bytes = result.value();
validateRegionInfo(bytes);
}
/**
 * Tests doing a sync flush in the middle of a scan. This essentially exercises the
 * StoreScanner update-readers code. It is not highly concurrent, since it all runs
 * in one thread. HBASE-910.
 * @throws Exception
 */
public void testScanAndSyncFlush() throws Exception {
this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
HRegionIncommon hri = new HRegionIncommon(r);
try {
LOG.info("Added: " + addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
int count = count(hri, -1, false);
assertEquals(count, count(hri, 100, false)); // do a sync flush.
} catch (Exception e) {
LOG.error("Failed", e);
throw e;
} finally {
this.r.close();
this.r.getLog().closeAndDelete();
shutdownDfs(cluster);
}
}
/**
 * Tests doing a concurrent flush (in a second thread) while scanning. This tests both
 * the StoreScanner update readers and the transition from memstore -> snapshot -> store file.
 *
 * @throws Exception
 */
public void testScanAndRealConcurrentFlush() throws Exception {
this.r = createNewHRegion(REGION_INFO.getTableDesc(), null, null);
HRegionIncommon hri = new HRegionIncommon(r);
try {
LOG.info("Added: " + addContent(hri, Bytes.toString(HConstants.CATALOG_FAMILY),
Bytes.toString(HConstants.REGIONINFO_QUALIFIER)));
int count = count(hri, -1, false);
assertEquals(count, count(hri, 100, true)); // do a true concurrent background thread flush
} catch (Exception e) {
LOG.error("Failed", e);
throw e;
} finally {
this.r.close();
this.r.getLog().closeAndDelete();
shutdownDfs(cluster);
}
}
/*
* @param hri Region
* @param flushIndex At what row we start the flush.
* @param concurrent if the flush should be concurrent or sync.
* @return Count of rows found.
* @throws IOException
*/
private int count(final HRegionIncommon hri, final int flushIndex,
boolean concurrent)
throws IOException {
LOG.info("Taking out counting scan");
ScannerIncommon s = hri.getScanner(HConstants.CATALOG_FAMILY, EXPLICIT_COLS,
HConstants.EMPTY_START_ROW, HConstants.LATEST_TIMESTAMP);
List<KeyValue> values = new ArrayList<KeyValue>();
int count = 0;
boolean justFlushed = false;
while (s.next(values)) {
if (justFlushed) {
LOG.info("First next() call after the flush");
justFlushed = false;
}
count++;
if (flushIndex == count) {
LOG.info("Starting flush at flush index " + flushIndex);
Thread t = new Thread() {
public void run() {
try {
hri.flushcache();
LOG.info("Finishing flush");
} catch (IOException e) {
LOG.error("Failed to flush cache", e);
}
}
};
if (concurrent) {
t.start(); // concurrently flush.
} else {
t.run(); // sync flush
}
LOG.info("Continuing on after kicking off background flush");
justFlushed = true;
}
}
s.close();
LOG.info("Found " + count + " items");
return count;
}
}