blob: 8da8e5986cf565753e6e088c249afb1e85d64f69 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.kylin.common.util.CompressionUtils;
import org.apache.kylin.common.util.DateFormat;
import org.apache.kylin.common.util.RangeUtil;
import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.invertedindex.index.TableRecord;
import org.apache.kylin.invertedindex.index.TableRecordInfo;
import org.apache.kylin.measure.hllc.HLLCMeasureType;
import org.apache.kylin.metadata.filter.ConstantTupleFilter;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.tuple.ITuple;
import org.apache.kylin.metadata.tuple.ITupleIterator;
import org.apache.kylin.metadata.tuple.Tuple;
import org.apache.kylin.metadata.tuple.TupleInfo;
import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.cache.TsConditionExtractor;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator;
import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.generated.IIProtos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.collect.Ranges;
import com.google.common.collect.Sets;
import com.google.protobuf.HBaseZeroCopyByteString;
/**
 * {@link ITupleIterator} over the rows returned by the inverted-index coprocessor endpoint
 * ({@code IIProtos.RowsService}) of one {@link IISegment}.
 *
 * <p>The constructor does all of the remote work eagerly: it builds the pushed-down row type,
 * filter, projector and aggregators, sends one request to the segment's HBase table, decompresses
 * every returned response and keeps them in memory. Iteration then walks the decompressed
 * responses one after another.
 *
 * <p>Instances own the HBase {@link Table} they open and must be released with {@link #close()}.
 * The iterator reuses mutable per-row state and is not safe for concurrent use.
 */
public class EndpointTupleIterator implements ITupleIterator {

    private static final Logger logger = LoggerFactory.getLogger(EndpointTupleIterator.class);

    private final IISegment seg;
    private final String factTableName;
    private final List<TblColRef> columns;
    private final TupleInfo tupleInfo;
    private final TableRecordInfo tableRecordInfo;
    private final EndpointTupleConverter tupleConverter;
    private final CoprocessorRowType pushedDownRowType;
    private final CoprocessorFilter pushedDownFilter;
    private final CoprocessorProjector pushedDownProjector;
    private final EndpointAggregators pushedDownAggregators;
    // interval of the timestamp column's condition extracted from the filter; null when extraction reports a conflict
    private final Range<Long> tsRange;

    // one entry (list of rows) per decompressed endpoint response
    private Iterator<List<IIProtos.IIResponseInternal.IIRow>> regionResponsesIterator = null;
    // iterator over the response currently being consumed
    private ITupleIterator tupleIterator = null;
    private Table table = null;
    private TblColRef partitionCol;
    // smallest "latest data time" reported by any response; -1 until the responses have been read
    private long lastDataTime = -1;
    // number of tuples handed out by next(), reported when the iterator is closed
    private int rowsInAllMetric = 0;

