blob: 83069405429d74561215126769ae4b4194e599b3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.schema.stats;
import static org.apache.phoenix.query.QueryServices.COMMIT_STATS_ASYNC;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_COMMIT_STATS_ASYNC;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
/**
 * An {@link InternalScanner} wrapper used during major compaction: every batch of cells returned by the
 * wrapped scanner is also handed to a {@link StatisticsCollector}, and when the scanner is closed the
 * collected statistics are written out through a {@link StatisticsWriter}.
 * <p>
 * Constructing an instance registers the region as "compacting" with the
 * {@link StatisticsCollectionRunTracker}; closing it removes that registration and closes the writer, the
 * collector and the wrapped scanner.
 */
public class StatisticsScanner implements InternalScanner {
    private static final Log LOG = LogFactory.getLog(StatisticsScanner.class);
    private final InternalScanner delegate;
    private final StatisticsWriter statsWriter;
    private final Region region;
    private final StatisticsCollector tracker;
    private final ImmutableBytesPtr family;
    private final Configuration config;
    private final RegionServerServices regionServerServices;

    /**
     * Wraps {@code delegate} and registers the environment's region as compacting.
     *
     * @param tracker
     *            collector that receives every non-empty batch of cells read through this scanner
     * @param stats
     *            writer used on {@link #close()} to delete, add and commit the statistics
     * @param env
     *            coprocessor environment supplying the region, configuration and region server services
     * @param delegate
     *            the underlying scanner that actually produces the cells
     * @param family
     *            column family passed to the writer when deleting and adding statistics
     */
    public StatisticsScanner(StatisticsCollector tracker, StatisticsWriter stats, RegionCoprocessorEnvironment env,
            InternalScanner delegate, ImmutableBytesPtr family) {
        this.tracker = tracker;
        this.statsWriter = stats;
        this.delegate = delegate;
        this.regionServerServices = env.getRegionServerServices();
        this.region = env.getRegion();
        this.family = family;
        this.config = env.getConfiguration();
        StatisticsCollectionRunTracker.getInstance(config).addCompactingRegion(region.getRegionInfo());
    }

    @Override
    public boolean next(List<Cell> result) throws IOException {
        boolean ret = delegate.next(result);
        updateStats(result);
        return ret;
    }

