// blob: 9b8a3de30408bbb251f23e483c78e940be97fc50 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kylin.storage.hbase.cube.v2;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.ShardingHash;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.dimension.DimensionEncoding;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.gridtable.IGTStore;
import org.apache.kylin.metadata.filter.UDF.MassInTupleFilter;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.cube.v2.filter.MassInValueProviderFactoryImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
/**
 * Reads cube data through plain client-side HBase table scans. For test use only.
 */
public class CubeHBaseScanRPC extends CubeHBaseRPC {
public static final Logger logger = LoggerFactory.getLogger(CubeHBaseScanRPC.class);
/**
 * Presents the records of {@code input} as {@link GTRecord}s bound to a different
 * {@link GTInfo}: each record handed out is a new {@code GTRecord} created from the
 * supplied info and the source record's {@code getInternal()} value.
 * <p>
 * The adapter holds a single {@link Iterator}, so every iterator returned by
 * {@link #iterator()} consumes the same underlying input; it can effectively be
 * traversed only once.
 */
static class TrimmedInfoGTRecordAdapter implements Iterable<GTRecord> {
    private final GTInfo info;
    private final Iterator<GTRecord> input;

    /**
     * @param info  the info every emitted record is bound to
     * @param input the source records to re-wrap
     */
    public TrimmedInfoGTRecordAdapter(GTInfo info, Iterator<GTRecord> input) {
        this.info = info;
        this.input = input;
    }

