blob: c96d706e7574b6384662223a3522beb41385a15c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.CompressionUtils;
import org.apache.kylin.cube.kv.RowConstants;
import org.apache.kylin.dimension.DimensionEncoding;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.gridtable.IGTStore;
import org.apache.kylin.metadata.filter.UDF.MassInTupleFilter;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.IRealizationConstants;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
import org.apache.kylin.storage.hbase.cube.v2.CellListIterator;
import org.apache.kylin.storage.hbase.cube.v2.CubeHBaseRPC;
import org.apache.kylin.storage.hbase.cube.v2.HBaseReadonlyStore;
import org.apache.kylin.storage.hbase.cube.v2.RawScan;
import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList;
import org.apache.kylin.storage.hbase.cube.v2.filter.MassInValueProviderFactoryImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.protobuf.HBaseZeroCopyByteString;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.sun.management.OperatingSystemMXBean;
/**
 * HBase coprocessor endpoint that runs a serialized {@link GTScanRequest} against the region it is
 * loaded on and returns the resulting rows, compressed, in a
 * {@link CubeVisitProtos.CubeVisitResponse} together with scan statistics and server stats.
 *
 * <p>Per-request state (e.g. the service start time) is kept in local variables rather than
 * instance fields. NOTE(review): this assumes HBase may dispatch concurrent requests for one region
 * to the same coprocessor instance — confirm against the HBase endpoint documentation.
 */
@SuppressWarnings("unused")
//used in hbase endpoint
public class CubeVisitService extends CubeVisitProtos.CubeVisitService implements Coprocessor, CoprocessorService {

    private static final Logger logger = LoggerFactory.getLogger(CubeVisitService.class);

    //TODO limit memory footprint
    private static final int MEMORY_LIMIT = 500 * 1024 * 1024;

    // set in start(); the region environment this endpoint is attached to
    private RegionCoprocessorEnvironment env;

    /**
     * Adapts a {@link RegionScanner} to a {@link CellListIterator}: each {@link #next()} returns the
     * cells fetched by one {@code nextRaw} call. The first batch is fetched eagerly in the
     * constructor so {@link #hasNext()} is a plain emptiness check.
     *
     * <p>The list returned by {@link #next()} is reused between calls; consume it before calling
     * {@link #next()} again. {@link IOException}s from the scanner are rethrown as
     * {@link RuntimeException}.
     */
    static class InnerScannerAsIterator implements CellListIterator {
        private RegionScanner regionScanner;
        // cells that the next call to next() will hand out; empty once the scanner is exhausted
        private List<Cell> nextOne = Lists.newArrayList();
        // buffer returned by next(), reused across calls
        private List<Cell> ret = Lists.newArrayList();
        // result of the last nextRaw call: false means no further batches should be requested
        private boolean hasMore;

