blob: 5649f5f74bf7b02067213714cfe0ad91bc7104b3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.plugins.index.progress;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import com.google.common.base.Stopwatch;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.commons.TimeDurationFormatter;
import org.apache.jackrabbit.oak.plugins.index.IndexUpdate;
import org.apache.jackrabbit.oak.plugins.index.IndexUpdateCallback;
import org.apache.jackrabbit.oak.plugins.index.NodeTraversalCallback;
import org.apache.jackrabbit.oak.spi.commit.Editor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class IndexingProgressReporter implements NodeTraversalCallback {
private static final String REINDEX_MSG = "Reindexing";
private static final String INDEX_MSG = "Incremental indexing";
private final Logger log = LoggerFactory.getLogger(IndexUpdate.class);
private Stopwatch watch = Stopwatch.createStarted();
private final IndexUpdateCallback updateCallback;
private final NodeTraversalCallback traversalCallback;
private final Map<String, IndexUpdateState> indexUpdateStates = new HashMap<>();
private long traversalCount;
private String messagePrefix = INDEX_MSG;
private TraversalRateEstimator traversalRateEstimator = new SimpleRateEstimator();
private NodeCountEstimator nodeCountEstimator = NodeCountEstimator.NOOP;
private long estimatedCount;
public IndexingProgressReporter(IndexUpdateCallback updateCallback,
NodeTraversalCallback traversalCallback) {
this.updateCallback = updateCallback;
this.traversalCallback = traversalCallback;
}
public Editor wrapProgress(Editor editor) {
return ProgressTrackingEditor.wrap(editor, this);
}
/**
* Invoked to indicate that reindexing phase has started in current
* indexing cycle
* @param path
*/
public void reindexingTraversalStart(String path) {
estimatedCount = nodeCountEstimator.getEstimatedNodeCount(path, getReindexedIndexPaths());
if (estimatedCount >= 0) {
log.info("Estimated node count to be traversed for reindexing under {} is [{}]", path, estimatedCount);
}
messagePrefix = REINDEX_MSG;
}
/**
* Invoked to indicate that reindexing phase has ended in current
* indexing cycle
*/
public void reindexingTraversalEnd() {
messagePrefix = INDEX_MSG;
}
public void setMessagePrefix(String messagePrefix) {
this.messagePrefix = messagePrefix;
}
public void traversedNode(PathSource pathSource) throws CommitFailedException {
if (++traversalCount % 10000 == 0) {
double rate = traversalRateEstimator.getNodesTraversedPerSecond();
String formattedRate = String.format("%1.2f nodes/s, %1.2f nodes/hr", rate, rate * 3600);
String estimate = estimatePendingTraversal(rate);
log.info("{} Traversed #{} {} [{}] {}", messagePrefix, traversalCount, pathSource.getPath(), formattedRate, estimate);
}
traversalCallback.traversedNode(pathSource);
traversalRateEstimator.traversedNode();
}
/**
* Registers the index for progress tracking
*
* @param indexPath path of index
* @param reindexing true if the index is being reindexed
* @param estimatedCount an estimate of count of number of entries in the index. If less
* than zero then it indicates that estimation cannot be done
*/
public void registerIndex(String indexPath, boolean reindexing, long estimatedCount) {
indexUpdateStates.put(indexPath, new IndexUpdateState(indexPath, reindexing, estimatedCount));
}
/**
* Callback to indicate that index at give path has got an update
*/
public void indexUpdate(String indexPath) throws CommitFailedException {
indexUpdateStates.get(indexPath).indexUpdate();
}
public void logReport(){
if (isReindexingPerformed()){
log.info(getReport());
log.info("Reindexing completed");
} else if (log.isDebugEnabled() && somethingIndexed()){
log.debug(getReport());
}
}
public List<String> getReindexStats() {
return indexUpdateStates.values().stream()
.filter(st -> st.reindex)
.map(Object::toString)
.collect(Collectors.toList());
}
/**
* Returns true if any reindexing is performed in current indexing
* cycle
*/
public boolean isReindexingPerformed() {
return indexUpdateStates.values().stream().anyMatch(st -> st.reindex);
}
/**
* Set of indexPaths which have been updated or accessed
* in this indexing cycle.
*/
public Set<String> getUpdatedIndexPaths() {
return indexUpdateStates.keySet();
}
/**
* Set of indexPaths which have been reindexed
*/
public Set<String> getReindexedIndexPaths() {
return indexUpdateStates.values().stream()
.filter(st -> st.reindex)
.map(st -> st.indexPath)
.collect(Collectors.toSet());
}
public boolean somethingIndexed() {
return indexUpdateStates.values().stream().anyMatch(st -> st.updateCount > 0);
}
public void setTraversalRateEstimator(TraversalRateEstimator traversalRate) {
this.traversalRateEstimator = traversalRate;
}
public void setNodeCountEstimator(NodeCountEstimator nodeCountEstimator) {
this.nodeCountEstimator = nodeCountEstimator;
}
public void setEstimatedCount(long estimatedCount) {
this.estimatedCount = estimatedCount;
}
public void reset(){
watch = Stopwatch.createStarted();
traversalCount = 0;
messagePrefix = INDEX_MSG;
}
private String estimatePendingTraversal(double nodesPerSecond) {
if (estimatedCount >= 0) {
if (estimatedCount > traversalCount){
long pending = estimatedCount - traversalCount;
long timeRequired = (long)(pending / nodesPerSecond);
double percentComplete = ((double) traversalCount/estimatedCount) * 100;
return String.format("(Elapsed %s, Expected %s, Completed %1.2f%%)",
watch,
TimeDurationFormatter.forLogging().format(timeRequired, TimeUnit.SECONDS),
percentComplete
);
} else {
return String.format("(Elapsed %s)", watch);
}
}
return "";
}
private String getReport() {
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
pw.println("Indexing report");
for (IndexUpdateState st : indexUpdateStates.values()) {
if (!log.isDebugEnabled() && !st.reindex) {
continue;
}
if (st.updateCount > 0 || st.reindex) {
pw.printf(" - %s%n", st);
}
}
return sw.toString();
}
private class IndexUpdateState {
final String indexPath;
final boolean reindex;
final long estimatedCount;
final Stopwatch watch = Stopwatch.createStarted();
long updateCount;
public IndexUpdateState(String indexPath, boolean reindex, long estimatedCount) {
this.indexPath = indexPath;
this.reindex = reindex;
this.estimatedCount = estimatedCount;
}
public void indexUpdate() throws CommitFailedException {
updateCount++;
if (updateCount % 10000 == 0) {
log.info("{} => Indexed {} nodes in {} ...", indexPath, updateCount, watch);
watch.reset().start();
}
updateCallback.indexUpdate();
}
@Override
public String toString() {
String reindexMarker = reindex ? "*" : "";
return indexPath + reindexMarker + "(" + updateCount + ")";
}
}
}