| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import org.apache.hadoop.hbase.Cell; |
| import org.apache.hadoop.hbase.HRegionInfo; |
| import org.apache.hadoop.hbase.HRegionLocation; |
| import org.apache.hadoop.hbase.regionserver.RegionScanner; |
| import org.apache.hadoop.hbase.regionserver.ScannerContext; |
| import org.apache.kylin.measure.MeasureAggregator; |
| import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey; |
| import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior; |
| import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter; |
| import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector; |
| import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType; |
| |
| /** |
| * @author yangli9 |
| * |
| */ |
| public class AggregationScanner implements RegionScanner { |
| |
| private RegionScanner outerScanner; |
| private CoprocessorBehavior behavior; |
| |
| public AggregationScanner(CoprocessorRowType type, CoprocessorFilter filter, CoprocessorProjector groupBy, ObserverAggregators aggrs, RegionScanner innerScanner, CoprocessorBehavior behavior) throws IOException { |
| |
| AggregateRegionObserver.LOG.info("Kylin Coprocessor start"); |
| |
| this.behavior = behavior; |
| |
| ObserverAggregationCache aggCache; |
| Stats stats = new Stats(); |
| |
| aggCache = buildAggrCache(innerScanner, type, groupBy, aggrs, filter, stats); |
| stats.countOutputRow(aggCache.getSize()); |
| this.outerScanner = aggCache.getScanner(innerScanner); |
| |
| AggregateRegionObserver.LOG.info("Kylin Coprocessor aggregation done: " + stats); |
| } |
| |
| @SuppressWarnings("rawtypes") |
| ObserverAggregationCache buildAggrCache(final RegionScanner innerScanner, CoprocessorRowType type, CoprocessorProjector projector, ObserverAggregators aggregators, CoprocessorFilter filter, Stats stats) throws IOException { |
| |
| ObserverAggregationCache aggCache = new ObserverAggregationCache(aggregators); |
| |
| ObserverTuple tuple = new ObserverTuple(type); |
| boolean hasMore = true; |
| List<Cell> results = new ArrayList<Cell>(); |
| byte meaninglessByte = 0; |
| |
| while (hasMore) { |
| results.clear(); |
| hasMore = innerScanner.nextRaw(results); |
| if (results.isEmpty()) |
| continue; |
| |
| if (stats != null) |
| stats.countInputRow(results); |
| |
| Cell cell = results.get(0); |
| tuple.setUnderlying(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); |
| |
| if (behavior == CoprocessorBehavior.SCAN) { |
| //touch every byte of the cell so that the cost of scanning will be trully reflected |
| int endIndex = cell.getRowOffset() + cell.getRowLength(); |
| for (int i = cell.getRowOffset(); i < endIndex; ++i) { |
| meaninglessByte += cell.getRowArray()[i]; |
| } |
| } else { |
| if (behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER.ordinal()) { |
| if (filter != null && filter.evaluate(tuple) == false) |
| continue; |
| |
| if (behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal()) { |
| AggrKey aggKey = projector.getAggrKey(results); |
| MeasureAggregator[] bufs = aggCache.getBuffer(aggKey); |
| aggregators.aggregate(bufs, results); |
| |
| if (behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) { |
| aggCache.checkMemoryUsage(); |
| } |
| } |
| } |
| } |
| } |
| |
| if (behavior == CoprocessorBehavior.SCAN) { |
| System.out.println("meaningless byte is now " + meaninglessByte); |
| } |
| |
| return aggCache; |
| } |
| |
| @Override |
| public boolean next(List<Cell> results) throws IOException { |
| return outerScanner.next(results); |
| } |
| |
| @Override |
| public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { |
| return outerScanner.next(result, scannerContext); |
| } |
| |
| @Override |
| public boolean nextRaw(List<Cell> result) throws IOException { |
| return outerScanner.nextRaw(result); |
| } |
| |
| @Override |
| public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException { |
| return outerScanner.nextRaw(result, scannerContext); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| outerScanner.close(); |
| } |
| |
| @Override |
| public HRegionInfo getRegionInfo() { |
| return outerScanner.getRegionInfo(); |
| } |
| |
| @Override |
| public boolean isFilterDone() throws IOException { |
| return outerScanner.isFilterDone(); |
| } |
| |
| @Override |
| public boolean reseek(byte[] row) throws IOException { |
| return outerScanner.reseek(row); |
| } |
| |
| @Override |
| public long getMaxResultSize() { |
| return outerScanner.getMaxResultSize(); |
| } |
| |
| @Override |
| public long getMvccReadPoint() { |
| return outerScanner.getMvccReadPoint(); |
| } |
| |
| @Override |
| public int getBatch() { |
| return outerScanner.getBatch(); |
| } |
| |
| private static class Stats { |
| long inputRows = 0; |
| long inputBytes = 0; |
| long outputRows = 0; |
| |
| // have no outputBytes because that requires actual serialize all the |
| // aggregator buffers |
| |
| public void countInputRow(List<Cell> row) { |
| inputRows++; |
| inputBytes += row.get(0).getRowLength(); |
| for (int i = 0, n = row.size(); i < n; i++) { |
| inputBytes += row.get(i).getValueLength(); |
| } |
| } |
| |
| public void countOutputRow(long rowCount) { |
| outputRows += rowCount; |
| } |
| |
| public String toString() { |
| double percent = (double) outputRows / inputRows * 100; |
| return Math.round(percent) + "% = " + outputRows + " (out rows) / " + inputRows + " (in rows); in bytes = " + inputBytes + "; est. out bytes = " + Math.round(inputBytes * percent / 100); |
| } |
| } |
| } |