        public InnerScannerAsIterator(RegionScanner regionScanner) {
            this.regionScanner = regionScanner;

            try {
                hasMore = regionScanner.nextRaw(nextOne);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public boolean hasNext() {
            return !nextOne.isEmpty();
        }

        @Override
        public List<Cell> next() {

            if (nextOne.size() < 1) {
                throw new IllegalStateException();
            }
            ret.clear();
            ret.addAll(nextOne);
            nextOne.clear();
            try {
                if (hasMore) {
                    hasMore = regionScanner.nextRaw(nextOne);
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            return ret;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        @Override
        public void close() throws IOException {
            //does not need to close as regionScanner will be closed in finally block
        }
    }

    /**
     * Overwrites the first {@code shardLength} bytes of the raw scan's start and end keys with the
     * first {@code shardLength} bytes of the current region's start key (all zeros when the region
     * start key is empty), i.e. fills in this region's shard prefix.
     *
     * <p>NOTE(review): assumes a non-empty region start key is at least {@code shardLength} bytes
     * long — confirm.
     *
     * @param rawScan     scan whose start/end keys are modified in place
     * @param region      region the coprocessor is running on
     * @param shardLength number of shard bytes; 0 means no shard and nothing is changed
     */
    private void updateRawScanByCurrentRegion(RawScan rawScan, HRegion region, int shardLength) {
        if (shardLength == 0) {
            return;
        }
        byte[] regionStartKey = ArrayUtils.isEmpty(region.getRegionInfo().getStartKey()) ? new byte[shardLength] : region.getRegionInfo().getStartKey();
        Bytes.putBytes(rawScan.startKey, 0, regionStartKey, 0, shardLength);
        Bytes.putBytes(rawScan.endKey, 0, regionStartKey, 0, shardLength);
    }

    /**
     * Reads a VInt count followed by that many serialized {@link RawScan}s from {@code in}.
     */
    private List<RawScan> deserializeRawScans(ByteBuffer in) {
        int rawScanCount = BytesUtil.readVInt(in);
        List<RawScan> ret = Lists.newArrayList();
        for (int i = 0; i < rawScanCount; i++) {
            RawScan temp = RawScan.serializer.deserialize(in);
            ret.add(temp);
        }
        return ret;
    }

    /**
     * Appends {@code info} (if non-null) followed by "@&lt;elapsed ms since serviceStartTime&gt;,"
     * to the profile string.
     *
     * @param serviceStartTime start time of the current request, in epoch millis
     */
    private void appendProfileInfo(StringBuilder sb, String info, long serviceStartTime) {
        if (info != null) {
            sb.append(info);
        }
        sb.append("@" + (System.currentTimeMillis() - serviceStartTime));
        sb.append(",");
    }

    /**
     * Executes one cube visit on this region.
     *
     * <p>Steps:
     * <ol>
     * <li>Deserialize the GT scan request and the raw scans.</li>
     * <li>Open one region scanner per raw scan.</li>
     * <li>Read the rows, with filtering and aggregation depending on the requested
     * {@link CoprocessorBehavior}.</li>
     * <li>Serialize, compress and return the rows through {@code done}.</li>
     * </ol>
     *
     * <p>If the request exceeds {@code request.getTimeout()} ms, an empty row set is returned and
     * the response is marked {@code NormalComplete=0}. An {@link IOException} is reported through
     * {@code controller}, and {@code done} is not invoked in that case.
     */
    @Override
    public void visitCube(final RpcController controller, CubeVisitProtos.CubeVisitRequest request, RpcCallback<CubeVisitProtos.CubeVisitResponse> done) {

        List<RegionScanner> regionScanners = Lists.newArrayList();
        HRegion region = null;

        StringBuilder sb = new StringBuilder();
        String debugGitTag = "";

        try {
            // Per-request baseline for timeouts and profiling; deliberately a local, not a field,
            // so that concurrent requests cannot overwrite each other's start time.
            final long serviceStartTime = System.currentTimeMillis();

            region = (HRegion) env.getRegion();
            region.startRegionOperation();

            debugGitTag = region.getTableDesc().getValue(IRealizationConstants.HTableGitTag);

            final GTScanRequest scanReq = GTScanRequest.serializer.deserialize(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest())));
            List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList();
            for (IntList intList : request.getHbaseColumnsToGTList()) {
                hbaseColumnsToGT.add(intList.getIntsList());
            }
            CoprocessorBehavior behavior = CoprocessorBehavior.valueOf(request.getBehavior());
            final List<RawScan> hbaseRawScans = deserializeRawScans(ByteBuffer.wrap(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan())));

            appendProfileInfo(sb, "start latency: " + (serviceStartTime - request.getStartTime()), serviceStartTime);

            // NOTE(review): VALUE_PROVIDER_FACTORY appears to be a static field, so this assignment is
            // visible to every request in the JVM; concurrent requests may see another request's
            // factory. Left unchanged here — confirm whether a per-request hook is needed.
            MassInTupleFilter.VALUE_PROVIDER_FACTORY = new MassInValueProviderFactoryImpl(new MassInValueProviderFactoryImpl.DimEncAware() {
                @Override
                public DimensionEncoding getDimEnc(TblColRef col) {
                    return scanReq.getInfo().getCodeSystem().getDimEnc(col.getColumnDesc().getZeroBasedIndex());
                }
            });

            // bytes of rowkey preamble beyond the cuboid id, i.e. the shard prefix length (if any)
            final int shardLength = request.getRowkeyPreambleSize() - RowConstants.ROWKEY_CUBOIDID_LEN;
            final List<InnerScannerAsIterator> cellListsForeachRawScan = Lists.newArrayList();
            for (RawScan hbaseRawScan : hbaseRawScans) {
                if (shardLength > 0) {
                    //if has shard, fill region shard to raw scan start/end
                    updateRawScanByCurrentRegion(hbaseRawScan, region, shardLength);
                }

                Scan scan = CubeHBaseRPC.buildScan(hbaseRawScan);
                RegionScanner innerScanner = region.getScanner(scan);
                // registered before wrapping so the finally block closes it even if wrapping fails
                regionScanners.add(innerScanner);

                InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner);
                cellListsForeachRawScan.add(cellListIterator);
            }

            final Iterator<List<Cell>> allCellLists = Iterators.concat(cellListsForeachRawScan.iterator());

            if (behavior.ordinal() < CoprocessorBehavior.SCAN.ordinal()) {
                //this is only for CoprocessorBehavior.RAW_SCAN case to profile hbase scan speed
                // (the first batch of each scanner was already consumed by InnerScannerAsIterator's
                // constructor and is not counted here)
                List<Cell> temp = Lists.newArrayList();
                int counter = 0;
                for (RegionScanner innerScanner : regionScanners) {
                    while (innerScanner.nextRaw(temp)) {
                        counter++;
                        // discard each batch; otherwise every scanned cell is retained in memory
                        temp.clear();
                    }
                    temp.clear();
                }
                appendProfileInfo(sb, "scanned " + counter, serviceStartTime);
            }

            if (behavior.ordinal() < CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
                scanReq.setAggrCacheGB(0); // disable mem check if so told
            }

            // flipped to false by either timeout check below; an abnormal scan returns no rows
            final MutableBoolean scanNormalComplete = new MutableBoolean(true);
            final long timeout = request.getTimeout();

            // Wraps the concatenated per-scan iterators and aborts (returns false) on timeout.
            final CellListIterator cellListIterator = new CellListIterator() {

                int counter = 0;

                @Override
                public void close() throws IOException {
                    for (CellListIterator closeable : cellListsForeachRawScan) {
                        closeable.close();
                    }
                }

                @Override
                public boolean hasNext() {
                    if (counter % 1000 == 1) {
                        if (System.currentTimeMillis() - serviceStartTime > timeout) {
                            scanNormalComplete.setValue(false);
                            logger.error("scanner aborted because timeout");
                            return false;
                        }
                    }

                    if (counter % 100000 == 1) {
                        logger.info("Scanned " + counter + " rows.");
                    }
                    counter++;
                    return allCellLists.hasNext();
                }

                @Override
                public List<Cell> next() {
                    return allCellLists.next();
                }

                @Override
                public void remove() {
                    throw new UnsupportedOperationException();
                }
            };

            IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScans.get(0).hbaseColumns, hbaseColumnsToGT, request.getRowkeyPreambleSize());

            IGTScanner rawScanner = store.scan(scanReq);
            IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, //
                    behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER.ordinal(), //
                    behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal());