    /**
     * Opens the segment's storage table, pushes filter/projection/aggregation down to the
     * endpoint coprocessor and collects all responses.
     *
     * @param segment         segment to query; supplies the table name, columns and fact table name
     * @param rootFilter      filter to push down; {@code null} is treated as {@code ConstantTupleFilter.TRUE}
     * @param groupBy         group-by columns; {@code null} is treated as empty.
     *                        NOTE(review): a non-null collection is modified in place (columns the
     *                        pushed-down filter cannot evaluate are added to it) - confirm callers expect this
     * @param measures        aggregation functions; {@code null} is treated as empty. The passed
     *                        descriptors are rewritten in place (return type and column refs)
     * @param context         storage context (not read by this constructor)
     * @param conn            HBase connection used to open the table
     * @param returnTupleInfo layout of the tuples handed out by {@link #next()}
     * @throws Throwable whatever the metadata lookups, the coprocessor call or the response
     *                   decoding throw; the table opened here is closed before the error propagates
     */
    public EndpointTupleIterator(IISegment segment, TupleFilter rootFilter, Collection<TblColRef> groupBy, List<FunctionDesc> measures, StorageContext context, Connection conn, TupleInfo returnTupleInfo) throws Throwable {
        String tableName = segment.getStorageLocationIdentifier();
        table = conn.getTable(TableName.valueOf(tableName));

        try {
            // must be set before rewriteMeasureParameters(), which reads it
            factTableName = segment.getIIDesc().getFactTableName();

            if (rootFilter == null) {
                rootFilter = ConstantTupleFilter.TRUE;
            }
            if (groupBy == null) {
                groupBy = Sets.newHashSet();
            }
            if (measures == null) {
                measures = Lists.newArrayList();
            }

            //this method will change measures
            rewriteMeasureParameters(measures, segment.getColumns());

            this.seg = segment;
            this.columns = segment.getColumns();
            this.tupleInfo = returnTupleInfo;
            this.tupleConverter = new EndpointTupleConverter(columns, measures, returnTupleInfo);
            this.tableRecordInfo = new TableRecordInfo(this.seg);

            this.pushedDownRowType = CoprocessorRowType.fromTableRecordInfo(tableRecordInfo, this.columns);
            this.pushedDownFilter = CoprocessorFilter.fromFilter(new ClearTextDictionary(this.tableRecordInfo), rootFilter, FilterDecorator.FilterConstantsTreatment.AS_IT_IS);

            // columns the pushed-down filter cannot evaluate are added to the group-by set
            for (TblColRef column : this.pushedDownFilter.getInevaluableColumns()) {
                groupBy.add(column);
            }

            this.pushedDownProjector = CoprocessorProjector.makeForEndpoint(tableRecordInfo, groupBy);
            this.pushedDownAggregators = EndpointAggregators.fromFunctions(tableRecordInfo, measures);

            int tsCol = this.tableRecordInfo.getTimestampColumn();
            this.partitionCol = this.columns.get(tsCol);
            this.tsRange = TsConditionExtractor.extractTsCondition(this.partitionCol, rootFilter);

            if (this.tsRange == null) {
                // NOTE(review): despite the message, the request below is still issued (without a
                // ts range). It is kept that way because lastDataTime is only learned from the
                // responses and getCacheExcludedPeriod() requires it to be set.
                logger.info("TsRange conflict for endpoint, return empty directly");
            } else {
                logger.info("The tsRange being pushed is " + RangeUtil.formatTsRange(tsRange));
            }

            IIProtos.IIRequest endpointRequest = prepareRequest();
            Collection<IIProtos.IIResponse> compressedShardResults = getResults(endpointRequest, table);
            Collection<IIProtos.IIResponseInternal> shardResults = decompressShardResults(compressedShardResults);

            // The transformed view is lazy: Collections.min walks it once, so each response's
            // stats are logged exactly once while the minimum is computed.
            this.lastDataTime = Collections.min(Collections2.transform(shardResults, new Function<IIProtos.IIResponseInternal, Long>() {
                @Nullable
                @Override
                public Long apply(IIProtos.IIResponseInternal input) {
                    IIProtos.IIResponseInternal.Stats status = input.getStats();
                    logger.info("Endpoints all returned, stats from shard {}: start moment:{}, finish moment: {}, elapsed ms: {}, scanned slices: {}, latest slice time is {}", //
                            new Object[] { String.valueOf(status.getMyShard()), //
                                    DateFormat.formatToTimeStr(status.getServiceStartTime()), //
                                    DateFormat.formatToTimeStr(status.getServiceEndTime()), //
                                    String.valueOf(status.getServiceEndTime() - status.getServiceStartTime()), //
                                    String.valueOf(status.getScannedSlices()), DateFormat.formatToTimeStr(status.getLatestDataTime()) });
                    return status.getLatestDataTime();
                }
            }));

            this.regionResponsesIterator = Collections2.transform(shardResults, new Function<IIProtos.IIResponseInternal, List<IIProtos.IIResponseInternal.IIRow>>() {
                @Nullable
                @Override
                public List<IIProtos.IIResponseInternal.IIRow> apply(@Nullable IIProtos.IIResponseInternal input) {
                    return input.getRowsList();
                }
            }).iterator();

            if (this.regionResponsesIterator.hasNext()) {
                this.tupleIterator = new SingleRegionTupleIterator(this.regionResponsesIterator.next());
            } else {
                this.tupleIterator = ITupleIterator.EMPTY_TUPLE_ITERATOR;
            }
        } catch (Throwable t) {
            // The caller never receives an instance when construction fails, so nobody else
            // could close the table that was opened above.
            IOUtils.closeQuietly(table);
            throw t;
        }
    }

