/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.util.BloomFilterUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.runners.Parameterized.Parameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Tests optimized scanning of multiple columns. <br>
* We split the big test into several subclass unit tests because, with the ROWCOL bloom type, the
* row-col bloom filter is checked frequently (to save an HDFS seek) every time the scanner
* switches from one column to another. That is CPU-intensive (~45s per case), so the ROWCOL cases
* were moved into separate LargeTests to avoid timeout failures. <br>
* <br>
* To be clear: TestMultiColumnScanner flushes 10 HFiles (NUM_FLUSHES=10), and the table holds
* ~1000 cells (rows=20, ts=6, qualifiers=8, total 20*6*8 ~ 1000). Each full table scan checks the
* ROWCOL bloom filter 20 (rows) * 8 (columns) * 10 (hfiles) = 1600 times, and the table is
* scanned roughly 6*2^8 = 1536 times, so in total there are about 1600*1536 = 2457600 bloom
* filter checks. (See HBASE-21520)
*/
public abstract class TestMultiColumnScanner {
private static final Logger LOG = LoggerFactory.getLogger(TestMultiColumnScanner.class);
private static final String TABLE_NAME =
TestMultiColumnScanner.class.getSimpleName();
static final int MAX_VERSIONS = 50;
private static final String FAMILY = "CF";
private static final byte[] FAMILY_BYTES = Bytes.toBytes(FAMILY);
/**
* The size of the column qualifier set used. Increasing this parameter
* exponentially increases test time.
*/
private static final int NUM_COLUMNS = 8;
// Bitmask covering all non-empty column subsets: values 1..(2^NUM_COLUMNS - 1). The parentheses
// matter: "1 << NUM_COLUMNS - 1" would parse as 1 << (NUM_COLUMNS - 1) and cover only half of
// the subsets described in the class Javadoc.
private static final int MAX_COLUMN_BIT_MASK = (1 << NUM_COLUMNS) - 1;
private static final int NUM_FLUSHES = 10;
private static final int NUM_ROWS = 20;
/** A large value of type long for use as a timestamp */
private static final long BIG_LONG = 9111222333444555666L;
/**
* Timestamps to test with. Cannot use {@link Long#MAX_VALUE} here, because
* it would be replaced by a timestamp auto-generated from the current time.
*/
private static final long[] TIMESTAMPS = new long[] { 1, 3, 5,
Integer.MAX_VALUE, BIG_LONG, Long.MAX_VALUE - 1 };
/** The probability that a column is skipped in a store file. */
private static final double COLUMN_SKIP_IN_STORE_FILE_PROB = 0.7;
/** The probability to delete a row/column pair */
private static final double DELETE_PROBABILITY = 0.02;
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@Parameter(0)
public Compression.Algorithm comprAlgo;
@Parameter(1)
public BloomType bloomType;
@Parameter(2)
public DataBlockEncoding dataBlockEncoding;
// Some static sanity-checking.
static {
assertTrue(BIG_LONG > 0.9 * Long.MAX_VALUE); // Guard against typos.
// Ensure TIMESTAMPS are sorted.
for (int i = 0; i < TIMESTAMPS.length - 1; ++i) {
assertTrue(TIMESTAMPS[i] < TIMESTAMPS[i + 1]);
}
}
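
// Shared parameter generator for the per-algorithm subclasses: one parameter set per BloomType,
// using the given compression algorithm and, optionally, PREFIX data block encoding. A subclass
// would typically expose it through the JUnit Parameterized runner, e.g. (hypothetical subclass,
// illustration only):
//   @Parameterized.Parameters
//   public static Collection<Object[]> parameters() {
//     return generateParams(Compression.Algorithm.NONE, false);
//   }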
public static Collection<Object[]> generateParams(Compression.Algorithm algo,
boolean useDataBlockEncoding) {
List<Object[]> parameters = new ArrayList<>();
for (BloomType bloomType : BloomType.values()) {
DataBlockEncoding dataBlockEncoding =
useDataBlockEncoding ? DataBlockEncoding.PREFIX : DataBlockEncoding.NONE;
parameters.add(new Object[] { algo, bloomType, dataBlockEncoding });
}
return parameters;
}

@Test
public void testMultiColumnScanner() throws IOException {
TEST_UTIL.getConfiguration().setInt(BloomFilterUtil.PREFIX_LENGTH_KEY, 10);
HRegion region = TEST_UTIL.createTestRegion(TABLE_NAME,
ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_BYTES).setCompressionType(comprAlgo)
.setBloomFilterType(bloomType).setMaxVersions(MAX_VERSIONS)
.setDataBlockEncoding(dataBlockEncoding).build(),
BlockCacheFactory.createBlockCache(TEST_UTIL.getConfiguration()));
List<String> rows = sequentialStrings("row", NUM_ROWS);
List<String> qualifiers = sequentialStrings("qual", NUM_COLUMNS);
List<KeyValue> kvs = new ArrayList<>();
Set<String> keySet = new HashSet<>();
// A map from <row>_<qualifier> to the most recent delete timestamp for
// that column.
Map<String, Long> lastDelTimeMap = new HashMap<>();
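// Fixed seed so the random column skips and deletes are reproducible across runs.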
Random rand = new Random(29372937L);
for (int iFlush = 0; iFlush < NUM_FLUSHES; ++iFlush) {
for (String qual : qualifiers) {
// This is where we decide to include or not include this column into
// this store file, regardless of row and timestamp.
if (rand.nextDouble() < COLUMN_SKIP_IN_STORE_FILE_PROB) {
continue;
}
byte[] qualBytes = Bytes.toBytes(qual);
for (String row : rows) {
Put p = new Put(Bytes.toBytes(row));
for (long ts : TIMESTAMPS) {
String value = createValue(row, qual, ts);
KeyValue kv = KeyValueTestUtil.create(row, FAMILY, qual, ts,
value);
assertEquals(kv.getTimestamp(), ts);
p.add(kv);
String keyAsString = kv.toString();
if (keySet.add(keyAsString)) {
kvs.add(kv);
}
}
region.put(p);
Delete d = new Delete(Bytes.toBytes(row));
boolean deletedSomething = false;
for (long ts : TIMESTAMPS) {
if (rand.nextDouble() < DELETE_PROBABILITY) {
d.addColumns(FAMILY_BYTES, qualBytes, ts);
String rowAndQual = row + "_" + qual;
Long whenDeleted = lastDelTimeMap.get(rowAndQual);
lastDelTimeMap.put(rowAndQual, whenDeleted == null ? ts : Math.max(ts, whenDeleted));
deletedSomething = true;
}
}
if (deletedSomething) {
region.delete(d);
}
}
}
region.flush(true);
}
Collections.sort(kvs, CellComparatorImpl.COMPARATOR);
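// Exhaustive verification: for every maxVersions in [1, TIMESTAMPS.length] and every non-empty
// subset of columns (encoded as a bitmask over `qualifiers`), scan the region and compare the
// output against the expected KeyValues, accounting for the deletes in lastDelTimeMap.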
for (int maxVersions = 1; maxVersions <= TIMESTAMPS.length; ++maxVersions) {
for (int columnBitMask = 1; columnBitMask <= MAX_COLUMN_BIT_MASK; ++columnBitMask) {
Scan scan = new Scan();
scan.readVersions(maxVersions);
Set<String> qualSet = new TreeSet<>();
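// Decode the bitmask: bit i selects qualifiers.get(i) for this scan.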
{
int columnMaskTmp = columnBitMask;
for (String qual : qualifiers) {
if ((columnMaskTmp & 1) != 0) {
scan.addColumn(FAMILY_BYTES, Bytes.toBytes(qual));
qualSet.add(qual);
}
columnMaskTmp >>= 1;
}
assertEquals(0, columnMaskTmp);
}
InternalScanner scanner = region.getScanner(scan);
List<Cell> results = new ArrayList<>();
int kvPos = 0;
int numResults = 0;
String queryInfo = "columns queried: " + qualSet + " (columnBitMask="
+ columnBitMask + "), maxVersions=" + maxVersions;
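// next() fills `results` with the cells of one row at a time and returns whether more rows
// remain; keep draining until both the scanner and the last batch are exhausted.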
while (scanner.next(results) || results.size() > 0) {
for (Cell kv : results) {
while (kvPos < kvs.size()
&& !matchesQuery(kvs.get(kvPos), qualSet, maxVersions,
lastDelTimeMap)) {
++kvPos;
}
String rowQual = getRowQualStr(kv);
String deleteInfo = "";
Long lastDelTS = lastDelTimeMap.get(rowQual);
if (lastDelTS != null) {
deleteInfo = "; last timestamp when row/column " + rowQual
+ " was deleted: " + lastDelTS;
}
assertTrue("Scanner returned additional key/value: " + kv + ", "
+ queryInfo + deleteInfo + ";", kvPos < kvs.size());
assertTrue("Scanner returned wrong key/value; " + queryInfo + deleteInfo + ";",
PrivateCellUtil.equalsIgnoreMvccVersion(kvs.get(kvPos), kv));
++kvPos;
++numResults;
}
results.clear();
}
for (; kvPos < kvs.size(); ++kvPos) {
KeyValue remainingKV = kvs.get(kvPos);
assertFalse("Matching column not returned by scanner: "
+ remainingKV + ", " + queryInfo + ", results returned: "
+ numResults, matchesQuery(remainingKV, qualSet, maxVersions,
lastDelTimeMap));
}
}
}
assertTrue("This test is supposed to delete at least some row/column " +
"pairs", lastDelTimeMap.size() > 0);
LOG.info("Number of row/col pairs deleted at least once: {}", lastDelTimeMap.size());
HBaseTestingUtility.closeRegionAndWAL(region);
}
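
/** Returns the {@code row + "_" + qualifier} key used in {@code lastDelTimeMap}. */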
private static String getRowQualStr(Cell kv) {
String rowStr = Bytes.toString(CellUtil.cloneRow(kv));
String qualStr = Bytes.toString(CellUtil.cloneQualifier(kv));
return rowStr + "_" + qualStr;
}
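
/**
* Whether a scan restricted to {@code qualSet} and {@code maxVersions} is expected to return this
* cell: its qualifier must be selected, its timestamp must be among the {@code maxVersions}
* newest values in TIMESTAMPS (every cell in this test carries one of those fixed timestamps),
* and it must not be shadowed by a later delete of its row/column pair.
*/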
private static boolean matchesQuery(KeyValue kv, Set<String> qualSet,
int maxVersions, Map<String, Long> lastDelTimeMap) {
Long lastDelTS = lastDelTimeMap.get(getRowQualStr(kv));
long ts = kv.getTimestamp();
return qualSet.contains(qualStr(kv))
&& ts >= TIMESTAMPS[TIMESTAMPS.length - maxVersions]
&& (lastDelTS == null || ts > lastDelTS);
}

private static String qualStr(KeyValue kv) {
return Bytes.toString(kv.getQualifierArray(), kv.getQualifierOffset(),
kv.getQualifierLength());
}

static String createValue(String row, String qual, long ts) {
return "value_for_" + row + "_" + qual + "_" + ts;
}
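
/**
* Returns {@code n} strings of the form {@code prefix + i} followed by an 'a'/'b' encoding of
* the binary digits of {@code i}, so that qualifier length varies with {@code i}.
*/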
private static List<String> sequentialStrings(String prefix, int n) {
List<String> lst = new ArrayList<>();
for (int i = 0; i < n; ++i) {
StringBuilder sb = new StringBuilder();
sb.append(prefix).append(i);
// Make column length depend on i.
int iBitShifted = i;
while (iBitShifted != 0) {
sb.append((iBitShifted & 1) == 0 ? 'a' : 'b');
iBitShifted >>= 1;
}
lst.add(sb.toString());
}
return lst;
}
}