            ByteBuffer buffer = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);

            ByteArrayOutputStream outputStream = new ByteArrayOutputStream(RowConstants.ROWVALUE_BUFFER_SIZE);//ByteArrayOutputStream will auto grow
            int finalRowCount = 0;
            try {
                for (GTRecord oneRecord : finalScanner) {

                    if (!scanNormalComplete.booleanValue()) {
                        logger.error("aggregate iterator aborted because input iterator aborts");
                        break;
                    }

                    if (finalRowCount % 1000 == 1) {
                        if (System.currentTimeMillis() - serviceStartTime > timeout) {
                            // results are incomplete: report the scan as abnormal instead of
                            // returning truncated rows flagged as a normal completion
                            scanNormalComplete.setValue(false);
                            logger.error("aggregate iterator aborted because timeout");
                            break;
                        }
                    }

                    buffer.clear();
                    oneRecord.exportColumns(scanReq.getColumns(), buffer);
                    buffer.flip();

                    // the readable bytes start at arrayOffset() + position() in the backing array
                    outputStream.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
                    finalRowCount++;
                }
            } finally {
                // close even if iteration throws
                finalScanner.close();
            }

            appendProfileInfo(sb, "agg done", serviceStartTime);

            //outputStream.close() is not necessary
            byte[] allRows;
            if (scanNormalComplete.booleanValue()) {
                allRows = outputStream.toByteArray();
            } else {
                allRows = new byte[0];
            }
            byte[] compressedAllRows = CompressionUtils.compress(allRows);