    /**
     * Decompresses and parses every endpoint response.
     *
     * @param compressedShardResults raw responses as returned by the coprocessor call
     * @return the parsed responses, in the iteration order of the input
     * @throws RuntimeException if a blob cannot be decompressed or parsed; the original failure
     *                          is attached as the cause
     */
    private static Collection<IIProtos.IIResponseInternal> decompressShardResults(Collection<IIProtos.IIResponse> compressedShardResults) {
        Collection<IIProtos.IIResponseInternal> shardResults = new ArrayList<>();
        for (IIProtos.IIResponse input : compressedShardResults) {
            byte[] compressed = HBaseZeroCopyByteString.zeroCopyGetBytes(input.getBlob());
            try {
                byte[] decompressed = CompressionUtils.decompress(compressed);
                shardResults.add(IIProtos.IIResponseInternal.parseFrom(decompressed));
            } catch (Exception e) {
                throw new RuntimeException("decompress endpoint response error", e);
            }
        }
        return shardResults;
    }

    /**
     * Completes the measures coming from the query engine, which do not carry enough information:
     * count functions get return type {@code bigint}; every other function is matched against the
     * fact table column named by its parameter value and receives that column as its column
     * reference, with return type {@code hllc10} for count-distinct or the column's own type otherwise.
     *
     * @param measures functions to rewrite in place
     * @param columns  candidate columns of the segment
     * @throws RuntimeException if a non-count function matches none of the columns
     */
    private void rewriteMeasureParameters(List<FunctionDesc> measures, List<TblColRef> columns) {
        for (FunctionDesc functionDesc : measures) {
            if (functionDesc.isCount()) {
                functionDesc.setReturnType("bigint");
                continue;
            }

            boolean updated = false;
            for (TblColRef column : columns) {
                if (!column.isSameAs(factTableName, functionDesc.getParameter().getValue())) {
                    continue;
                }
                if (HLLCMeasureType.isCountDistinct(functionDesc)) {
                    //TODO: default precision might need be configurable
                    String iiDefaultHLLC = "hllc10";
                    functionDesc.setReturnType(iiDefaultHLLC);
                } else {
                    functionDesc.setReturnType(column.getColumnDesc().getType().toString());
                }
                functionDesc.getParameter().setColRefs(ImmutableList.of(column));
                updated = true;
                break;
            }

            if (!updated) {
                throw new RuntimeException("Func " + functionDesc + " is not related to any column in fact table " + factTableName);
            }
        }
    }

    /**
     * Advances to the next response that still has rows, if necessary.
     *
     * @return {@code true} if another tuple is available
     */
    @Override
    public boolean hasNext() {
        while (!this.tupleIterator.hasNext()) {
            if (!this.regionResponsesIterator.hasNext()) {
                return false;
            }
            this.tupleIterator = new SingleRegionTupleIterator(this.regionResponsesIterator.next());
        }
        return true;
    }

    /**
     * Returns the next tuple.
     *
     * @throws IllegalStateException if no tuple is left
     */
    @Override
    public ITuple next() {
        if (!hasNext()) {
            throw new IllegalStateException("No more ITuple in EndpointTupleIterator");
        }
        ITuple tuple = this.tupleIterator.next();
        // count only tuples that were actually handed out
        rowsInAllMetric++;
        return tuple;
    }

