blob: 760ed3a076895ebf0aedbe83290e3d7b6e648cec [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kylin.storage.hbase.util;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Random;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.storage.hbase.HBaseConnection;
import com.google.common.collect.Lists;
public class GridTableHBaseBenchmark {
/** Name of the HBase table every benchmark step reads from or writes to. */
private static final String TEST_TABLE = "GridTableTest";
/** Column family of the single benchmark column; encoded as UTF-8 so the bytes do not depend on the platform charset. */
private static final byte[] CF = "F".getBytes(StandardCharsets.UTF_8);
/** Column qualifier of the single benchmark column; encoded as UTF-8 for the same reason as {@link #CF}. */
private static final byte[] QN = "C".getBytes(StandardCharsets.UTF_8);
/** Number of rows the test table is expected to hold. */
private static final int N_ROWS = 10000;
/** Size in bytes of each generated cell value; also used as the column family block size. */
private static final int CELL_SIZE = 128 * 1024; // 128 KB
/** Hit ratio used when the first command-line argument is absent or unusable. */
private static final double DFT_HIT_RATIO = 0.3;
/** Index ratio used when the second command-line argument is absent or unusable. */
private static final double DFT_INDEX_RATIO = 0.1;
/** How many times the full set of scan tests is repeated. */
private static final int ROUND = 3;
/**
 * Command-line entry point.
 *
 * @param args optional: {@code args[0]} is the hit ratio, {@code args[1]} the index ratio;
 *             each falls back to its default when missing or not a number
 * @throws IOException if the benchmark fails while talking to HBase
 */
public static void main(String[] args) throws IOException {
    double hitRatio = ratioArg(args, 0, DFT_HIT_RATIO);
    double indexRatio = ratioArg(args, 1, DFT_INDEX_RATIO);
    testGridTable(hitRatio, indexRatio);
}

/** Parses {@code args[idx]} as a double, returning {@code dft} when it is absent or malformed. */
private static double ratioArg(String[] args, int idx, double dft) {
    if (args == null || idx >= args.length || args[idx] == null) {
        return dft;
    }
    try {
        return Double.parseDouble(args[idx]);
    } catch (NumberFormatException e) {
        // Still best-effort: keep the default, but tell the user the value was not used.
        System.err.println("Ignoring malformed argument '" + args[idx] + "', using default " + dft);
        return dft;
    }
}
/**
 * Runs the whole benchmark: makes sure the test table exists and is populated, builds one
 * random hit plan, then executes the four scan tests {@code ROUND} times against that plan.
 *
 * @param hitRatio   passed to {@link Hits} as the hit probability
 * @param indexRatio passed to {@link Hits}; determines the row block size
 * @throws IOException on any HBase failure
 */
public static void testGridTable(double hitRatio, double indexRatio) throws IOException {
    System.out.println("Testing grid table scanning, hit ratio " + hitRatio + ", index ratio " + indexRatio);

    // "hbase" means: use hbase-site.xml on classpath
    final Connection connection = HBaseConnection.get("hbase");
    createHTableIfNeeded(connection, TEST_TABLE);
    prepareData(connection);

    final Hits hitPlan = new Hits(N_ROWS, hitRatio, indexRatio);
    int round = 1;
    while (round <= ROUND) {
        System.out.println("==================================== ROUND " + round + " ========================================");
        testRowScanWithIndex(connection, hitPlan.getHitsForRowScanWithIndex());
        testRowScanNoIndexFullScan(connection, hitPlan.getHitsForRowScanNoIndex());
        testRowScanNoIndexSkipScan(connection, hitPlan.getHitsForRowScanNoIndex());
        testColumnScan(connection, hitPlan.getHitsForColumnScan());
        round++;
    }
}
/**
 * Simulates a column scan: opens one scanner per selected logical column and advances all of
 * them in lock step, consuming one result from each scanner per logical row.
 *
 * @param conn     connection used to open the test table
 * @param colScans selected logical columns as (start, end) pairs; may be empty, in which case
 *                 nothing is scanned and the stats report zero rows
 * @throws IOException on any HBase failure
 */
private static void testColumnScan(Connection conn, List<Pair<Integer, Integer>> colScans) throws IOException {
    Stats stats = new Stats("COLUMN_SCAN");
    int nLogicCols = colScans.size();
    ResultScanner[] scanners = new ResultScanner[nLogicCols];
    Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
    try {
        stats.markStart();

        // The hit plan is random and may select no column at all; get(0) would then throw.
        int nLogicRows = colScans.isEmpty() ? 0 : colScans.get(0).getSecond() - colScans.get(0).getFirst();

        // NOTE(review): only the width of the first pair is used; no start/stop row is set,
        // so every scanner reads from the beginning of the table.
        for (int i = 0; i < nLogicCols; i++) {
            Scan scan = new Scan();
            scan.addFamily(CF);
            scanners[i] = table.getScanner(scan);
        }
        for (int i = 0; i < nLogicRows; i++) {
            for (int c = 0; c < nLogicCols; c++) {
                Result r = scanners[c].next();
                stats.consume(r);
            }
            dot(i, nLogicRows);
        }

        stats.markEnd();
    } finally {
        // Release every scanner that was opened, even if the scan failed half way.
        for (ResultScanner scanner : scanners) {
            IOUtils.closeQuietly(scanner);
        }
        IOUtils.closeQuietly(table);
    }
}
/** Runs {@code fullScan} over the no-index hit map, reporting as "ROW_SCAN_NO_IDX_FULL". */
private static void testRowScanNoIndexFullScan(Connection conn, boolean[] hits) throws IOException {
    final Stats fullScanStats = new Stats("ROW_SCAN_NO_IDX_FULL");
    fullScan(conn, hits, fullScanStats);
}
/** Runs {@code jumpScan} over the no-index hit map, reporting as "ROW_SCAN_NO_IDX_SKIP". */
private static void testRowScanNoIndexSkipScan(Connection conn, boolean[] hits) throws IOException {
    final Stats skipScanStats = new Stats("ROW_SCAN_NO_IDX_SKIP");
    jumpScan(conn, hits, skipScanStats);
}
/** Runs {@code jumpScan} over the with-index hit map, reporting as "ROW_SCAN_IDX". */
private static void testRowScanWithIndex(Connection conn, boolean[] hits) throws IOException {
    final Stats indexedScanStats = new Stats("ROW_SCAN_IDX");
    jumpScan(conn, hits, indexedScanStats);
}
/**
 * Scans the whole table once and consumes only the rows flagged in {@code hits}.
 *
 * @param conn  connection used to open the test table
 * @param hits  per-row flags indexed by scan order; {@code true} means the row is consumed
 * @param stats collector that is started, fed and ended by this method
 * @throws IOException on any HBase failure
 */
private static void fullScan(Connection conn, boolean[] hits, Stats stats) throws IOException {
    Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
    ResultScanner scanner = null;
    try {
        stats.markStart();

        Scan scan = new Scan();
        scan.addFamily(CF);
        scanner = table.getScanner(scan);
        int i = 0;
        for (Result r : scanner) {
            if (hits[i])
                stats.consume(r);
            dot(i, N_ROWS);
            i++;
        }

        stats.markEnd();
    } finally {
        // The scanner must be released explicitly; closing the table alone is not relied upon.
        IOUtils.closeQuietly(scanner);
        IOUtils.closeQuietly(table);
    }
}
/**
 * Scans only the row ranges that contain hits, opening a separate range scan for each.
 * <p>
 * A range starts at the next flagged row and is extended until the {@code jumpThreshold}
 * rows following its end contain no hit (or the table ends), so nearby hits share one scan.
 * Every row returned by a range scan is consumed, including unflagged rows inside the range.
 * Row keys are {@code Bytes.toBytes(rowIndex)}, the same encoding {@code prepareData} writes.
 *
 * @param conn  connection used to open the test table
 * @param hits  per-row flags, at least {@code N_ROWS} long
 * @param stats collector that is started, fed and ended by this method
 * @throws IOException on any HBase failure
 */
private static void jumpScan(Connection conn, boolean[] hits, Stats stats) throws IOException {
    final int jumpThreshold = 6; // compensate for Scan() overhead, totally by experience
    Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
    try {
        stats.markStart();

        int i = 0;
        while (i < N_ROWS) {
            int start, end;
            // First flagged row at or after i.
            for (start = i; start < N_ROWS; start++) {
                if (hits[start])
                    break;
            }
            // Grow the range until the next jumpThreshold rows hold no hit.
            for (end = start + 1; end < N_ROWS; end++) {
                boolean isEnd = true;
                for (int j = 0; j < jumpThreshold && end + j < N_ROWS; j++)
                    if (hits[end + j])
                        isEnd = false;
                if (isEnd)
                    break;
            }

            if (start < N_ROWS) {
                Scan scan = new Scan();
                scan.setStartRow(Bytes.toBytes(start));
                scan.setStopRow(Bytes.toBytes(end));
                scan.addFamily(CF);
                ResultScanner scanner = table.getScanner(scan);
                try {
                    i = start;
                    for (Result r : scanner) {
                        stats.consume(r);
                        dot(i, N_ROWS);
                        i++;
                    }
                } finally {
                    // One scanner is opened per range; release each before opening the next.
                    IOUtils.closeQuietly(scanner);
                }
            }
            i = end;
        }

        stats.markEnd();
    } finally {
        IOUtils.closeQuietly(table);
    }
}
/**
 * Ensures the test table holds the benchmark data.
 * <p>
 * Counts the existing rows with a key-only scan. If the table is not empty it must hold
 * exactly {@code N_ROWS} rows and nothing is written. If it is empty, {@code N_ROWS} rows
 * are inserted, keyed by {@code Bytes.toBytes(i)} and carrying one cell of random bytes.
 *
 * @param conn connection used to open the test table
 * @throws IOException on any HBase failure, or if the table holds an unexpected row count
 */
private static void prepareData(Connection conn) throws IOException {
    Table table = conn.getTable(TableName.valueOf(TEST_TABLE));
    try {
        // check how many rows existing
        int nRows = 0;
        Scan scan = new Scan();
        scan.setFilter(new KeyOnlyFilter());
        ResultScanner scanner = table.getScanner(scan);
        try {
            for (Result r : scanner) {
                r.getRow(); // nothing to do
                nRows++;
            }
        } finally {
            // Release the counting scanner before any writes start.
            IOUtils.closeQuietly(scanner);
        }

        if (nRows > 0) {
            System.out.println(nRows + " existing rows");
            if (nRows != N_ROWS)
                throw new IOException("Expect " + N_ROWS + " rows but it is not");
            return;
        }

        // insert rows into empty table
        System.out.println("Writing " + N_ROWS + " rows to " + TEST_TABLE);
        long nBytes = 0;
        for (int i = 0; i < N_ROWS; i++) {
            byte[] rowkey = Bytes.toBytes(i);
            Put put = new Put(rowkey);
            byte[] cell = randomBytes();
            put.add(CF, QN, cell);
            table.put(put);
            nBytes += cell.length;
            dot(i, N_ROWS);
        }
        System.out.println();
        System.out.println("Written " + N_ROWS + " rows, " + nBytes + " bytes");
    } finally {
        IOUtils.closeQuietly(table);
    }
}
/**
 * Prints a progress dot roughly once per percent of {@code nRows}.
 *
 * @param i     zero-based index of the row just processed
 * @param nRows total number of rows; for fewer than 100 rows a dot is printed for every row
 */
private static void dot(int i, int nRows) {
    // nRows / 100 is 0 for small totals, which would make the modulo divide by zero.
    int step = Math.max(1, nRows / 100);
    if (i % step == 0)
        System.out.print(".");
}
/**
 * Produces one cell payload.
 *
 * @return a new array of {@code CELL_SIZE} bytes filled by a freshly created {@link Random}
 */
private static byte[] randomBytes() {
    final byte[] payload = new byte[CELL_SIZE];
    new Random().nextBytes(payload);
    return payload;
}
/**
 * Creates the benchmark table with a single column family if it does not exist yet.
 * The family's block size is set to {@code CELL_SIZE}.
 *
 * @param conn      connection used to obtain the admin handle
 * @param tableName name of the table to check and, if missing, create
 * @throws IOException on any HBase failure
 */
private static void createHTableIfNeeded(Connection conn, String tableName) throws IOException {
    Admin hbase = conn.getAdmin();
    try {
        TableName name = TableName.valueOf(tableName);

        // Ask the admin directly instead of probing with an exception-throwing lookup.
        if (hbase.tableExists(name)) {
            System.out.println("HTable '" + tableName + "' already exists");
            return;
        }

        System.out.println("Creating HTable '" + tableName + "'");

        HTableDescriptor desc = new HTableDescriptor(name);

        HColumnDescriptor fd = new HColumnDescriptor(CF);
        fd.setBlocksize(CELL_SIZE);
        desc.addFamily(fd);
        hbase.createTable(desc);

        System.out.println("HTable '" + tableName + "' created");
    } finally {
        // Quiet close so a failure here cannot mask an exception from the body.
        IOUtils.closeQuietly(hbase);
    }
}
/**
 * Randomly generated plan of which rows and logical columns each scan test touches.
 * <p>
 * Rows are grouped into blocks of {@code 1 / indexRatio} rows. Each block is a hit with
 * probability {@code hitRatio}: a hit block is flagged entirely in both row maps, while a
 * missed block has only its first row flagged, and only in the no-index map. Rows past the
 * last complete block are never flagged.
 * <p>
 * For the column scan the rows are divided among 20 logical columns, each selected with
 * probability {@code hitRatio}; the resulting list may be empty.
 */
static class Hits {

    boolean[] hitsForRowScanWithIndex;
    boolean[] hitsForRowScanNoIndex;
    List<Pair<Integer, Integer>> hitsForColumnScan;

    /**
     * @param nRows      total number of rows the plan covers
     * @param hitRatio   probability that a row block or a logical column is selected
     * @param indexRatio reciprocal of the row block size; values that would give a block
     *                   size below 1 are treated as block size 1
     */
    public Hits(int nRows, double hitRatio, double indexRatio) {
        Random rand = new Random();

        hitsForRowScanWithIndex = new boolean[nRows];
        hitsForRowScanNoIndex = new boolean[nRows];

        // for row scan
        // An index ratio above 1 truncates to a block size of 0, which would divide by zero below.
        int blockSize = Math.max(1, (int) (1.0 / indexRatio));
        int nBlocks = nRows / blockSize;

        for (int i = 0; i < nBlocks; i++) {

            if (rand.nextDouble() < hitRatio) {
                for (int j = 0; j < blockSize; j++) {
                    hitsForRowScanNoIndex[i * blockSize + j] = true;
                    hitsForRowScanWithIndex[i * blockSize + j] = true;
                }
            } else {
                // case of not hit: without an index the first row of the block is still read
                hitsForRowScanNoIndex[i * blockSize] = true;
            }
        }

        hitsForColumnScan = Lists.newArrayList();

        // for column scan
        int nColumns = 20;
        int logicRows = nRows / nColumns;
        for (int i = 0; i < nColumns; i++) {
            if (rand.nextDouble() < hitRatio) {
                hitsForColumnScan.add(Pair.newPair(i * logicRows, (i + 1) * logicRows));
            }
        }
    }

    /** @return per-row flags for the indexed row scan (only hit blocks are flagged) */
    public boolean[] getHitsForRowScanWithIndex() {
        return hitsForRowScanWithIndex;
    }

    /** @return per-row flags for the non-indexed row scans (hit blocks plus the first row of every missed block) */
    public boolean[] getHitsForRowScanNoIndex() {
        return hitsForRowScanNoIndex;
    }

    /** @return the selected logical columns as (start, end) pairs; possibly empty */
    public List<Pair<Integer, Integer>> getHitsForColumnScan() {
        return hitsForColumnScan;
    }
}
/**
 * Accumulates and prints the measurements of one named scan test: wall-clock time between
 * {@link #markStart()} and {@link #markEnd()}, rows consumed and value bytes read.
 */
static class Stats {
    String name;
    long startTime;
    long endTime;
    long rowsRead;
    long bytesRead;

    /** @param name label printed in the start and end messages */
    public Stats(String name) {
        this.name = name;
    }

    /**
     * Reads the whole value of the benchmark column from {@code r} and counts the row.
     *
     * @param r a result containing the {@code CF:QN} column
     */
    public void consume(Result r) {
        consume(r, Integer.MAX_VALUE);
    }

    /** Reads at most {@code nBytesToConsume} value bytes of the benchmark column and counts the row. */
    private void consume(Result r, int nBytesToConsume) {
        Cell cell = r.getColumnLatestCell(CF, QN);
        byte mix = 0;
        byte[] valueArray = cell.getValueArray();
        // getValueArray() exposes the cell's backing array; the value itself begins at getValueOffset().
        int offset = cell.getValueOffset();
        int n = Math.min(nBytesToConsume, cell.getValueLength());
        for (int i = 0; i < n; i++) {
            mix ^= valueArray[offset + i];
            bytesRead++;
        }
        discard(mix);
        rowsRead++;
    }

    /** Sink for the XOR of the bytes read; the value itself is not used. */
    private void discard(byte n) {
        // do nothing
    }

    /** Prints the start message and records the start time. */
    public void markStart() {
        System.out.println(name + " starts");
        startTime = System.currentTimeMillis();
    }

    /** Records the end time and prints elapsed milliseconds, rows read and bytes read. */
    public void markEnd() {
        endTime = System.currentTimeMillis();
        System.out.println();
        System.out.println(name + " ends, " + (endTime - startTime) + " ms, " + rowsRead + " rows read, " + bytesRead + " bytes read");
    }
}
}