| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map.Entry; |
| |
| import org.apache.hadoop.hbase.*; |
| import org.apache.hadoop.hbase.KeyValue.Type; |
| import org.apache.hadoop.hbase.regionserver.RegionScanner; |
| import org.apache.hadoop.hbase.regionserver.ScannerContext; |
| import org.apache.kylin.measure.MeasureAggregator; |
| import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey; |
| import org.apache.kylin.storage.hbase.common.coprocessor.AggregationCache; |
| |
| /** |
| * @author yangli9 |
| */ |
| @SuppressWarnings("rawtypes") |
| public class ObserverAggregationCache extends AggregationCache { |
| |
| private final ObserverAggregators aggregators; |
| |
| public ObserverAggregationCache(ObserverAggregators aggregators) { |
| this.aggregators = aggregators; |
| } |
| |
| public RegionScanner getScanner(RegionScanner innerScanner) { |
| return new AggregationRegionScanner(innerScanner); |
| } |
| |
| @Override |
| public MeasureAggregator[] createBuffer() { |
| return aggregators.createBuffer(); |
| } |
| |
| private class AggregationRegionScanner implements RegionScanner { |
| |
| private final RegionScanner innerScanner; |
| private final Iterator<Entry<AggrKey, MeasureAggregator[]>> iterator; |
| |
| public AggregationRegionScanner(RegionScanner innerScanner) { |
| this.innerScanner = innerScanner; |
| this.iterator = aggBufMap.entrySet().iterator(); |
| } |
| |
| @Override |
| public boolean next(List<Cell> results) throws IOException { |
| try { |
| // AggregateRegionObserver.LOG.info("Kylin Scanner next()"); |
| boolean hasMore = false; |
| if (iterator.hasNext()) { |
| Entry<AggrKey, MeasureAggregator[]> entry = iterator.next(); |
| makeCells(entry, results); |
| hasMore = iterator.hasNext(); |
| } |
| // AggregateRegionObserver.LOG.info("Kylin Scanner next() done"); |
| |
| return hasMore; |
| } catch (Exception e) { |
| throw new IOException("Error when calling next", e); |
| } |
| } |
| |
| private void makeCells(Entry<AggrKey, MeasureAggregator[]> entry, List<Cell> results) { |
| byte[][] families = aggregators.getHColFamilies(); |
| byte[][] qualifiers = aggregators.getHColQualifiers(); |
| int nHCols = aggregators.getHColsNum(); |
| |
| AggrKey rowKey = entry.getKey(); |
| MeasureAggregator[] aggBuf = entry.getValue(); |
| ByteBuffer[] rowValues = aggregators.getHColValues(aggBuf); |
| |
| if (nHCols == 0) { |
| Cell keyValue = new KeyValue(rowKey.get(), rowKey.offset(), rowKey.length(), // |
| null, 0, 0, // |
| null, 0, 0, // |
| HConstants.LATEST_TIMESTAMP, Type.Put, // |
| null, 0, 0); |
| results.add(keyValue); |
| } else { |
| for (int i = 0; i < nHCols; i++) { |
| Cell keyValue = new KeyValue(rowKey.get(), rowKey.offset(), rowKey.length(), // |
| families[i], 0, families[i].length, // |
| qualifiers[i], 0, qualifiers[i].length, // |
| HConstants.LATEST_TIMESTAMP, Type.Put, // |
| rowValues[i].array(), 0, rowValues[i].position()); |
| results.add(keyValue); |
| } |
| } |
| } |
| |
| @Override |
| public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { |
| return next(result); |
| } |
| |
| @Override |
| public boolean nextRaw(List<Cell> result) throws IOException { |
| return next(result); |
| } |
| |
| @Override |
| public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException { |
| return next(result); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| // AggregateRegionObserver.LOG.info("Kylin Scanner close()"); |
| innerScanner.close(); |
| // AggregateRegionObserver.LOG.info("Kylin Scanner close() done"); |
| } |
| |
| @Override |
| public HRegionInfo getRegionInfo() { |
| // AggregateRegionObserver.LOG.info("Kylin Scanner getRegionInfo()"); |
| return innerScanner.getRegionInfo(); |
| } |
| |
| @Override |
| public long getMaxResultSize() { |
| // AggregateRegionObserver.LOG.info("Kylin Scanner getMaxResultSize()"); |
| return Long.MAX_VALUE; |
| } |
| |
| @Override |
| public boolean isFilterDone() throws IOException { |
| // AggregateRegionObserver.LOG.info("Kylin Scanner isFilterDone()"); |
| return false; |
| } |
| |
| @Override |
| public boolean reseek(byte[] row) throws IOException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public long getMvccReadPoint() { |
| // AggregateRegionObserver.LOG.info("Kylin Scanner getMvccReadPoint()"); |
| return Long.MAX_VALUE; |
| } |
| |
| @Override |
| public int getBatch() { |
| return innerScanner.getBatch(); |
| } |
| } |
| |
| } |