            appendProfileInfo(sb, "compress done", serviceStartTime);

            OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
            double systemCpuLoad = operatingSystemMXBean.getSystemCpuLoad();
            double freePhysicalMemorySize = operatingSystemMXBean.getFreePhysicalMemorySize();
            double freeSwapSpaceSize = operatingSystemMXBean.getFreeSwapSpaceSize();

            appendProfileInfo(sb, "server stats done", serviceStartTime);
            sb.append(" debugGitTag:" + debugGitTag);

            CubeVisitProtos.CubeVisitResponse.Builder responseBuilder = CubeVisitProtos.CubeVisitResponse.newBuilder();
            done.run(responseBuilder.//
                    setCompressedRows(HBaseZeroCopyByteString.wrap(compressedAllRows)).//too many array copies
                    setStats(CubeVisitProtos.CubeVisitResponse.Stats.newBuilder().//
                            setAggregatedRowCount(finalScanner.getScannedRowCount() - finalRowCount).//
                            setScannedRowCount(finalScanner.getScannedRowCount()).//
                            setServiceStartTime(serviceStartTime).//
                            setServiceEndTime(System.currentTimeMillis()).//
                            setSystemCpuLoad(systemCpuLoad).//
                            setFreePhysicalMemorySize(freePhysicalMemorySize).//
                            setFreeSwapSpaceSize(freeSwapSpaceSize).//
                            setHostname(InetAddress.getLocalHost().getHostName()).//
                            setEtcMsg(sb.toString()).//
                            setNormalComplete(scanNormalComplete.booleanValue() ? 1 : 0).build())
                    .//
                    build());

        } catch (IOException ioe) {
            // keep the stack trace in the region server log
            logger.error(ioe.toString(), ioe);
            IOException wrapped = new IOException("Error in coprocessor " + debugGitTag, ioe);
            ResponseConverter.setControllerException(controller, wrapped);
        } finally {
            for (RegionScanner innerScanner : regionScanners) {
                IOUtils.closeQuietly(innerScanner);
            }
            if (region != null) {
                try {
                    region.closeRegionOperation();
                } catch (IOException e) {
                    logger.error(e.toString(), e);
                    throw new RuntimeException(e);
                }
            }
        }
    }

    /**
     * Stores the region environment. Fails unless loaded on a table region.
     *
     * @throws CoprocessorException if {@code env} is not a {@link RegionCoprocessorEnvironment}
     */
    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if (env instanceof RegionCoprocessorEnvironment) {
            this.env = (RegionCoprocessorEnvironment) env;
        } else {
            throw new CoprocessorException("Must be loaded on a table region!");
        }
    }

    /** No resources to release. */
    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
    }

    /** This endpoint is itself the protobuf {@link Service}. */
    @Override
    public Service getService() {
        return this;
    }
}