    /**
     * @return a read-only iterator over the re-wrapped records
     */
    @Override
    public Iterator<GTRecord> iterator() {
        return new Iterator<GTRecord>() {
            @Override
            public boolean hasNext() {
                return input.hasNext();
            }

            @Override
            public GTRecord next() {
                GTRecord x = input.next();
                return new GTRecord(info, x.getInternal());
            }

            /**
             * Removal is not supported. Fail loudly instead of silently doing nothing,
             * so callers are not misled into thinking the element was removed.
             */
            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
}
/**
 * Creates the scan RPC for the given segment and cuboid.
 * <p>
 * Side effect: overwrites the static {@code MassInTupleFilter.VALUE_PROVIDER_FACTORY} with a
 * factory whose dimension-encoding lookup goes through {@code fullGTInfo}'s code system.
 */
public CubeHBaseScanRPC(CubeSegment cubeSeg, Cuboid cuboid, final GTInfo fullGTInfo) {
    super(cubeSeg, cuboid, fullGTInfo);

    // resolves a column's encoding by its zero-based index in the code system
    final MassInValueProviderFactoryImpl.DimEncAware dimEncLookup = new MassInValueProviderFactoryImpl.DimEncAware() {
        @Override
        public DimensionEncoding getDimEnc(TblColRef col) {
            return fullGTInfo.getCodeSystem().getDimEnc(col.getColumnDesc().getZeroBasedIndex());
        }
    };

    MassInTupleFilter.VALUE_PROVIDER_FACTORY = new MassInValueProviderFactoryImpl(dimEncLookup);
}
/**
 * Builds the scanner for {@code scanRequest} and returns a thin wrapper that forwards
 * every call to it.
 */
@Override
public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException {
    final IGTScanner delegate = getGTScannerInternal(scanRequest);

    return new IGTScanner() {
        @Override
        public Iterator<GTRecord> iterator() {
            return delegate.iterator();
        }

        @Override
        public GTInfo getInfo() {
            return delegate.getInfo();
        }

        @Override
        public int getScannedRowCount() {
            return delegate.getScannedRowCount();
        }

        @Override
        public void close() throws IOException {
            delegate.close();
        }
    };
}
/**
 * Expands one row key into a key per shard of the current cuboid.
 * <p>
 * When sharding is disabled the list holds just the input array itself (not a copy), since
 * there is no shard id to put at its head. Otherwise each entry is a copy of the input with
 * a shard id written at offset 0 via {@code BytesUtil.writeShort}.
 */
private List<byte[]> getRowKeysDifferentShards(byte[] halfCookedKey) {
    final short shardCount = cubeSeg.getCuboidShardNum(cuboid.getId());

    if (!cubeSeg.isEnableSharding()) {
        // no shard to prepend, so the key is already fully cooked
        return Lists.newArrayList(halfCookedKey);
    }

    final List<byte[]> shardedKeys = Lists.newArrayList();
    short offset = 0;
    while (offset < shardCount) {
        final short shardId = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), offset, cubeSeg.getTotalShards());
        // work on a private copy so the caller's key is left untouched
        final byte[] shardedKey = halfCookedKey.clone();
        BytesUtil.writeShort(shardId, shardedKey, 0, RowConstants.ROWKEY_SHARDID_LEN);
        shardedKeys.add(shardedKey);
        ++offset;
    }
    return shardedKeys;
}
/**
 * Clones {@code rawScan} once per shard key, giving each clone the start and end key
 * produced by {@code getRowKeysDifferentShards} for the same position.
 */
private List<RawScan> spawnRawScansForAllShards(RawScan rawScan) {
    final List<RawScan> perShardScans = Lists.newArrayList();
    final List<byte[]> shardStartKeys = getRowKeysDifferentShards(rawScan.startKey);
    final List<byte[]> shardEndKeys = getRowKeysDifferentShards(rawScan.endKey);

    int position = 0;
    for (byte[] shardStartKey : shardStartKeys) {
        final RawScan shardScan = new RawScan(rawScan);
        shardScan.startKey = shardStartKey;
        shardScan.endKey = shardEndKeys.get(position);
        perShardScans.add(shardScan);
        position++;
    }
    return perShardScans;
}
/**
 * Opens one HBase {@link ResultScanner} per (raw scan, shard) pair, concatenates their
 * results in order, feeds them through an {@code HBaseReadonlyStore} and the request's
 * scanner decoration, and returns an {@link IGTScanner} whose records are bound to
 * {@code fullGTInfo}.
 * <p>
 * The HBase scanners and the table handle are released by the {@code CellListIterator}'s
 * {@code close()}; presumably that is reached when the returned scanner is closed (via the
 * store's scanner - confirm). If anything fails before the scanner is handed back, the
 * resources opened so far are released here, because the caller never receives an object
 * it could close.
 *
 * @param scanRequest the grid-table scan to execute
 * @return a scanner over the decorated scan results
 * @throws IOException if the table or scanners cannot be opened, or the scan cannot be set up
 */
private IGTScanner getGTScannerInternal(final GTScanRequest scanRequest) throws IOException {
    // primary key (also the 0th column block) is always selected
    final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0);
    // globally shared connection, does not require close
    Connection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
    final Table hbaseTable = hbaseConn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier()));
    final List<ResultScanner> scanners = Lists.newArrayList();

    // flipped only once ownership of the table and scanners passes to the returned scanner
    boolean handedOver = false;
    try {
        List<RawScan> rawScans = preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks);
        List<List<Integer>> hbaseColumnsToGT = getHBaseColumnsGTMapping(selectedColBlocks);
        final List<Iterator<Result>> resultIterators = Lists.newArrayList();

        for (RawScan rawScan : rawScans) {
            for (RawScan rawScanWithShard : spawnRawScansForAllShards(rawScan)) {
                logScan(rawScanWithShard, cubeSeg.getStorageLocationIdentifier());
                Scan hbaseScan = buildScan(rawScanWithShard);
                final ResultScanner scanner = hbaseTable.getScanner(hbaseScan);
                // register immediately so a failure on a later scan still closes this one
                scanners.add(scanner);
                resultIterators.add(scanner.iterator());
            }
        }

        final Iterator<Result> allResultsIterator = Iterators.concat(resultIterators.iterator());
        CellListIterator cellListIterator = new CellListIterator() {
            @Override
            public void close() throws IOException {
                try {
                    for (ResultScanner scanner : scanners) {
                        scanner.close();
                    }
                } finally {
                    // the table must be released even if closing a scanner fails
                    hbaseTable.close();
                }
            }

            @Override
            public boolean hasNext() {
                return allResultsIterator.hasNext();
            }

            @Override
            public List<Cell> next() {
                return allResultsIterator.next().listCells();
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };

        // all spawned scans share the column selection of the first raw scan
        IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns, hbaseColumnsToGT, cubeSeg.getRowKeyPreambleSize());
        IGTScanner rawScanner = store.scan(scanRequest);
        final IGTScanner decorateScanner = scanRequest.decorateScanner(rawScanner);
        final TrimmedInfoGTRecordAdapter trimmedInfoGTRecordAdapter = new TrimmedInfoGTRecordAdapter(fullGTInfo, decorateScanner.iterator());

        handedOver = true;
        return new IGTScanner() {
            @Override
            public GTInfo getInfo() {
                return fullGTInfo;
            }

            @Override
            public int getScannedRowCount() {
                return decorateScanner.getScannedRowCount();
            }

            @Override
            public void close() throws IOException {
                decorateScanner.close();
            }

            @Override
            public Iterator<GTRecord> iterator() {
                return trimmedInfoGTRecordAdapter.iterator();
            }
        };
    } finally {
        if (!handedOver) {
            closeScanResourcesQuietly(scanners, hbaseTable);
        }
    }
}

/**
 * Best-effort release of the scanners and table after a failed scanner setup. Close
 * failures are logged rather than thrown so they do not mask the original exception.
 */
private static void closeScanResourcesQuietly(List<ResultScanner> scanners, Table hbaseTable) {
    for (ResultScanner scanner : scanners) {
        try {
            scanner.close();
        } catch (RuntimeException e) {
            logger.warn("Failed to close HBase scanner during cleanup", e);
        }
    }
    try {
        hbaseTable.close();
    } catch (IOException e) {
        logger.warn("Failed to close HBase table during cleanup", e);
    } catch (RuntimeException e) {
        logger.warn("Failed to close HBase table during cleanup", e);
    }
}
}