    /** Not supported. */
    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }

    /** Closes the underlying HBase table (errors are ignored) and logs the number of fetched rows. */
    @Override
    public void close() {
        IOUtils.closeQuietly(table);
        logger.info("Closed after {} rows are fetched", rowsInAllMetric);
    }

    /**
     * tells storage layer cache what time period of data should not be cached.
     * for static storage like cube, it will return null
     * for dynamic storage like ii, it will for example exclude the last two minutes for possible data latency
     *
     * @return the open-ended range of timestamps greater than the smallest "latest data time"
     *         reported by the endpoint responses
     * @throws IllegalArgumentException if the latest data time has not been set
     */
    public Range<Long> getCacheExcludedPeriod() {
        Preconditions.checkArgument(lastDataTime != -1, "lastDataTime is not set yet");
        return Ranges.greaterThan(lastDataTime);
    }

    /**
     * Builds the endpoint request from the pushed-down row type, filter, projector and
     * aggregators; the timestamp range is attached only when one was extracted.
     */
    private IIProtos.IIRequest prepareRequest() throws IOException {
        IIProtos.IIRequest.Builder builder = IIProtos.IIRequest.newBuilder();

        if (this.tsRange != null) {
            byte[] tsRangeBytes = SerializationUtils.serialize(this.tsRange);
            builder.setTsRange(HBaseZeroCopyByteString.wrap(tsRangeBytes));
        }

        builder.setType(HBaseZeroCopyByteString.wrap(CoprocessorRowType.serialize(pushedDownRowType))) //
                .setFilter(HBaseZeroCopyByteString.wrap(CoprocessorFilter.serialize(pushedDownFilter))) //
                .setProjector(HBaseZeroCopyByteString.wrap(CoprocessorProjector.serialize(pushedDownProjector))) //
                .setAggregator(HBaseZeroCopyByteString.wrap(EndpointAggregators.serialize(pushedDownAggregators)));

        return builder.build();
    }

    /**
     * Invokes {@code RowsService.getRows} through {@code Table.coprocessorService} with null
     * start and end keys and gathers the responses.
     *
     * @param request request sent with every invocation
     * @param table   table hosting the coprocessor
     * @return the values of the result map returned by the coprocessor call
     * @throws Throwable as declared by {@code coprocessorService}, including the exception
     *                   recorded by the RPC controller of a failed invocation
     */
    private Collection<IIProtos.IIResponse> getResults(final IIProtos.IIRequest request, Table table) throws Throwable {
        Map<byte[], IIProtos.IIResponse> results = table.coprocessorService(IIProtos.RowsService.class, null, null, new Batch.Call<IIProtos.RowsService, IIProtos.IIResponse>() {
            public IIProtos.IIResponse call(IIProtos.RowsService rowsService) throws IOException {
                ServerRpcController controller = new ServerRpcController();
                BlockingRpcCallback<IIProtos.IIResponse> rpcCallback = new BlockingRpcCallback<>();
                rowsService.getRows(controller, request, rpcCallback);
                IIProtos.IIResponse response = rpcCallback.get();
                // the failure is only checked after the blocking callback has returned
                if (controller.failedOnException()) {
                    throw controller.getFailedOn();
                }
                return response;
            }
        });

        return results.values();
    }

    /**
     * Internal class to handle iterators for a single region's returned rows
     */
    class SingleRegionTupleIterator implements ITupleIterator {

        private final List<IIProtos.IIResponseInternal.IIRow> rows;
        private int index = 0;

        //not thread safe!
        // the record and tuple below are reused (overwritten) for every row
        private final TableRecord tableRecord;
        private List<Object> measureValues;
        private final Tuple tuple;

        public SingleRegionTupleIterator(List<IIProtos.IIResponseInternal.IIRow> rows) {
            this.rows = rows;
            this.index = 0;
            this.tableRecord = tableRecordInfo.createTableRecord();
            this.tuple = new Tuple(tupleInfo);
        }

        @Override
        public boolean hasNext() {
            return index < rows.size();
        }

        /**
         * Decodes the row at the current position and converts it into the shared tuple.
         *
         * @throws IllegalStateException if all rows have been consumed
         */
        @Override
        public ITuple next() {
            if (!hasNext()) {
                throw new IllegalStateException("No more Tuple in the SingleRegionTupleIterator");
            }

            IIProtos.IIResponseInternal.IIRow currentRow = rows.get(index);
            byte[] columnsBytes = HBaseZeroCopyByteString.zeroCopyGetBytes(currentRow.getColumns());
            this.tableRecord.setBytes(columnsBytes, 0, columnsBytes.length);

            // NOTE(review): when a row carries no measures, the values decoded for the previous
            // row (or null for the first row) are passed on unchanged - confirm this is intended
            if (currentRow.hasMeasures()) {
                ByteBuffer buffer = currentRow.getMeasures().asReadOnlyByteBuffer();
                this.measureValues = pushedDownAggregators.deserializeMetricValues(buffer);
            }

            index++;
            return tupleConverter.makeTuple(this.tableRecord, this.measureValues, this.tuple);
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        @Override
        public void close() {
        }
    }
}