    @Override
    public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
        boolean ret = delegate.next(result, scannerContext);
        updateStats(result);
        return ret;
    }

    /**
     * Update the current statistics based on the latest batch of cells from the underlying scanner.
     * Empty batches are skipped.
     *
     * @param results
     *            next batch of {@link Cell}s
     * @throws IOException
     *             if the collector fails while processing the batch
     */
    private void updateStats(final List<Cell> results) throws IOException {
        if (!results.isEmpty()) {
            tracker.collectStatistics(results);
        }
    }

    /**
     * Finishes statistics collection for this scan.
     * <p>
     * If the region server is stopping or stopped, no statistics are written, but the compacting-region
     * registration is still removed and the writer, collector and delegate scanner are still closed so
     * nothing is leaked. Otherwise the statistics update runs either inline or, when
     * {@code COMMIT_STATS_ASYNC} is enabled, is handed to
     * {@link StatisticsCollectionRunTracker#runTask}; in both cases the callable performs the cleanup.
     *
     * @throws IOException
     *             if the synchronous statistics update fails, or if releasing the resources fails
     */
    @Override
    public void close() throws IOException {
        boolean async = getConfig().getBoolean(COMMIT_STATS_ASYNC, DEFAULT_COMMIT_STATS_ASYNC);
        StatisticsCollectionRunTracker collectionTracker = getStatsCollectionRunTracker(config);
        StatisticsScannerCallable callable = createCallable();
        if (getRegionServerServices().isStopping() || getRegionServerServices().isStopped()) {
            LOG.debug("Not updating table statistics because the server is stopping/stopped");
            // The callable (which normally does the cleanup) is not going to run, so release here.
            IOException closeFailure = releaseResources(collectionTracker, getRegion().getRegionInfo(), null);
            if (closeFailure != null) {
                throw closeFailure;
            }
            return;
        }
        if (!async) {
            callable.call();
        } else {
            collectionTracker.runTask(callable);
        }
    }

    /**
     * Removes the compacting-region registration and closes the statistics writer, the collector and the
     * delegate scanner. Each close is attempted independently, and the delegate scanner is closed even if
     * an earlier step throws an unchecked exception.
     *
     * @param collectionTracker
     *            tracker from which the region is removed
     * @param regionInfo
     *            the region to remove from the tracker
     * @param pending
     *            an earlier failure that must take precedence over close failures, or {@code null}
     * @return {@code pending} if it is non-null, otherwise the first {@link IOException} hit while closing,
     *         otherwise {@code null}
     */
    private IOException releaseResources(StatisticsCollectionRunTracker collectionTracker, HRegionInfo regionInfo,
            IOException pending) {
        IOException first = pending;
        try {
            collectionTracker.removeCompactingRegion(regionInfo);
            try {
                getStatisticsWriter().close();// close the writer
            } catch (IOException e) {
                if (first == null) {
                    first = e;
                }
                LOG.error("Error while closing the stats table", e);
            }
            try {
                getTracker().close();// close the tracker
            } catch (IOException e) {
                if (first == null) {
                    first = e;
                }
                LOG.error("Error while closing the stats collector", e);
            }
        } finally {
            // close the delegate scanner
            try {
                getDelegate().close();
            } catch (IOException e) {
                if (first == null) {
                    first = e;
                }
                LOG.error("Error while closing the scanner", e);
            }
        }
        return first;
    }

    // The package-private accessors below are VisibleForTesting seams; production code goes through them
    // so tests can substitute collaborators.
    StatisticsCollectionRunTracker getStatsCollectionRunTracker(Configuration c) {
        return StatisticsCollectionRunTracker.getInstance(c);
    }

    Configuration getConfig() {
        return config;
    }

    StatisticsWriter getStatisticsWriter() {
        return statsWriter;
    }

    RegionServerServices getRegionServerServices() {
        return regionServerServices;
    }

    Region getRegion() {
        return region;
    }

    StatisticsScannerCallable createCallable() {
        return new StatisticsScannerCallable();
    }

    StatisticsCollector getTracker() {
        return tracker;
    }

    InternalScanner getDelegate() {
        return delegate;
    }

    /**
     * Task that replaces the region's statistics (delete, add, commit) and then releases every resource
     * held by the enclosing scanner. It is run inline or passed to
     * {@link StatisticsCollectionRunTracker#runTask} by {@link StatisticsScanner#close()}.
     */
    class StatisticsScannerCallable implements Callable<Void> {
        /**
         * Writes the collected statistics and releases the scanner's resources.
         *
         * @return always {@code null}
         * @throws IOException
         *             the statistics update failure (unless the region server is stopping/stopped, in
         *             which case it is only logged), or else the first failure hit while closing
         */
        @Override
        public Void call() throws IOException {
            IOException toThrow = null;
            StatisticsCollectionRunTracker collectionTracker = getStatsCollectionRunTracker(config);
            final HRegionInfo regionInfo = getRegion().getRegionInfo();
            try {
                // Delete and add are accumulated in one mutation list and committed together.
                ArrayList<Mutation> mutations = new ArrayList<Mutation>();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Deleting the stats for the region " + regionInfo.getRegionNameAsString()
                            + " as part of major compaction");
                }
                getStatisticsWriter().deleteStats(region, tracker, family, mutations);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Adding new stats for the region " + regionInfo.getRegionNameAsString()
                            + " as part of major compaction");
                }
                getStatisticsWriter().addStats(tracker, family, mutations);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Committing new stats for the region " + regionInfo.getRegionNameAsString()
                            + " as part of major compaction");
                }
                getStatisticsWriter().commitStats(mutations, tracker);
            } catch (IOException e) {
                if (getRegionServerServices().isStopping() || getRegionServerServices().isStopped()) {
                    LOG.debug("Ignoring error updating statistics because region is closing/closed");
                } else {
                    LOG.error("Failed to update statistics table!", e);
                    toThrow = e;
                }
            } finally {
                toThrow = releaseResources(collectionTracker, regionInfo, toThrow);
            }
            // Rethrow outside the finally block so an unchecked exception already propagating from the
            // update is not silently replaced by a close failure.
            if (toThrow != null) {
                throw toThrow;
            }
            return null;
        }
    }
}