blob: c69fd8bd8b2e7bb0a12943968ed34ec06c3850eb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.storage.StorageContext;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator;
import org.apache.kylin.storage.hbase.cube.v1.RegionScannerAdapter;
import org.apache.kylin.storage.hbase.cube.v1.ResultScannerAdapter;
import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
/**
* @author yangli9
*/
public class ObserverEnabler {
private static final Logger logger = LoggerFactory.getLogger(ObserverEnabler.class);
static final String FORCE_COPROCESSOR = "forceObserver";
static final Map<String, Boolean> CUBE_OVERRIDES = Maps.newConcurrentMap();
public static ResultScanner scanWithCoprocessorIfBeneficial(CubeSegment segment, Cuboid cuboid, TupleFilter tupleFiler, //
Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context, Table table, Scan scan) throws IOException {
if (context.isCoprocessorEnabled() == false) {
return table.getScanner(scan);
}
CoprocessorRowType type = CoprocessorRowType.fromCuboid(segment, cuboid);
CoprocessorFilter filter = CoprocessorFilter.fromFilter(segment.getDimensionEncodingMap(), tupleFiler, FilterDecorator.FilterConstantsTreatment.REPLACE_WITH_GLOBAL_DICT);
CoprocessorProjector projector = CoprocessorProjector.makeForObserver(segment, cuboid, groupBy);
ObserverAggregators aggrs = ObserverAggregators.fromValueDecoders(rowValueDecoders);
boolean localCoprocessor = KylinConfig.getInstanceFromEnv().getQueryRunLocalCoprocessor() || BackdoorToggles.getRunLocalCoprocessor();
if (localCoprocessor) {
RegionScanner innerScanner = new RegionScannerAdapter(table.getScanner(scan));
AggregationScanner aggrScanner = new AggregationScanner(type, filter, projector, aggrs, innerScanner, CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM);
return new ResultScannerAdapter(aggrScanner);
} else {
// debug/profiling purpose
String toggle = BackdoorToggles.getCoprocessorBehavior();
if (toggle == null) {
toggle = CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString(); //default behavior
} else {
logger.info("The execution of this query will use " + toggle + " as observer's behavior");
}
scan.setAttribute(AggregateRegionObserver.COPROCESSOR_ENABLE, new byte[] { 0x01 });
scan.setAttribute(AggregateRegionObserver.BEHAVIOR, toggle.getBytes());
scan.setAttribute(AggregateRegionObserver.TYPE, CoprocessorRowType.serialize(type));
scan.setAttribute(AggregateRegionObserver.PROJECTOR, CoprocessorProjector.serialize(projector));
scan.setAttribute(AggregateRegionObserver.AGGREGATORS, ObserverAggregators.serialize(aggrs));
scan.setAttribute(AggregateRegionObserver.FILTER, CoprocessorFilter.serialize(filter));
return table.getScanner(scan);
}
}
public static void enableCoprocessorIfBeneficial(CubeInstance cube, Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context) {
if (isCoprocessorBeneficial(cube, groupBy, rowValueDecoders, context)) {
context.enableCoprocessor();
}
}
private static boolean isCoprocessorBeneficial(CubeInstance cube, Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context) {
String forceFlag = System.getProperty(FORCE_COPROCESSOR);
if (forceFlag != null) {
boolean r = Boolean.parseBoolean(forceFlag);
logger.info("Coprocessor is " + (r ? "enabled" : "disabled") + " according to sys prop " + FORCE_COPROCESSOR);
return r;
}
Boolean cubeOverride = CUBE_OVERRIDES.get(cube.getName());
if (cubeOverride != null) {
boolean r = cubeOverride.booleanValue();
logger.info("Coprocessor is " + (r ? "enabled" : "disabled") + " according to cube overrides");
return r;
}
if (RowValueDecoder.hasMemHungryMeasures(rowValueDecoders)) {
logger.info("Coprocessor is disabled because there is memory hungry count distinct");
return false;
}
if (context.isExactAggregation()) {
logger.info("Coprocessor is disabled because exactAggregation is true");
return false;
}
Cuboid cuboid = context.getCuboid();
Set<TblColRef> toAggr = Sets.newHashSet(cuboid.getAggregationColumns());
toAggr.removeAll(groupBy);
if (toAggr.isEmpty()) {
logger.info("Coprocessor is disabled because no additional columns to aggregate");
return false;
}
logger.info("Coprocessor is enabled to aggregate " + toAggr + ", returning " + groupBy);
return true;
}
@SuppressWarnings("unused")
private static int getBitsToScan(byte[] startKey, byte[] stopKey) {
// find the first bit difference from the beginning
int totalBits = startKey.length * 8;
int bitsToScan = totalBits;
for (int i = 0; i < totalBits; i++) {
int byteIdx = i / 8;
int bitIdx = 7 - i % 8;
byte bitMask = (byte) (1 << bitIdx);
if ((startKey[byteIdx] & bitMask) == (stopKey[byteIdx] & bitMask))
bitsToScan--;
else
break;
}
return bitsToScan;
}
public static void forceCoprocessorOn() {
System.setProperty(FORCE_COPROCESSOR, "true");
}
public static void forceCoprocessorOff() {
System.setProperty(FORCE_COPROCESSOR, "false");
}
public static String getForceCoprocessor() {
return System.getProperty(FORCE_COPROCESSOR);
}
public static void forceCoprocessorUnset() {
System.clearProperty(FORCE_COPROCESSOR);
}
public static void updateCubeOverride(String cubeName, String force) {
if ("null".equalsIgnoreCase(force) || "default".equalsIgnoreCase(force)) {
CUBE_OVERRIDES.remove(cubeName);
} else if ("true".equalsIgnoreCase(force)) {
CUBE_OVERRIDES.put(cubeName, Boolean.TRUE);
} else if ("false".equalsIgnoreCase(force)) {
CUBE_OVERRIDES.put(cubeName, Boolean.FALSE);
}
}
public static Map<String, Boolean> getCubeOverrides() {
return CUBE_OVERRIDES;
}
}