blob: 8dba1b157d92fad6c07b54366bc74295801d04b8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorBehavior;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
/**
 * Region observer that, for scans which opt in through scan attributes,
 * replaces the scanner opened by the region with an {@link AggregationScanner}
 * configured from those attributes.
 * <p>
 * Scans whose {@link #COPROCESSOR_ENABLE} attribute is missing, empty, or
 * starts with a zero byte are passed through untouched.
 *
 * @author yangli9
 */
public class AggregateRegionObserver extends BaseRegionObserver {

    // HBase uses common logging (vs. Kylin uses slf4j)
    static final Log LOG = LogFactory.getLog(AggregateRegionObserver.class);

    /** Scan attribute key; the observer only acts when the first byte of its value is non-zero. */
    static final String COPROCESSOR_ENABLE = "_Coprocessor_Enable";
    /** Scan attribute key holding the serialized {@link CoprocessorRowType}. */
    static final String TYPE = "_Type";
    /** Scan attribute key holding the serialized {@link CoprocessorProjector}. */
    static final String PROJECTOR = "_Projector";
    /** Scan attribute key holding the serialized {@link ObserverAggregators}. */
    static final String AGGREGATORS = "_Aggregators";
    /** Scan attribute key holding the serialized {@link CoprocessorFilter}. */
    static final String FILTER = "_Filter";
    /** Scan attribute key holding the name of a {@link CoprocessorBehavior} constant (optional). */
    static final String BEHAVIOR = "_Behavior";

    /**
     * Hook invoked after a scanner has been opened on the region.
     * <p>
     * If the configuration flag {@link RegionCoprocessorHost#ABORT_ON_ERROR_KEY}
     * is enabled, any failure raised while building the aggregating scanner is
     * logged and the original {@code innerScanner} is returned instead, so the
     * failure is never propagated out of this coprocessor. If the flag is
     * disabled, failures propagate to the caller.
     *
     * @param ctxt observer context giving access to the region environment
     * @param scan the scan being opened; its attributes drive this observer
     * @param innerScanner the scanner opened by the region
     * @return an {@link AggregationScanner}, or {@code innerScanner} when the
     *         scan did not enable the coprocessor or a failure was suppressed
     * @throws IOException if building the scanner fails and abort-on-error is disabled
     */
    @Override
    public final RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> ctxt, final Scan scan, final RegionScanner innerScanner) throws IOException {
        boolean copAbortOnError = ctxt.getEnvironment().getConfiguration().getBoolean(RegionCoprocessorHost.ABORT_ON_ERROR_KEY, RegionCoprocessorHost.DEFAULT_ABORT_ON_ERROR);

        if (!copAbortOnError) {
            return doPostScannerObserver(ctxt, scan, innerScanner);
        }

        // never throw out exception that could abort region server
        try {
            return doPostScannerObserver(ctxt, scan, innerScanner);
        } catch (Throwable e) {
            LOG.error("Kylin Coprocessor Error", e);
            return innerScanner;
        }
    }

    /**
     * Deserializes the Kylin scan attributes and wraps {@code innerScanner}
     * in an {@link AggregationScanner}.
     *
     * @param ctxt observer context used to obtain the region
     * @param scan the scan carrying the Kylin attributes
     * @param innerScanner the scanner opened by the region
     * @return {@code innerScanner} itself when the coprocessor is not enabled
     *         for this scan, otherwise a new {@link AggregationScanner}
     * @throws IOException declared for failures raised while preparing the scanner
     */
    private RegionScanner doPostScannerObserver(final ObserverContext<RegionCoprocessorEnvironment> ctxt, final Scan scan, final RegionScanner innerScanner) throws IOException {
        byte[] coprocessorEnableBytes = scan.getAttribute(COPROCESSOR_ENABLE);
        if (coprocessorEnableBytes == null || coprocessorEnableBytes.length == 0 || coprocessorEnableBytes[0] == 0) {
            return innerScanner;
        }

        byte[] typeBytes = scan.getAttribute(TYPE);
        CoprocessorRowType type = CoprocessorRowType.deserialize(typeBytes);

        byte[] projectorBytes = scan.getAttribute(PROJECTOR);
        CoprocessorProjector projector = CoprocessorProjector.deserialize(projectorBytes);

        byte[] aggregatorBytes = scan.getAttribute(AGGREGATORS);
        ObserverAggregators aggregators = ObserverAggregators.deserialize(aggregatorBytes);

        byte[] filterBytes = scan.getAttribute(FILTER);
        CoprocessorFilter filter = CoprocessorFilter.deserialize(filterBytes);

        CoprocessorBehavior coprocessorBehavior = parseBehavior(scan);

        // start/end region operation & sync on scanner is suggested by the
        // javadoc of RegionScanner.nextRaw()
        // FIXME: will the lock still work when an iterator is returned? is it safe? Is readonly attribute helping here? by mhb
        Region region = ctxt.getEnvironment().getRegion();
        region.startRegionOperation();
        try {
            synchronized (innerScanner) {
                return new AggregationScanner(type, filter, projector, aggregators, innerScanner, coprocessorBehavior);
            }
        } finally {
            region.closeRegionOperation();
        }
    }

    /**
     * Reads the optional {@link #BEHAVIOR} attribute of the scan.
     * <p>
     * A missing or empty attribute, or one that cannot be parsed, yields
     * {@link CoprocessorBehavior#SCAN_FILTER_AGGR_CHECKMEM}; parse failures
     * are logged rather than propagated.
     *
     * @param scan the scan carrying the attribute
     * @return the requested behavior, or the default described above
     */
    private static CoprocessorBehavior parseBehavior(final Scan scan) {
        try {
            byte[] behavior = scan.getAttribute(BEHAVIOR);
            if (behavior != null && behavior.length != 0) {
                // Decode with a fixed charset instead of the region server's
                // platform default, so the result does not depend on JVM settings.
                // NOTE(review): assumes the client encodes the name as UTF-8/ASCII -
                // confirm against the code that sets this attribute.
                return CoprocessorBehavior.valueOf(new String(behavior, StandardCharsets.UTF_8));
            }
        } catch (Exception e) {
            // Deliberately best-effort: fall through to the default below.
            LOG.error("failed to parse behavior,using default behavior SCAN_FILTER_AGGR_CHECKMEM", e);
        }
        return CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM;
    }
}