blob: 81f205ee2a237f2d622facfef1f40f501d3073e6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kylin.storage.hbase.cube;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.gridtable.IGTStore;
import org.apache.kylin.gridtable.IGTWriter;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.steps.CubeHTableUtil;
public class SimpleHBaseStore implements IGTStore {
static final String CF = "F";
static final byte[] CF_B = Bytes.toBytes(CF);
static final String COL = "C";
static final byte[] COL_B = Bytes.toBytes(COL);
static final int ID_LEN = RowConstants.ROWKEY_CUBOIDID_LEN;
final GTInfo info;
final TableName htableName;
public SimpleHBaseStore(GTInfo info, TableName htable) {
this.info = info;
this.htableName = htable;
}
@Override
public GTInfo getInfo() {
return info;
}
public void cleanup() throws IOException {
CubeHTableUtil.deleteHTable(htableName);
}
@Override
public IGTWriter rebuild() throws IOException {
CubeHTableUtil.createBenchmarkHTable(htableName, CF);
return new Writer();
}
@Override
public IGTWriter append() throws IOException {
return new Writer();
}
@Override
public IGTScanner scan(GTScanRequest scanRequest) throws IOException {
return new Reader();
}
private class Writer implements IGTWriter {
final BufferedMutator table;
final ByteBuffer rowkey = ByteBuffer.allocate(50);
final ByteBuffer value = ByteBuffer.allocate(50);
Writer() throws IOException {
Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
table = conn.getBufferedMutator(htableName);
}
@Override
public void write(GTRecord rec) throws IOException {
assert info.getColumnBlockCount() == 2;
rowkey.clear();
for (int i = 0; i < ID_LEN; i++) {
rowkey.put((byte) 0);
}
rec.exportColumnBlock(0, rowkey);
rowkey.flip();
value.clear();
rec.exportColumnBlock(1, value);
value.flip();
Put put = new Put(rowkey);
put.addImmutable(CF_B, ByteBuffer.wrap(COL_B), HConstants.LATEST_TIMESTAMP, value);
table.mutate(put);
}
@Override
public void close() throws IOException {
table.flush();
table.close();
}
}
class Reader implements IGTScanner {
final Table table;
final ResultScanner scanner;
int count = 0;
Reader() throws IOException {
Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
table = conn.getTable(htableName);
Scan scan = new Scan();
scan.addFamily(CF_B);
scan.setCaching(1024);
scan.setCacheBlocks(true);
scanner = table.getScanner(scan);
}
public ResultScanner getHBaseScanner() {
return scanner;
}
@Override
public Iterator<GTRecord> iterator() {
return new Iterator<GTRecord>() {
GTRecord next = null;
GTRecord rec = new GTRecord(info);
@Override
public boolean hasNext() {
if (next != null)
return true;
try {
Result r = scanner.next();
if (r != null) {
loadRecord(r);
next = rec;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return next != null;
}
private void loadRecord(Result r) {
Cell[] cells = r.rawCells();
Cell cell = cells[0];
if (Bytes.compareTo(CF_B, 0, CF_B.length, cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) != 0 //
|| Bytes.compareTo(COL_B, 0, COL_B.length, cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) != 0)
throw new IllegalStateException();
rec.loadCellBlock(0, ByteBuffer.wrap(cell.getRowArray(), cell.getRowOffset() + ID_LEN, cell.getRowLength() - ID_LEN));
rec.loadCellBlock(1, ByteBuffer.wrap(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
}
@Override
public GTRecord next() {
if (hasNext() == false)
throw new NoSuchElementException();
count++;
next = null;
return rec;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
@Override
public void close() throws IOException {
scanner.close();
table.close();
}
@Override
public GTInfo getInfo() {
return info;
}
@Override
public int getScannedRowCount() {
return count;
}
}
}