// blob: 1809a4436532601b10d2d0e3b3b88e7b6c55be27 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.kylin.measure.MeasureAggregator;
import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey;
import org.apache.kylin.storage.hbase.common.coprocessor.AggregationCache;
/**
 * An {@link AggregationCache} whose buffers are created by an
 * {@link ObserverAggregators} instance and whose buffered entries can be
 * read back through a {@link RegionScanner}.
 *
 * @author yangli9
 */
@SuppressWarnings("rawtypes")
public class ObserverAggregationCache extends AggregationCache {

    private final ObserverAggregators aggregators;

    /**
     * @param aggregators supplies the aggregation buffers and the HBase column
     *                    layout (families, qualifiers, values) of the output cells
     */
    public ObserverAggregationCache(ObserverAggregators aggregators) {
        this.aggregators = aggregators;
    }

    /**
     * Wraps the given scanner in one that emits the entries of this cache.
     *
     * @param innerScanner the scanner the returned scanner delegates
     *                     {@code close()}, {@code getRegionInfo()} and
     *                     {@code getBatch()} to
     * @return a new scanner iterating over the cached aggregation entries
     */
    public RegionScanner getScanner(RegionScanner innerScanner) {
        return new AggregationRegionScanner(innerScanner);
    }

    /**
     * @return a new aggregator buffer obtained from the configured aggregators
     */
    @Override
    public MeasureAggregator[] createBuffer() {
        return aggregators.createBuffer();
    }

    /**
     * Scanner that emits one cache entry per {@code next} call. Rows come from
     * {@code aggBufMap} (not declared in this class; presumably inherited from
     * {@link AggregationCache}), never from the wrapped scanner.
     */
    private class AggregationRegionScanner implements RegionScanner {

        private final RegionScanner innerScanner;
        private final Iterator<Entry<AggrKey, MeasureAggregator[]>> iterator;

        /**
         * Captures an iterator over the cache entries at construction time.
         *
         * @param innerScanner scanner used only for delegation, see the
         *                     individual methods
         */
        public AggregationRegionScanner(RegionScanner innerScanner) {
            this.innerScanner = innerScanner;
            this.iterator = aggBufMap.entrySet().iterator();
        }

        /**
         * Appends the cells of the next cache entry, if there is one, to
         * {@code results}.
         *
         * @param results list receiving the cells of one entry
         * @return {@code true} if further entries remain after this call;
         *         {@code false} if none remain or the cache was already exhausted
         * @throws IOException wrapping any exception raised while producing cells
         */
        @Override
        public boolean next(List<Cell> results) throws IOException {
            try {
                if (!iterator.hasNext()) {
                    return false;
                }
                makeCells(iterator.next(), results);
                return iterator.hasNext();
            } catch (Exception e) {
                throw new IOException("Error when calling next", e);
            }
        }

        /**
         * Converts one cache entry into cells: one cell per configured HBase
         * column, or a single cell without family, qualifier and value when no
         * columns are configured.
         *
         * @param entry   row key and its aggregator buffer
         * @param results list the generated cells are appended to
         */
        private void makeCells(Entry<AggrKey, MeasureAggregator[]> entry, List<Cell> results) {
            final byte[][] columnFamilies = aggregators.getHColFamilies();
            final byte[][] columnQualifiers = aggregators.getHColQualifiers();
            final int columnCount = aggregators.getHColsNum();

            final AggrKey key = entry.getKey();
            final MeasureAggregator[] buffer = entry.getValue();
            // Values are requested even when there are no columns, as before.
            final ByteBuffer[] columnValues = aggregators.getHColValues(buffer);

            if (columnCount == 0) {
                results.add(newColumnlessCell(key));
                return;
            }

            for (int col = 0; col < columnCount; col++) {
                final byte[] family = columnFamilies[col];
                final byte[] qualifier = columnQualifiers[col];
                final ByteBuffer value = columnValues[col];
                // The value spans the backing array from index 0 up to the
                // buffer's current position.
                results.add(new KeyValue(key.get(), key.offset(), key.length(), //
                        family, 0, family.length, //
                        qualifier, 0, qualifier.length, //
                        HConstants.LATEST_TIMESTAMP, Type.Put, //
                        value.array(), 0, value.position()));
            }
        }

        /**
         * Builds a Put cell that carries only the row key.
         */
        private Cell newColumnlessCell(AggrKey key) {
            return new KeyValue(key.get(), key.offset(), key.length(), //
                    null, 0, 0, //
                    null, 0, 0, //
                    HConstants.LATEST_TIMESTAMP, Type.Put, //
                    null, 0, 0);
        }

        /** Same as {@link #next(List)}; the scanner context is not consulted. */
        @Override
        public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
            return next(result);
        }

        /** Same as {@link #next(List)}. */
        @Override
        public boolean nextRaw(List<Cell> result) throws IOException {
            return next(result);
        }

        /** Same as {@link #next(List)}; the scanner context is not consulted. */
        @Override
        public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException {
            return next(result);
        }

        /** Closes the wrapped scanner. */
        @Override
        public void close() throws IOException {
            innerScanner.close();
        }

        /** @return the region info reported by the wrapped scanner */
        @Override
        public HRegionInfo getRegionInfo() {
            return innerScanner.getRegionInfo();
        }

        /** @return always {@code Long.MAX_VALUE} */
        @Override
        public long getMaxResultSize() {
            return Long.MAX_VALUE;
        }

        /** @return always {@code false} */
        @Override
        public boolean isFilterDone() throws IOException {
            return false;
        }

        /**
         * Not supported by this scanner.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public boolean reseek(byte[] row) throws IOException {
            throw new UnsupportedOperationException();
        }

        /** @return always {@code Long.MAX_VALUE} */
        @Override
        public long getMvccReadPoint() {
            return Long.MAX_VALUE;
        }

        /** @return the batch size reported by the wrapped scanner */
        @Override
        public int getBatch() {
            return innerScanner.getBatch();
        }
    }
}