/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kylin.storage.hbase.cube.v2;

import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.DataFormatException;

import javax.annotation.Nullable;

import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.BytesSerializer;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.CompressionUtils;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.LoggableCachedThreadPool;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.gridtable.GTInfo;
import org.apache.kylin.gridtable.GTRecord;
import org.apache.kylin.gridtable.GTScanRange;
import org.apache.kylin.gridtable.GTScanRequest;
import org.apache.kylin.gridtable.IGTScanner;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList;
import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse.Stats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.HBaseZeroCopyByteString;
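
/**
 * Cube read RPC that pushes a {@link GTScanRequest} down to the HBase region
 * servers through the CubeVisit coprocessor endpoint, so that scanning,
 * filtering and aggregation happen region-side and only compressed result
 * blocks travel back to the query server.
 */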
public class CubeHBaseEndpointRPC extends CubeHBaseRPC {

    public static final Logger logger = LoggerFactory.getLogger(CubeHBaseEndpointRPC.class);

    private static ExecutorService executorService = new LoggableCachedThreadPool();
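
    /**
     * Blocking iterator over the byte[] result blocks produced by the endpoint
     * RPCs. Exactly {@code expectedSize} blocks are expected (one per shard),
     * and waiting for a block fails once the configured RPC timeout elapses.
     */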
    static class ExpectedSizeIterator implements Iterator<byte[]> {

        BlockingQueue<byte[]> queue;
        int expectedSize;
        int current = 0;
        long timeout;
        long timeoutTS;

        public ExpectedSizeIterator(int expectedSize) {
            this.expectedSize = expectedSize;
            this.queue = new ArrayBlockingQueue<byte[]>(expectedSize);

            this.timeout = HadoopUtil.getCurrentConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
            this.timeout *= KylinConfig.getInstanceFromEnv().getCubeVisitTimeoutTimes();

            if (BackdoorToggles.getQueryTimeout() != -1) {
                this.timeout = BackdoorToggles.getQueryTimeout();
            }

            this.timeout *= 1.1; // allow for some delay

            logger.info("Timeout for ExpectedSizeIterator is: " + this.timeout);

            this.timeoutTS = System.currentTimeMillis() + this.timeout;
        }

        @Override
        public boolean hasNext() {
            return (current < expectedSize);
        }

        @Override
        public byte[] next() {
            if (current >= expectedSize) {
                throw new IllegalStateException("Won't have more data");
            }
            try {
                current++;
                long tsRemaining = this.timeoutTS - System.currentTimeMillis();
                if (tsRemaining < 0) {
                    throw new RuntimeException("Timeout visiting cube!");
                }

                byte[] ret = queue.poll(tsRemaining, TimeUnit.MILLISECONDS);
                if (ret == null) {
                    throw new RuntimeException("Timeout visiting cube!");
                } else {
                    return ret;
                }
            } catch (InterruptedException e) {
                throw new RuntimeException("error when waiting queue", e);
            }
        }

        @Override
        public void remove() {
            throw new NotImplementedException();
        }

        public void append(byte[] data) {
            try {
                queue.put(data);
            } catch (InterruptedException e) {
                throw new RuntimeException("error when waiting queue", e);
            }
        }

        public long getTimeout() {
            return timeout;
        }
    }
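
    /**
     * Adapts the stream of serialized result blocks returned by the endpoints
     * into an {@link IGTScanner}, deserializing {@link GTRecord}s lazily as
     * the blocks are consumed.
     */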
    static class EndpointResultsAsGTScanner implements IGTScanner {
        private GTInfo info;
        private Iterator<byte[]> blocks;
        private ImmutableBitSet columns;
        private int totalScannedCount;

        public EndpointResultsAsGTScanner(GTInfo info, Iterator<byte[]> blocks, ImmutableBitSet columns, int totalScannedCount) {
            this.info = info;
            this.blocks = blocks;
            this.columns = columns;
            this.totalScannedCount = totalScannedCount;
        }

        @Override
        public GTInfo getInfo() {
            return info;
        }

        @Override
        public int getScannedRowCount() {
            return totalScannedCount;
        }

        @Override
        public void close() throws IOException {
            // do nothing
        }

        @Override
        public Iterator<GTRecord> iterator() {
            return Iterators.concat(Iterators.transform(blocks, new Function<byte[], Iterator<GTRecord>>() {
                @Nullable
                @Override
                public Iterator<GTRecord> apply(@Nullable final byte[] input) {
                    return new Iterator<GTRecord>() {
                        private ByteBuffer inputBuffer = null;
                        // note: the same GTRecord instance is reused across next() calls
                        private GTRecord oneRecord = null;

                        @Override
                        public boolean hasNext() {
                            if (inputBuffer == null) {
                                inputBuffer = ByteBuffer.wrap(input);
                                oneRecord = new GTRecord(info);
                            }

                            return inputBuffer.position() < inputBuffer.limit();
                        }

                        @Override
                        public GTRecord next() {
                            oneRecord.loadColumns(columns, inputBuffer);
                            return oneRecord;
                        }

                        @Override
                        public void remove() {
                            throw new UnsupportedOperationException();
                        }
                    };
                }
            }));
        }
    }
    public CubeHBaseEndpointRPC(CubeSegment cubeSeg, Cuboid cuboid, GTInfo fullGTInfo) {
        super(cubeSeg, cuboid, fullGTInfo);
    }

    private byte[] getByteArrayForShort(short v) {
        byte[] split = new byte[Bytes.SIZEOF_SHORT];
        BytesUtil.writeUnsigned(v, split, 0, Bytes.SIZEOF_SHORT);
        return split;
    }
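
    /**
     * Computes the contiguous shard key ranges [start, end] (both inclusive)
     * covering {@code shardNum} shards starting at {@code baseShard}; when the
     * range wraps past the last shard it is split into two ranges.
     */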
    @SuppressWarnings("unchecked")
    private List<Pair<byte[], byte[]>> getEPKeyRanges(short baseShard, short shardNum, int totalShards) {
        if (shardNum == 0) {
            return Lists.newArrayList();
        }

        if (shardNum == totalShards) {
            // all shards
            return Lists.newArrayList(Pair.newPair(getByteArrayForShort((short) 0), getByteArrayForShort((short) (shardNum - 1))));
        } else if (baseShard + shardNum <= totalShards) {
            // endpoint end key is inclusive, so no need to append 0 or anything
            return Lists.newArrayList(Pair.newPair(getByteArrayForShort(baseShard), getByteArrayForShort((short) (baseShard + shardNum - 1))));
        } else {
            // wrap-around case, e.g. with 5 shards (0..4), 2 shards starting at shard 4 cover shards {4, 0}
            return Lists.newArrayList(Pair.newPair(getByteArrayForShort(baseShard), getByteArrayForShort((short) (totalShards - 1))), //
                    Pair.newPair(getByteArrayForShort((short) 0), getByteArrayForShort((short) (baseShard + shardNum - totalShards - 1))));
        }
    }
    protected Pair<Short, Short> getShardNumAndBaseShard() {
        return Pair.newPair(cubeSeg.getCuboidShardNum(cuboid.getId()), cubeSeg.getCuboidBaseShard(cuboid.getId()));
    }
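
    /**
     * Serializes the scan request and the per-shard raw scans, then submits
     * one coprocessor RPC per contiguous shard range. Each endpoint scans,
     * filters and aggregates region-side; the compressed result blocks are fed
     * back through an {@link ExpectedSizeIterator} and exposed as an IGTScanner.
     */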
    @Override
    public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException {

        final String toggle = BackdoorToggles.getCoprocessorBehavior() == null ? CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString() : BackdoorToggles.getCoprocessorBehavior();
        logger.debug("New scanner for current segment {} will use {} as endpoint's behavior", cubeSeg, toggle);

        Pair<Short, Short> shardNumAndBaseShard = getShardNumAndBaseShard();
        short shardNum = shardNumAndBaseShard.getFirst();
        short cuboidBaseShard = shardNumAndBaseShard.getSecond();
        int totalShards = cubeSeg.getTotalShards();

        ByteString scanRequestByteString = null;
        ByteString rawScanByteString = null;

        // primary key (also the 0th column block) is always selected
        final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0);

        // globally shared connection, does not require close
        final Connection conn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());

        final List<IntList> hbaseColumnsToGTIntList = Lists.newArrayList();
        List<List<Integer>> hbaseColumnsToGT = getHBaseColumnsGTMapping(selectedColBlocks);
        for (List<Integer> list : hbaseColumnsToGT) {
            hbaseColumnsToGTIntList.add(IntList.newBuilder().addAllInts(list).build());
        }

        //TODO: raw scan can be constructed at region side to reduce traffic
        List<RawScan> rawScans = preparedHBaseScans(scanRequest.getGTScanRanges(), selectedColBlocks);
        int rawScanBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE;
        while (true) {
            try {
                ByteBuffer rawScanBuffer = ByteBuffer.allocate(rawScanBufferSize);
                BytesUtil.writeVInt(rawScans.size(), rawScanBuffer);
                for (RawScan rs : rawScans) {
                    RawScan.serializer.serialize(rs, rawScanBuffer);
                }
                rawScanBuffer.flip();
                rawScanByteString = HBaseZeroCopyByteString.wrap(rawScanBuffer.array(), rawScanBuffer.position(), rawScanBuffer.limit());
                break;
            } catch (BufferOverflowException boe) {
                logger.info("Buffer size {} cannot hold the raw scans, resizing to 4 times the size", rawScanBufferSize);
                rawScanBufferSize *= 4;
            }
        }

        scanRequest.setGTScanRanges(Lists.<GTScanRange> newArrayList()); // the raw scans are sent to the coprocessor separately, no need to duplicate them in the scan request
        int scanRequestBufferSize = BytesSerializer.SERIALIZE_BUFFER_SIZE;
        while (true) {
            try {
                ByteBuffer buffer = ByteBuffer.allocate(scanRequestBufferSize);
                GTScanRequest.serializer.serialize(scanRequest, buffer);
                buffer.flip();
                scanRequestByteString = HBaseZeroCopyByteString.wrap(buffer.array(), buffer.position(), buffer.limit());
                break;
            } catch (BufferOverflowException boe) {
                logger.info("Buffer size {} cannot hold the scan request, resizing to 4 times the size", scanRequestBufferSize);
                scanRequestBufferSize *= 4;
            }
        }

        logger.debug("Serialized scanRequestByteString {} bytes, rawScanByteString {} bytes", scanRequestByteString.size(), rawScanByteString.size());

        logger.info("The scan {} for segment {} is as below with {} separate raw scans, shard part of start/end key is set to 0", Integer.toHexString(System.identityHashCode(scanRequest)), cubeSeg, rawScans.size());
        for (RawScan rs : rawScans) {
            logScan(rs, cubeSeg.getStorageLocationIdentifier());
        }

        logger.debug("Submitting rpc to {} shards starting from shard {}, scan range count {}", shardNum, cuboidBaseShard, rawScans.size());

        final AtomicInteger totalScannedCount = new AtomicInteger(0);
        final ExpectedSizeIterator epResultItr = new ExpectedSizeIterator(shardNum);

        final CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder();
        builder.setGtScanRequest(scanRequestByteString).setHbaseRawScan(rawScanByteString);
        for (IntList intList : hbaseColumnsToGTIntList) {
            builder.addHbaseColumnsToGT(intList);
        }
        builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize());
        builder.setBehavior(toggle);
        builder.setStartTime(System.currentTimeMillis());
        builder.setTimeout(epResultItr.getTimeout());

        for (final Pair<byte[], byte[]> epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    String logHeader = "<sub-thread for GTScanRequest " + Integer.toHexString(System.identityHashCode(scanRequest)) + "> ";

                    Map<byte[], CubeVisitProtos.CubeVisitResponse> results;
                    try {
                        results = getResults(builder.build(), conn.getTable(TableName.valueOf(cubeSeg.getStorageLocationIdentifier())), epRange.getFirst(), epRange.getSecond());
                    } catch (Throwable throwable) {
                        throw new RuntimeException(logHeader + "Error when visiting cubes by endpoint", throwable);
                    }

                    boolean abnormalFinish = false;
                    for (Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result : results.entrySet()) {
                        totalScannedCount.addAndGet(result.getValue().getStats().getScannedRowCount());
                        logger.info(logHeader + getStatsString(result));

                        if (result.getValue().getStats().getNormalComplete() != 1) {
                            abnormalFinish = true;
                        } else {
                            try {
                                epResultItr.append(CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(result.getValue().getCompressedRows())));
                            } catch (IOException | DataFormatException e) {
                                throw new RuntimeException(logHeader + "Error when decompressing", e);
                            }
                        }
                    }

                    if (abnormalFinish) {
                        throw new RuntimeException(logHeader + "The coprocessor thread stopped itself due to scan timeout, failing current query...");
                    }
                }
            });
        }

        return new EndpointResultsAsGTScanner(fullGTInfo, epResultItr, scanRequest.getColumns(), totalScannedCount.get());
    }
    private String getStatsString(Map.Entry<byte[], CubeVisitProtos.CubeVisitResponse> result) {
        StringBuilder sb = new StringBuilder();
        Stats stats = result.getValue().getStats();
        sb.append("Endpoint RPC returned from HTable ").append(cubeSeg.getStorageLocationIdentifier()).append(" Shard ").append(BytesUtil.toHex(result.getKey())).append(" on host: ").append(stats.getHostname()).append(". ");
        sb.append("Total scanned rows: ").append(stats.getScannedRowCount()).append(". ");
        sb.append("Total filtered/aggregated rows: ").append(stats.getAggregatedRowCount()).append(". ");
        sb.append("Time elapsed in EP: ").append(stats.getServiceEndTime() - stats.getServiceStartTime()).append("(ms). ");
        sb.append("Server CPU usage: ").append(stats.getSystemCpuLoad()).append(", server physical mem left: ").append(stats.getFreePhysicalMemorySize()).append(", server swap mem left: ").append(stats.getFreeSwapSpaceSize()).append(". ");
        sb.append("Etc message: ").append(stats.getEtcMsg()).append(". ");
        sb.append("Normal Complete: ").append(stats.getNormalComplete() == 1).append(".");
        return sb.toString();
    }
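
    /**
     * Invokes the CubeVisit coprocessor service on every region whose key
     * range intersects [startKey, endKey], returning one response per region.
     */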
    private Map<byte[], CubeVisitProtos.CubeVisitResponse> getResults(final CubeVisitProtos.CubeVisitRequest request, Table table, byte[] startKey, byte[] endKey) throws Throwable {
        Map<byte[], CubeVisitProtos.CubeVisitResponse> results = table.coprocessorService(CubeVisitProtos.CubeVisitService.class, startKey, endKey, new Batch.Call<CubeVisitProtos.CubeVisitService, CubeVisitProtos.CubeVisitResponse>() {
            public CubeVisitProtos.CubeVisitResponse call(CubeVisitProtos.CubeVisitService rowsService) throws IOException {
                ServerRpcController controller = new ServerRpcController();
                BlockingRpcCallback<CubeVisitProtos.CubeVisitResponse> rpcCallback = new BlockingRpcCallback<>();
                rowsService.visitCube(controller, request, rpcCallback);
                CubeVisitProtos.CubeVisitResponse response = rpcCallback.get();
                if (controller.failedOnException()) {
                    throw controller.getFailedOn();
                }
                return response;
            }
        });

        return results;
    }
}