blob: 14973726328b580a9b62b240e5fa0c6d7ebd92ac [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.plugins.index;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Throwables.getStackTraceAsString;
import static com.google.common.collect.Sets.newHashSet;
import static org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean.STATUS_DONE;
import static org.apache.jackrabbit.oak.commons.PathUtils.elements;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_PROPERTY_NAME;
import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.MISSING_NODE;
import java.io.Closeable;
import java.util.Calendar;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.OpenType;
import javax.management.openmbean.SimpleType;
import javax.management.openmbean.TabularData;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.Lists;
import org.apache.jackrabbit.api.stats.TimeSeries;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.PropertyState;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean;
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.commons.jmx.AnnotatedStandardMBean;
import org.apache.jackrabbit.oak.plugins.commit.AnnotatingConflictHandler;
import org.apache.jackrabbit.oak.plugins.commit.ConflictHook;
import org.apache.jackrabbit.oak.plugins.commit.ConflictValidatorProvider;
import org.apache.jackrabbit.oak.plugins.index.IndexUpdate.MissingIndexProviderStrategy;
import org.apache.jackrabbit.oak.plugins.index.TrackingCorruptIndexHandler.CorruptIndexInfo;
import org.apache.jackrabbit.oak.plugins.index.progress.MetricRateEstimator;
import org.apache.jackrabbit.oak.plugins.index.progress.NodeCounterMBeanEstimator;
import org.apache.jackrabbit.oak.plugins.memory.PropertyStates;
import org.apache.jackrabbit.oak.plugins.metric.MetricStatisticsProvider;
import org.apache.jackrabbit.oak.spi.commit.CommitContext;
import org.apache.jackrabbit.oak.spi.commit.CommitHook;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.CompositeEditorProvider;
import org.apache.jackrabbit.oak.spi.commit.CompositeHook;
import org.apache.jackrabbit.oak.spi.commit.EditorDiff;
import org.apache.jackrabbit.oak.spi.commit.EditorHook;
import org.apache.jackrabbit.oak.spi.commit.EditorProvider;
import org.apache.jackrabbit.oak.spi.commit.ResetCommitAttributeHook;
import org.apache.jackrabbit.oak.spi.commit.SimpleCommitContext;
import org.apache.jackrabbit.oak.spi.commit.ValidatorProvider;
import org.apache.jackrabbit.oak.spi.commit.VisibleEditor;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.stats.Counting;
import org.apache.jackrabbit.oak.stats.HistogramStats;
import org.apache.jackrabbit.oak.stats.MeterStats;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.apache.jackrabbit.oak.stats.StatsOptions;
import org.apache.jackrabbit.oak.stats.TimerStats;
import org.apache.jackrabbit.stats.TimeSeriesStatsUtil;
import org.apache.jackrabbit.util.ISO8601;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Objects;
import com.google.common.base.Splitter;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableMap;
/**
 * Background task that performs the asynchronous index update for one indexing
 * lane: it diffs the repository between the previously indexed checkpoint and a
 * freshly created one, feeds the diff through the registered
 * {@link IndexEditorProvider}, and maintains a lease under the hidden
 * {@code :async} node so that only one cluster node runs the lane at a time.
 */
public class AsyncIndexUpdate implements Runnable, Closeable {
    /**
     * Name of service property which determines the name of Async task
     */
    public static final String PROP_ASYNC_NAME = "oak.async";

    private static final Logger log = LoggerFactory
            .getLogger(AsyncIndexUpdate.class);

    /**
     * Name of the hidden node under which information about the checkpoints
     * seen and indexed by each async indexer is kept.
     */
    static final String ASYNC = ":async";

    // Lifetime requested for the checkpoints created by this task (1000 days).
    private static final long DEFAULT_LIFETIME = TimeUnit.DAYS.toMillis(1000);

    // Shared sentinel exception used to signal a forced stop; compared by
    // identity in AsyncIndexStats.failed().
    private static final CommitFailedException INTERRUPTED = new CommitFailedException(
            "Async", 1, "Indexing stopped forcefully");

    /**
     * Timeout in milliseconds after which an async job would be considered as
     * timed out. Another node in cluster would wait for timeout before
     * taking over a running job
     */
    private static final long DEFAULT_ASYNC_TIMEOUT = TimeUnit.MINUTES.toMillis(
            Integer.getInteger("oak.async.lease.timeout", 15));

    // Lane name (e.g. "async"); validated to end with "async".
    private final String name;
    private final NodeStore store;
    private final IndexEditorProvider provider;

    /**
     * Property name which stores the timestamp upto which the repository is
     * indexed
     */
    private final String lastIndexedTo;

    private final long lifetime = DEFAULT_LIFETIME; // TODO: make configurable

    private final AsyncIndexStats indexStats;

    /** Flag to switch to synchronous updates once the index caught up to the repo */
    private final boolean switchOnSync;

    /**
     * Set of reindexed definitions updated between runs because a single diff
     * can report less definitions than there really are. Used in coordination
     * with the switchOnSync flag, so we know which def need to be updated after
     * a run with no changes.
     */
    private final Set<String> reindexedDefinitions = new HashSet<String>();

    private final MissingIndexProviderStrategy missingStrategy = new DefaultMissingIndexProviderStrategy();

    private final IndexTaskSpliter taskSplitter = new IndexTaskSpliter();

    // Single permit shared between run() and close(); close() uses it to wait
    // for a running cycle to finish.
    private final Semaphore runPermit = new Semaphore(1);

    /**
     * Flag which would be set to true if the close operation is not
     * able to close within specific time. The flag would be an
     * indication to indexing thread to return straightway say by
     * throwing an exception
     */
    private final AtomicBoolean forcedStopFlag = new AtomicBoolean();

    private IndexMBeanRegistration mbeanRegistration;

    // Lease duration in ms; non-positive disables the lease check entirely.
    private long leaseTimeOut;

    /**
     * Controls the length of the interval (in minutes) at which an indexing
     * error is logged as 'warning'. for the rest of the indexing cycles errors
     * will be logged at 'debug' level
     */
    // NOTE(review): effectively constant; could be declared final.
    private static long ERROR_WARN_INTERVAL = TimeUnit.MINUTES.toMillis(Integer
            .getInteger("oak.async.warn.interval", 30));

    /**
     * Timeout in seconds for which close call would wait before forcefully
     * stopping the indexing thread
     */
    private int softTimeOutSecs = Integer.getInteger("oak.async.softTimeOutSecs", 2 * 60);

    private boolean closed;

    /**
     * The checkpoint cleanup interval in minutes. Defaults to 5 minutes.
     * Setting it to a negative value disables automatic cleanup. See OAK-4826.
     */
    private final int cleanupIntervalMinutes
            = Integer.getInteger("oak.async.checkpointCleanupIntervalMinutes", 5);

    /**
     * The time in minutes since the epoch when the last checkpoint cleanup ran.
     */
    private long lastCheckpointCleanUpTime;

    private List<ValidatorProvider> validatorProviders = Collections.emptyList();

    private TrackingCorruptIndexHandler corruptIndexHandler = new TrackingCorruptIndexHandler();

    private final StatisticsProvider statisticsProvider;
    /**
     * Creates an async index update task without statistics support.
     *
     * @param name lane name, validated via {@link #checkValidName(String)}
     * @param store the node store to index
     * @param provider supplies the index editors
     * @param switchOnSync whether to switch affected definitions to
     *        synchronous updates once the index has caught up
     */
    public AsyncIndexUpdate(@NotNull String name, @NotNull NodeStore store,
            @NotNull IndexEditorProvider provider, boolean switchOnSync) {
        this(name, store, provider, StatisticsProvider.NOOP, switchOnSync);
    }
    /**
     * Creates an async index update task.
     *
     * @param name lane name, validated via {@link #checkValidName(String)}
     * @param store the node store to index
     * @param provider supplies the index editors
     * @param statsProvider backing provider for the execution statistics
     * @param switchOnSync whether to switch affected definitions to
     *        synchronous updates once the index has caught up
     */
    public AsyncIndexUpdate(@NotNull String name, @NotNull NodeStore store,
            @NotNull IndexEditorProvider provider, StatisticsProvider statsProvider, boolean switchOnSync) {
        this.name = checkValidName(name);
        this.lastIndexedTo = lastIndexedTo(name);
        this.store = checkNotNull(store);
        this.provider = checkNotNull(provider);
        this.switchOnSync = switchOnSync;
        this.leaseTimeOut = DEFAULT_ASYNC_TIMEOUT;
        this.statisticsProvider = statsProvider;
        this.indexStats = new AsyncIndexStats(name, statsProvider);
    }
    /**
     * Creates an async index update task with {@code switchOnSync} disabled
     * and no statistics support.
     */
    public AsyncIndexUpdate(@NotNull String name, @NotNull NodeStore store,
            @NotNull IndexEditorProvider provider) {
        this(name, store, provider, false);
    }
public static String checkValidName(String asyncName){
checkNotNull(asyncName, "async name should not be null");
if (IndexConstants.ASYNC_REINDEX_VALUE.equals(asyncName)){
return asyncName;
}
checkArgument(asyncName.endsWith("async"), "async name [%s] does not confirm to " +
"naming pattern of ending with 'async'", asyncName);
return asyncName;
}
public static boolean isAsyncLaneName(String asyncName){
return IndexConstants.ASYNC_REINDEX_VALUE.equals(asyncName) || asyncName.endsWith("async");
}
/**
* Index update callback that tries to raise the async status flag when
* the first index change is detected.
*
* @see <a href="https://issues.apache.org/jira/browse/OAK-1292">OAK-1292</a>
*/
    protected static class AsyncUpdateCallback implements IndexUpdateCallback, NodeTraversalCallback {
        /**
         * Interval in terms of number of nodes traversed after which
         * time would be checked for lease expiry
         */
        public static final int LEASE_CHECK_INTERVAL = 10;

        private final NodeStore store;

        /** The base checkpoint */
        private String checkpoint;

        /**
         * Property name which stores the temporary checkpoint that need to be released on the next run
         */
        private final String tempCpName;

        private final long leaseTimeOut;
        private final String name;
        private final String leaseName;
        private final AsyncIndexStats indexStats;
        // Shared with the owning AsyncIndexUpdate; set by close() to abort this run.
        private final AtomicBoolean forcedStop;
        private List<ValidatorProvider> validatorProviders = Collections.emptyList();

        /**
         * Expiration time of the last lease we committed, null if lease is
         * disabled
         */
        private Long lease = null;

        private boolean hasLease = false;

        public AsyncUpdateCallback(NodeStore store, String name,
                long leaseTimeOut, String checkpoint,
                AsyncIndexStats indexStats, AtomicBoolean forcedStop) {
            this.store = store;
            this.name = name;
            this.forcedStop = forcedStop;
            this.leaseTimeOut = leaseTimeOut;
            this.checkpoint = checkpoint;
            this.tempCpName = getTempCpName(name);
            this.indexStats = indexStats;
            this.leaseName = leasify(name);
        }

        /**
         * Grabs (or clears) the lease for this lane. When lease checking is
         * enabled, commits a new lease expiry under {@code :async} and fails
         * with a concurrent-update exception if another node's lease is still
         * valid. Idempotent: does nothing once a lease has been taken.
         */
        protected void initLease() throws CommitFailedException {
            if (hasLease) {
                return;
            }
            NodeState root = store.getRoot();
            NodeState async = root.getChildNode(ASYNC);
            if(isLeaseCheckEnabled(leaseTimeOut)) {
                long now = getTime();
                // lease extends 2x the timeout past 'now'
                this.lease = now + 2 * leaseTimeOut;
                long beforeLease = async.getLong(leaseName);
                if (beforeLease > now) {
                    // someone else's lease is still active
                    throw newConcurrentUpdateException();
                }
                NodeBuilder builder = root.builder();
                builder.child(ASYNC).setProperty(leaseName, lease);
                // merge checks that checkpoint+lease are still what we saw
                mergeWithConcurrencyCheck(store, validatorProviders, builder, checkpoint, beforeLease, name);
            } else {
                lease = null;
                // remove stale lease info if needed
                if (async.hasProperty(leaseName)) {
                    NodeBuilder builder = root.builder();
                    builder.child(ASYNC).removeProperty(leaseName);
                    mergeWithConcurrencyCheck(store, validatorProviders,
                            builder, checkpoint, null, name);
                }
            }
            hasLease = true;
        }

        /**
         * Prepares this cycle: ensures the lease is held, records the new
         * checkpoint in the temporary-checkpoint list (releasing stale temps)
         * and resets the update counter.
         */
        protected void prepare(String afterCheckpoint)
                throws CommitFailedException {
            if (!hasLease) {
                initLease();
            }
            NodeState root = store.getRoot();
            NodeBuilder builder = root.builder();
            NodeBuilder async = builder.child(ASYNC);
            updateTempCheckpoints(async, checkpoint, afterCheckpoint);
            mergeWithConcurrencyCheck(store, validatorProviders, builder, checkpoint, lease, name);
            // reset updates counter
            indexStats.reset();
        }

        private void updateTempCheckpoints(NodeBuilder async,
                String checkpoint, String afterCheckpoint) {
            indexStats.setReferenceCheckpoint(checkpoint);
            indexStats.setProcessedCheckpoint(afterCheckpoint);
            // try to drop temp cps, add 'currentCp' to the temp cps list
            Set<String> temps = newHashSet();
            for (String cp : getStrings(async, tempCpName)) {
                if (cp.equals(checkpoint)) {
                    // never release the current reference checkpoint
                    temps.add(cp);
                    continue;
                }
                boolean released = store.release(cp);
                log.debug("[{}] Releasing temporary checkpoint {}: {}", name, cp, released);
                if (!released) {
                    // keep it on the list and retry on a later run
                    temps.add(cp);
                }
            }
            temps.add(afterCheckpoint);
            async.setProperty(tempCpName, temps, Type.STRINGS);
            indexStats.setTempCheckpoints(temps);
        }

        /** @return {@code true} if at least one index update was recorded this cycle */
        boolean isDirty() {
            return indexStats.getUpdates() > 0;
        }

        /** Releases the lease (if enabled) at the end of a cycle. */
        void close() throws CommitFailedException {
            if (isLeaseCheckEnabled(leaseTimeOut)) {
                NodeBuilder builder = store.getRoot().builder();
                NodeBuilder async = builder.child(ASYNC);
                async.removeProperty(leaseName);
                mergeWithConcurrencyCheck(store, validatorProviders, builder,
                        async.getString(name), lease, name);
            }
        }

        @Override
        public void indexUpdate() throws CommitFailedException {
            checkIfStopped();
            indexStats.incUpdates();
        }

        /**
         * Invoked for every traversed node; every LEASE_CHECK_INTERVAL nodes
         * the lease is extended if it is within one timeout of expiring.
         */
        @Override
        public void traversedNode(PathSource pathSource) throws CommitFailedException{
            checkIfStopped();
            if (indexStats.incTraversal() % LEASE_CHECK_INTERVAL == 0 && isLeaseCheckEnabled(leaseTimeOut)) {
                long now = getTime();
                if (now + leaseTimeOut > lease) {
                    long newLease = now + 2 * leaseTimeOut;
                    NodeBuilder builder = store.getRoot().builder();
                    builder.child(ASYNC).setProperty(leaseName, newLease);
                    mergeWithConcurrencyCheck(store, validatorProviders, builder, checkpoint, lease, name);
                    lease = newLease;
                }
            }
        }

        /** Overridable clock access (for tests). */
        protected long getTime() {
            return System.currentTimeMillis();
        }

        public void setCheckpoint(String checkpoint) {
            this.checkpoint = checkpoint;
        }

        public void setValidatorProviders(List<ValidatorProvider> validatorProviders) {
            this.validatorProviders = checkNotNull(validatorProviders);
        }

        /**
         * Aborts the current run by throwing the shared INTERRUPTED exception
         * when a forced stop was requested; clears the flag first so a later
         * run can proceed.
         */
        private void checkIfStopped() throws CommitFailedException {
            if (forcedStop.get()){
                forcedStop.set(false);
                throw INTERRUPTED;
            }
        }
    }
@Override
public synchronized void run() {
boolean permitAcquired = false;
try{
if (runPermit.tryAcquire()){
permitAcquired = true;
runWhenPermitted();
} else {
log.warn("[{}] Could not acquire run permit. Stop flag set to [{}] Skipping the run", name, forcedStopFlag);
}
} finally {
if (permitAcquired){
runPermit.release();
}
}
}
    /**
     * Stops this task: waits up to {@code softTimeOutSecs} for a running cycle
     * to finish, then sets {@link #forcedStopFlag} and waits up to five times
     * longer before giving up. Subsequent calls are no-ops.
     */
    @Override
    public void close() {
        if (closed) {
            return;
        }
        int hardTimeOut = 5 * softTimeOutSecs;
        if(!runPermit.tryAcquire()){
            //First let current run complete without bothering it
            log.debug("[{}] [WAITING] Indexing in progress. Would wait for {} secs for it to finish", name, softTimeOutSecs);
            try {
                if(!runPermit.tryAcquire(softTimeOutSecs, TimeUnit.SECONDS)){
                    //We have now waited enough. So signal the indexer that it should return right away
                    //as soon as it sees the forcedStopFlag
                    log.debug("[{}] [SOFT LIMIT HIT] Indexing found to be in progress for more than [{}]s. Would " +
                            "signal it to now force stop", name, softTimeOutSecs);
                    forcedStopFlag.set(true);
                    if(!runPermit.tryAcquire(hardTimeOut, TimeUnit.SECONDS)){
                        //Index thread did not listen to our advice. So give up now and warn about it
                        log.warn("[{}] Indexing still not found to be complete. Giving up after [{}]s", name, hardTimeOut);
                    }
                } else {
                    log.info("[{}] [CLOSED OK] Async indexing run completed. Closing it now", name);
                }
            } catch (InterruptedException e) {
                // restore the interrupt status and fall through to mark closed
                Thread.currentThread().interrupt();
            }
        } else {
            // no run in progress; the permit is deliberately kept so no new
            // cycle can start after close
            log.info("[{}] Closed", name);
        }
        closed = true;
    }
    /**
     * Executes one indexing cycle. Caller must hold {@link #runPermit}.
     * The cycle: verify no other cluster node holds the lease, resolve the
     * previously indexed checkpoint, create a new checkpoint, run
     * {@link #updateIndex}, then release whichever checkpoint is no longer
     * needed and occasionally clean up orphaned checkpoints.
     */
    private void runWhenPermitted() {
        if (indexStats.isPaused()) {
            log.debug("[{}] Ignoring the run as indexing is paused", name);
            return;
        }
        log.debug("[{}] Running background index task", name);
        NodeState root = store.getRoot();
        NodeState async = root.getChildNode(ASYNC);
        if (isLeaseCheckEnabled(leaseTimeOut)) {
            // check for concurrent updates
            long leaseEndTime = async.getLong(leasify(name));
            long currentTime = System.currentTimeMillis();
            if (leaseEndTime > currentTime) {
                // NOTE(review): variable holds seconds until lease expiry,
                // despite the "Msg" suffix in its name
                long leaseExpMsg = (leaseEndTime - currentTime) / 1000;
                String err = String.format("Another copy of the index update is already running; skipping this update. " +
                        "Time left for lease to expire %d s. Indexing can resume by %tT", leaseExpMsg, leaseEndTime);
                indexStats.failed(new Exception(err, newConcurrentUpdateException()));
                return;
            }
        }
        // start collecting runtime statistics
        preAsyncRunStatsStats(indexStats);
        // find the last indexed state, and check if there are recent changes
        NodeState before;
        String beforeCheckpoint = async.getString(name);
        AsyncUpdateCallback callback = newAsyncUpdateCallback(store,
                name, leaseTimeOut, beforeCheckpoint, indexStats,
                forcedStopFlag);
        if (beforeCheckpoint != null) {
            NodeState state = store.retrieve(beforeCheckpoint);
            if (state == null) {
                // to make sure we're not reading a stale root rev, we're
                // attempting a write+read via the lease-grab mechanics
                try {
                    callback.initLease();
                } catch (CommitFailedException e) {
                    indexStats.failed(e);
                    return;
                }
                // re-read the checkpoint after the forced round-trip
                root = store.getRoot();
                beforeCheckpoint = root.getChildNode(ASYNC).getString(name);
                if (beforeCheckpoint != null) {
                    state = store.retrieve(beforeCheckpoint);
                    callback.setCheckpoint(beforeCheckpoint);
                }
            }
            if (state == null) {
                log.warn(
                        "[{}] Failed to retrieve previously indexed checkpoint {}; re-running the initial index update",
                        name, beforeCheckpoint);
                beforeCheckpoint = null;
                callback.setCheckpoint(beforeCheckpoint);
                before = MISSING_NODE;
            } else if (noVisibleChanges(state, root) && !switchOnSync) {
                log.debug(
                        "[{}] No changes since last checkpoint; skipping the index update",
                        name);
                postAsyncRunStatsStatus(indexStats);
                return;
            } else {
                before = state;
            }
        } else {
            // no checkpoint stored yet: full initial indexing from scratch
            log.info("[{}] Initial index update", name);
            before = MISSING_NODE;
        }
        // there are some recent changes, so let's create a new checkpoint
        String afterTime = now();
        String oldThreadName = Thread.currentThread().getName();
        boolean threadNameChanged = false;
        String afterCheckpoint = store.checkpoint(lifetime, ImmutableMap.of(
                "creator", AsyncIndexUpdate.class.getSimpleName(),
                "created", afterTime,
                "thread", oldThreadName,
                "name", name));
        NodeState after = store.retrieve(afterCheckpoint);
        if (after == null) {
            log.debug(
                    "[{}] Unable to retrieve newly created checkpoint {}, skipping the index update",
                    name, afterCheckpoint);
            //Do not update the status as technically the run is not complete
            return;
        }
        // on failure the *new* checkpoint is released; on success the old one
        String checkpointToRelease = afterCheckpoint;
        boolean updatePostRunStatus = false;
        try {
            String newThreadName = "async-index-update-" + name;
            log.trace("Switching thread name to {}", newThreadName);
            threadNameChanged = true;
            Thread.currentThread().setName(newThreadName);
            updatePostRunStatus = updateIndex(before, beforeCheckpoint, after,
                    afterCheckpoint, afterTime, callback);
            // the update succeeded, i.e. it no longer fails
            if (indexStats.didLastIndexingCycleFailed()) {
                indexStats.fixed();
            }
            // the update succeeded, so we can release the earlier checkpoint
            // otherwise the new checkpoint associated with the failed update
            // will get released in the finally block
            checkpointToRelease = beforeCheckpoint;
            indexStats.setReferenceCheckpoint(afterCheckpoint);
            indexStats.setProcessedCheckpoint("");
            indexStats.releaseTempCheckpoint(afterCheckpoint);
        } catch (Exception e) {
            indexStats.failed(e);
        } finally {
            if (threadNameChanged) {
                log.trace("Switching thread name back to {}", oldThreadName);
                Thread.currentThread().setName(oldThreadName);
            }
            // null during initial indexing
            // and skip release if this cp was used in a split operation
            if (checkpointToRelease != null
                    && !checkpointToRelease.equals(taskSplitter
                            .getLastReferencedCp())) {
                if (!store.release(checkpointToRelease)) {
                    log.debug("[{}] Unable to release checkpoint {}", name,
                            checkpointToRelease);
                }
            }
            maybeCleanUpCheckpoints();
            if (updatePostRunStatus) {
                postAsyncRunStatsStatus(indexStats);
            }
        }
    }
private void markFailingIndexesAsCorrupt(NodeBuilder builder) {
for (Map.Entry<String, CorruptIndexInfo> index : corruptIndexHandler.getCorruptIndexData(name).entrySet()){
NodeBuilder indexBuilder = childBuilder(builder, index.getKey());
CorruptIndexInfo info = index.getValue();
if (!indexBuilder.hasProperty(IndexConstants.CORRUPT_PROPERTY_NAME)){
String corruptSince = ISO8601.format(info.getCorruptSinceAsCal());
indexBuilder.setProperty(
PropertyStates.createProperty(IndexConstants.CORRUPT_PROPERTY_NAME, corruptSince, Type.DATE));
log.info("Marking [{}] as corrupt. The index is failing {}", info.getPath(), info.getStats());
} else {
log.debug("Failing index at [{}] is already marked as corrupt. The index is failing {}",
info.getPath(), info.getStats());
}
}
}
private static NodeBuilder childBuilder(NodeBuilder nb, String path) {
for (String name : PathUtils.elements(checkNotNull(path))) {
nb = nb.child(name);
}
return nb;
}
private void maybeCleanUpCheckpoints() {
// clean up every five minutes
long currentMinutes = TimeUnit.MILLISECONDS.toMinutes(
System.currentTimeMillis());
if (!indexStats.isFailing()
&& cleanupIntervalMinutes > -1
&& lastCheckpointCleanUpTime + cleanupIntervalMinutes < currentMinutes) {
try {
cleanUpCheckpoints();
} catch (Throwable e) {
log.warn("Checkpoint clean up failed", e);
}
lastCheckpointCleanUpTime = currentMinutes;
}
}
    /**
     * Releases orphaned checkpoints previously created by this lane: any
     * checkpoint made by AsyncIndexUpdate for this lane that is neither the
     * current reference nor a temp checkpoint, and whose creation time is
     * older than the reference's creation time minus the lease window.
     */
    void cleanUpCheckpoints() {
        log.debug("[{}] Cleaning up orphaned checkpoints", name);
        Set<String> keep = newHashSet();
        String cp = indexStats.getReferenceCheckpoint();
        if (cp == null) {
            log.warn("[{}] No reference checkpoint set in index stats", name);
            return;
        }
        keep.add(cp);
        keep.addAll(indexStats.tempCps);
        Map<String, String> info = store.checkpointInfo(cp);
        String value = info.get("created");
        if (value != null) {
            // remove unreferenced AsyncIndexUpdate checkpoints:
            // - without 'created' info (checkpoint created before OAK-4826)
            // or
            // - 'created' value older than the current reference and
            // not within the lease time frame
            long current = ISO8601.parse(value).getTimeInMillis();
            for (String checkpoint : store.checkpoints()) {
                info = store.checkpointInfo(checkpoint);
                String creator = info.get("creator");
                String created = info.get("created");
                // NOTE: this local 'name' shadows the lane-name field; the
                // guard below compares it against this.name explicitly
                String name = info.get("name");
                if (!keep.contains(checkpoint)
                        && this.name.equals(name)
                        && AsyncIndexUpdate.class.getSimpleName().equals(creator)
                        && (created == null || ISO8601.parse(created).getTimeInMillis() + leaseTimeOut < current)) {
                    if (store.release(checkpoint)) {
                        log.info("[{}] Removed orphaned checkpoint '{}' {}",
                                name, checkpoint, info);
                    }
                }
            }
        }
    }
protected AsyncUpdateCallback newAsyncUpdateCallback(NodeStore store,
String name, long leaseTimeOut, String beforeCheckpoint,
AsyncIndexStats indexStats,
AtomicBoolean stopFlag) {
AsyncUpdateCallback callback = new AsyncUpdateCallback(store, name, leaseTimeOut,
beforeCheckpoint, indexStats, stopFlag);
callback.setValidatorProviders(validatorProviders);
return callback;
}
    /**
     * Performs the actual diff-and-index work between two checkpoints and
     * commits the result together with the updated {@code :async} bookkeeping.
     *
     * @param before state of the previously indexed checkpoint (or MISSING_NODE)
     * @param beforeCheckpoint previous checkpoint name, null on initial run
     * @param after state of the freshly created checkpoint
     * @param afterCheckpoint the freshly created checkpoint name
     * @param afterTime ISO8601 creation time of {@code afterCheckpoint}
     * @param callback per-cycle callback maintaining lease and counters
     * @return whether the run-status should be marked done afterwards
     *         (false when switchOnSync defers the switch to a later run)
     */
    protected boolean updateIndex(NodeState before, String beforeCheckpoint,
            NodeState after, String afterCheckpoint, String afterTime,
            AsyncUpdateCallback callback) throws CommitFailedException {
        Stopwatch watch = Stopwatch.createStarted();
        boolean updatePostRunStatus = true;
        boolean progressLogged = false;
        // prepare the update callback for tracking index updates
        // and maintaining the update lease
        callback.prepare(afterCheckpoint);
        // check for index tasks split requests, if a split happened, make
        // sure to not delete the reference checkpoint, as the other index
        // task will take care of it
        taskSplitter.maybeSplit(beforeCheckpoint, callback.lease);
        IndexUpdate indexUpdate = null;
        try {
            NodeBuilder builder = store.getRoot().builder();
            markFailingIndexesAsCorrupt(builder);
            // propagate the checkpoint creation time to the editors
            CommitInfo info = new CommitInfo(CommitInfo.OAK_UNKNOWN, CommitInfo.OAK_UNKNOWN,
                    ImmutableMap.of(IndexConstants.CHECKPOINT_CREATION_TIME, afterTime));
            indexUpdate =
                    new IndexUpdate(provider, name, after, builder, callback, callback, info, corruptIndexHandler)
                            .withMissingProviderStrategy(missingStrategy);
            configureRateEstimator(indexUpdate);
            CommitFailedException exception =
                    EditorDiff.process(VisibleEditor.wrap(indexUpdate), before, after);
            if (exception != null) {
                throw exception;
            }
            // record the new reference checkpoint and last-indexed timestamp
            builder.child(ASYNC).setProperty(name, afterCheckpoint);
            builder.child(ASYNC).setProperty(PropertyStates.createProperty(lastIndexedTo, afterTime, Type.DATE));
            if (callback.isDirty() || before == MISSING_NODE) {
                if (switchOnSync) {
                    // remember which definitions were reindexed; they get
                    // switched to sync on a later quiet run
                    reindexedDefinitions.addAll(indexUpdate
                            .getReindexedDefinitions());
                    updatePostRunStatus = false;
                } else {
                    updatePostRunStatus = true;
                }
            } else {
                if (switchOnSync) {
                    log.debug(
                            "[{}] No changes detected after diff; will try to switch to synchronous updates on {}",
                            name, reindexedDefinitions);
                    // no changes after diff, switch to sync on the async defs
                    for (String path : reindexedDefinitions) {
                        NodeBuilder c = builder;
                        for (String p : elements(path)) {
                            c = c.getChildNode(p);
                        }
                        if (c.exists() && !c.getBoolean(REINDEX_PROPERTY_NAME)) {
                            c.removeProperty(ASYNC_PROPERTY_NAME);
                        }
                    }
                    reindexedDefinitions.clear();
                    if (store.release(afterCheckpoint)) {
                        // lane is now fully synchronous; drop the bookkeeping
                        builder.child(ASYNC).removeProperty(name);
                        builder.child(ASYNC).removeProperty(lastIndexedTo);
                    } else {
                        log.debug("[{}] Unable to release checkpoint {}", name, afterCheckpoint);
                    }
                }
                updatePostRunStatus = true;
            }
            mergeWithConcurrencyCheck(store, validatorProviders, builder, beforeCheckpoint,
                    callback.lease, name);
            if (indexUpdate.isReindexingPerformed()) {
                log.info("[{}] Reindexing completed for indexes: {} in {} ({} ms)",
                        name, indexUpdate.getReindexStats(),
                        watch, watch.elapsed(TimeUnit.MILLISECONDS));
                progressLogged = true;
            }
            corruptIndexHandler.markWorkingIndexes(indexUpdate.getUpdatedIndexPaths());
        } finally {
            if (indexUpdate != null) {
                if (updatePostRunStatus) {
                    indexUpdate.commitProgress(IndexCommitCallback.IndexProgress.COMMIT_SUCCEDED);
                } else {
                    indexUpdate.commitProgress(IndexCommitCallback.IndexProgress.COMMIT_FAILED);
                }
            }
            // always release the lease, even when the diff/merge threw
            callback.close();
        }
        if (!progressLogged) {
            String msg = "[{}] AsyncIndex update run completed in {}. Indexed {} nodes, {}";
            //Log at info level if time taken is more than 5 min
            if (watch.elapsed(TimeUnit.MINUTES) >= 5) {
                log.info(msg, name, watch, indexStats.getUpdates(), indexUpdate.getIndexingStats());
            } else {
                log.debug(msg, name, watch, indexStats.getUpdates(), indexUpdate.getIndexingStats());
            }
        }
        return updatePostRunStatus;
    }
private void configureRateEstimator(IndexUpdate indexUpdate) {
//As metrics is an optional library guard the access with the check
if (statisticsProvider.getClass().getSimpleName().equals("MetricStatisticsProvider")){
MetricRegistry registry = ((MetricStatisticsProvider) statisticsProvider).getRegistry();
indexUpdate.setTraversalRateEstimator(new MetricRateEstimator(name, registry));
}
NodeCounterMBeanEstimator estimator = new NodeCounterMBeanEstimator(store);
indexUpdate.setNodeCountEstimator(estimator);
}
public static String leasify(String name) {
return name + "-lease";
}
static String lastIndexedTo(String name) {
return name + "-LastIndexedTo";
}
private static String getTempCpName(String name) {
return name + "-temp";
}
private static boolean isLeaseCheckEnabled(long leaseTimeOut) {
return leaseTimeOut > 0;
}
    /**
     * Merges the given builder with a commit hook that rejects the commit when
     * the lane's checkpoint or lease under {@code :async} no longer matches
     * the values this task last observed — i.e. another cluster node has taken
     * over the lane in the meantime.
     *
     * @param checkpoint expected reference checkpoint, null to skip the check
     * @param lease expected lease expiry, null to skip the check
     * @throws CommitFailedException a concurrent-update exception when the
     *         check (or an OAK-2961 style conflict) fails, otherwise the
     *         original merge failure
     */
    private static void mergeWithConcurrencyCheck(final NodeStore store, List<ValidatorProvider> validatorProviders,
            NodeBuilder builder, final String checkpoint, final Long lease,
            final String name) throws CommitFailedException {
        CommitHook concurrentUpdateCheck = new CommitHook() {
            @Override @NotNull
            public NodeState processCommit(
                    NodeState before, NodeState after, CommitInfo info)
                    throws CommitFailedException {
                // check for concurrent updates by this async task
                NodeState async = before.getChildNode(ASYNC);
                if ((checkpoint == null || Objects.equal(checkpoint, async.getString(name)))
                        &&
                    (lease == null || lease == async.getLong(leasify(name)))) {
                    return after;
                } else {
                    throw newConcurrentUpdateException();
                }
            }
        };
        List<EditorProvider> editorProviders = Lists.newArrayList();
        editorProviders.add(new ConflictValidatorProvider());
        editorProviders.addAll(validatorProviders);
        CompositeHook hooks = new CompositeHook(
                ResetCommitAttributeHook.INSTANCE,
                ConflictHook.of(new AnnotatingConflictHandler()),
                new EditorHook(CompositeEditorProvider.compose(editorProviders)),
                concurrentUpdateCheck);
        try {
            store.merge(builder, hooks, createCommitInfo());
        } catch (CommitFailedException ex) {
            // OAK-2961
            if (ex.isOfType(CommitFailedException.STATE) && ex.getCode() == 1) {
                throw newConcurrentUpdateException();
            } else {
                throw ex;
            }
        }
    }
private static CommitInfo createCommitInfo() {
Map<String, Object> info = ImmutableMap.<String, Object>of(CommitContext.NAME, new SimpleCommitContext());
return new CommitInfo(CommitInfo.OAK_UNKNOWN, CommitInfo.OAK_UNKNOWN, info);
}
    /**
     * Sets the lease timeout, in milliseconds. A non-positive value disables
     * the lease check.
     *
     * @return this instance, for chaining
     */
    protected AsyncIndexUpdate setLeaseTimeOut(long leaseTimeOut) {
        this.leaseTimeOut = leaseTimeOut;
        return this;
    }
    /** @return the lease timeout in milliseconds */
    protected long getLeaseTimeOut() {
        return leaseTimeOut;
    }
    /**
     * Sets the soft close timeout in seconds; {@link #close()} waits this long
     * before forcing a stop (and five times longer before giving up).
     *
     * @return this instance, for chaining
     */
    protected AsyncIndexUpdate setCloseTimeOut(int timeOutInSec) {
        this.softTimeOutSecs = timeOutInSec;
        return this;
    }
    /** Sets the validator providers applied to every merge performed by this task. */
    public void setValidatorProviders(List<ValidatorProvider> validatorProviders) {
        this.validatorProviders = checkNotNull(validatorProviders);
    }
    /** Replaces the handler used to track and mark corrupt (persistently failing) indexes. */
    public void setCorruptIndexHandler(TrackingCorruptIndexHandler corruptIndexHandler) {
        this.corruptIndexHandler = checkNotNull(corruptIndexHandler);
    }
    /** @return the current corrupt-index handler (package-private, for tests) */
    TrackingCorruptIndexHandler getCorruptIndexHandler() {
        return corruptIndexHandler;
    }
public boolean isClosed(){
return closed || forcedStopFlag.get();
}
    /** @return {@code true} while a close() call is blocked waiting on the run permit */
    boolean isClosing(){
        return runPermit.hasQueuedThreads();
    }
    // Marks the start of a cycle on the stats MBean.
    // NOTE(review): name has a duplicated "Stats" suffix; kept as-is since it
    // is called elsewhere in this file.
    private static void preAsyncRunStatsStats(AsyncIndexStats stats) {
        stats.start(now());
    }
    // Marks the successful end of a cycle on the stats MBean.
    private static void postAsyncRunStatsStatus(AsyncIndexStats stats) {
        stats.done(now());
    }
private static String now() {
return ISO8601.format(Calendar.getInstance());
}
    /** @return the statistics/control MBean backing this lane */
    public AsyncIndexStats getIndexStats() {
        return indexStats;
    }
public boolean isFinished() {
return indexStats.getStatus() == STATUS_DONE;
}
    /**
     * JMX statistics and control MBean for this async lane: tracks the current
     * run status, reference/temporary checkpoints, failure state, and supports
     * pausing the lane.
     */
    final class AsyncIndexStats extends AnnotatedStandardMBean implements IndexStatsMBean {
        protected AsyncIndexStats(String name, StatisticsProvider statsProvider) {
            super(IndexStatsMBean.class);
            this.execStats = new ExecutionStats(name, statsProvider);
        }

        // ISO8601 timestamps of the current cycle's start and last completion
        private String start = "";
        private String done = "";
        private String status = STATUS_INIT;
        // checkpoint bookkeeping mirrored from the :async node
        private String referenceCp = "";
        private String processedCp = "";
        private Set<String> tempCps = new HashSet<String>();

        private volatile boolean isPaused;
        // counters updated from the indexing thread, read via JMX
        private volatile long updates;
        private volatile long nodesRead;
        private final Stopwatch watch = Stopwatch.createUnstarted();
        private final ExecutionStats execStats;

        /** Flag to avoid repeatedly logging failure warnings */
        private volatile boolean failing = false;
        // timestamp (ms) of the last failure logged at WARN level
        private long latestErrorWarn = 0;

        private String failingSince = "";
        private String latestError = null;
        private String latestErrorTime = "";
        private long consecutiveFailures = 0;
        /** Marks the beginning of an indexing cycle and (re)starts the cycle stopwatch. */
        public void start(String now) {
            status = STATUS_RUNNING;
            start = now;
            done = "";
            // a previous cycle may have left the watch running (e.g. on failure)
            if (watch.isRunning()) {
                watch.reset();
            }
            watch.start();
        }
        /**
         * Marks the end of an indexing cycle: records the completion time,
         * flips the status (FAILING when corrupt indexes remain on this lane)
         * and feeds the cycle duration and update count to the execution stats.
         */
        public void done(String now) {
            if (corruptIndexHandler.isFailing(name)){
                status = STATUS_FAILING;
            } else {
                status = STATUS_DONE;
            }
            done = now;
            if (watch.isRunning()) {
                watch.stop();
            }
            execStats.doneOneCycle(watch.elapsed(TimeUnit.MILLISECONDS), updates);
            watch.reset();
        }
public void failed(Exception e) {
if (e == INTERRUPTED){
status = STATUS_INTERRUPTED;
log.info("[{}] The index update interrupted", name);
log.debug("[{}] The index update interrupted", name, e);
return;
}
latestError = getStackTraceAsString(e);
latestErrorTime = now();
consecutiveFailures++;
if (!failing) {
// first occurrence of a failure
failing = true;
// reusing value so value display is consistent
failingSince = latestErrorTime;
latestErrorWarn = System.currentTimeMillis();
log.warn("[{}] The index update failed", name, e);
} else {
// subsequent occurrences
boolean warn = System.currentTimeMillis() - latestErrorWarn > ERROR_WARN_INTERVAL;
if (warn) {
latestErrorWarn = System.currentTimeMillis();
log.warn("[{}] The index update is still failing", name, e);
} else {
log.debug("[{}] The index update is still failing", name, e);
}
}
}
public void fixed() {
if (corruptIndexHandler.isFailing(name)){
log.info("[{}] Index update no longer fails but some corrupt indexes have been skipped {}", name,
corruptIndexHandler.getCorruptIndexData(name).keySet());
} else {
log.info("[{}] Index update no longer fails", name);
}
failing = false;
failingSince = "";
consecutiveFailures = 0;
latestErrorWarn = 0;
latestError = null;
latestErrorTime = "";
}
public boolean isFailing() {
return failing || corruptIndexHandler.isFailing(name);
}
public boolean didLastIndexingCycleFailed(){
return failing;
}
@Override
public String getName() {
return name;
}
@Override
public String getStart() {
return start;
}
@Override
public String getDone() {
return done;
}
@Override
public String getStatus() {
return status;
}
@Override
public String getLastIndexedTime() {
PropertyState ps = store.getRoot().getChildNode(ASYNC).getProperty(lastIndexedTo);
return ps != null ? ps.getValue(Type.STRING) : null;
}
@Override
public void pause() {
log.debug("[{}] Pausing the async indexer", name);
this.isPaused = true;
}
@Override
public String abortAndPause() {
//First pause to avoid any race
pause();
//Set the forcedStop flag anyway. In resume this would be cleared
forcedStopFlag.set(true);
String msg = "";
//Abort if any indexing run is in progress
if (runPermit.availablePermits() == 0){
msg = "Abort request placed for current run. ";
}
return msg + "Indexing is paused now. Invoke 'resume' to resume indexing";
}
@Override
public void resume() {
log.debug("[{}] Resuming the async indexer", name);
this.isPaused = false;
//Clear the forcedStop flag as fail safe
forcedStopFlag.set(false);
}
@Override
public boolean isPaused() {
return this.isPaused;
}
void reset() {
this.updates = 0;
this.nodesRead = 0;
}
long incUpdates() {
return ++updates;
}
long incTraversal() {
return ++nodesRead;
}
@Override
public long getUpdates() {
return updates;
}
@Override
public long getNodesReadCount(){
return nodesRead;
}
void setReferenceCheckpoint(String checkpoint) {
this.referenceCp = checkpoint;
}
@Override
public String getReferenceCheckpoint() {
return referenceCp;
}
void setProcessedCheckpoint(String checkpoint) {
this.processedCp = checkpoint;
}
@Override
public String getProcessedCheckpoint() {
return processedCp;
}
void setTempCheckpoints(Set<String> tempCheckpoints) {
this.tempCps = tempCheckpoints;
}
void releaseTempCheckpoint(String tempCheckpoint) {
this.tempCps.remove(tempCheckpoint);
}
@Override
public String getTemporaryCheckpoints() {
return tempCps.toString();
}
@Override
public long getTotalExecutionCount() {
return execStats.getExecutionCounter().getCount();
}
@Override
public CompositeData getExecutionCount() {
return execStats.getExecutionCount();
}
@Override
public CompositeData getExecutionTime() {
//Do nothing. Kept for backward compatibility
return null;
}
@Override
public CompositeData getIndexedNodesCount() {
return execStats.getIndexedNodesCount();
}
@Override
public CompositeData getConsolidatedExecutionStats() {
return execStats.getConsolidatedStats();
}
@Override
public void resetConsolidatedExecutionStats() {
//Do nothing. Kept for backward compatibility
}
@Override
public String toString() {
return "AsyncIndexStats [start=" + start + ", done=" + done
+ ", status=" + status + ", paused=" + isPaused
+ ", failing=" + failing + ", failingSince=" + failingSince
+ ", consecutiveFailures=" + consecutiveFailures
+ ", updates=" + updates + ", referenceCheckpoint="
+ referenceCp + ", processedCheckpoint=" + processedCp
+ " ,tempCheckpoints=" + tempCps + ", latestErrorTime="
+ latestErrorTime + ", latestError=" + latestError + " ]";
}
ExecutionStats getExecutionStats() {
return execStats;
}
class ExecutionStats {
public static final String INDEXER_COUNT = "INDEXER_COUNT";
public static final String INDEXER_NODE_COUNT = "INDEXER_NODE_COUNT";
private final MeterStats indexerExecutionCountMeter;
private final MeterStats indexedNodeCountMeter;
private final TimerStats indexerTimer;
private final HistogramStats indexedNodePerCycleHisto;
private StatisticsProvider statisticsProvider;
private final String[] names = {"Executions", "Nodes"};
private final String name;
private CompositeType consolidatedType;
public ExecutionStats(String name, StatisticsProvider statsProvider) {
this.name = name;
this.statisticsProvider = statsProvider;
indexerExecutionCountMeter = statsProvider.getMeter(stats(INDEXER_COUNT), StatsOptions.DEFAULT);
indexedNodeCountMeter = statsProvider.getMeter(stats(INDEXER_NODE_COUNT), StatsOptions.DEFAULT);
indexerTimer = statsProvider.getTimer(stats("INDEXER_TIME"), StatsOptions.METRICS_ONLY);
indexedNodePerCycleHisto = statsProvider.getHistogram(stats("INDEXER_NODE_COUNT_HISTO"), StatsOptions
.METRICS_ONLY);
try {
consolidatedType = new CompositeType("ConsolidatedStats",
"Consolidated stats", names,
names,
new OpenType[] {SimpleType.LONG, SimpleType.LONG});
} catch (OpenDataException e) {
log.warn("[{}] Error in creating CompositeType for consolidated stats", AsyncIndexUpdate.this.name, e);
}
}
public void doneOneCycle(long timeInMillis, long updates){
indexerExecutionCountMeter.mark();
indexedNodeCountMeter.mark(updates);
indexerTimer.update(timeInMillis, TimeUnit.MILLISECONDS);
indexedNodePerCycleHisto.update(updates);
}
public Counting getExecutionCounter() {
return indexerExecutionCountMeter;
}
public Counting getIndexedNodeCount() {
return indexedNodeCountMeter;
}
private CompositeData getExecutionCount() {
return TimeSeriesStatsUtil.asCompositeData(getTimeSeries(stats(INDEXER_COUNT)),
"Indexer Execution Count");
}
private CompositeData getIndexedNodesCount() {
return TimeSeriesStatsUtil.asCompositeData(getTimeSeries(stats(INDEXER_NODE_COUNT)),
"Indexer Node Count");
}
private CompositeData getConsolidatedStats() {
try {
Long[] values = new Long[]{indexerExecutionCountMeter.getCount(),
indexedNodeCountMeter.getCount()};
return new CompositeDataSupport(consolidatedType, names, values);
} catch (Exception e) {
log.error("[{}] Error retrieving consolidated stats", name, e);
return null;
}
}
private String stats(String suffix){
return name + "." + suffix;
}
private TimeSeries getTimeSeries(String name) {
return statisticsProvider.getStats().getTimeSeries(name, true);
}
}
@Override
public void splitIndexingTask(String paths, String newIndexTaskName) {
splitIndexingTask(newHashSet(Splitter.on(",").trimResults()
.omitEmptyStrings().split(paths)), newIndexTaskName);
}
private void splitIndexingTask(Set<String> paths,
String newIndexTaskName) {
taskSplitter.registerSplit(paths, newIndexTaskName);
}
@Override
public void registerAsyncIndexer(String name, long delayInSeconds) {
taskSplitter.registerAsyncIndexer(name, delayInSeconds);
}
@Override
public String getFailingSince() {
return failingSince;
}
@Override
public long getConsecutiveFailedExecutions() {
return consecutiveFailures;
}
@Override
public String getLatestError() {
return latestError;
}
@Override
public String getLatestErrorTime() {
return latestErrorTime;
}
@Override
public TabularData getFailingIndexStats() {
return corruptIndexHandler.getFailingIndexStats(name);
}
}
/**
* Checks whether there are no visible changes between the given states.
*/
private static boolean noVisibleChanges(NodeState before, NodeState after) {
    // Diff the two states; each callback returns true ("keep going") only
    // when the touched item is hidden (name starting with ':'). Any visible
    // change therefore aborts the comparison, and the overall result of
    // compareAgainstBaseState reports whether the diff ran to completion.
    return after.compareAgainstBaseState(before, new NodeStateDiff() {
        @Override
        public boolean propertyAdded(PropertyState after) {
            return isHidden(after.getName());
        }
        @Override
        public boolean propertyChanged(
                PropertyState before, PropertyState after) {
            return isHidden(after.getName());
        }
        @Override
        public boolean propertyDeleted(PropertyState before) {
            return isHidden(before.getName());
        }
        @Override
        public boolean childNodeAdded(String name, NodeState after) {
            return isHidden(name);
        }
        @Override
        public boolean childNodeChanged(
                String name, NodeState before, NodeState after) {
            // recurse into visible children with the same diff handler
            return isHidden(name)
                    || after.compareAgainstBaseState(before, this);
        }
        @Override
        public boolean childNodeDeleted(String name, NodeState before) {
            return isHidden(name);
        }
    });
}
/**
 * Checks whether the given node or property name denotes a hidden item,
 * i.e. starts with a colon.
 *
 * @param name item name, must not be null; may be empty
 * @return {@code true} if the name starts with ':'
 */
private static boolean isHidden(String name) {
    // Guard against empty names: charAt(0) on "" would throw
    // StringIndexOutOfBoundsException.
    return !name.isEmpty() && name.charAt(0) == ':';
}
/**
 * Default strategy for handling index definitions whose editor provider is
 * missing: the commit is failed unless that editor type has been explicitly
 * disabled.
 */
static class DefaultMissingIndexProviderStrategy extends
        MissingIndexProviderStrategy {
    @Override
    public void onMissingIndex(String type, NodeBuilder definition, String path)
            throws CommitFailedException {
        // Disabled types are deliberately ignored; anything else aborts the commit.
        if (!isDisabled(type)) {
            throw new CommitFailedException("Async", 2,
                    "Missing index provider detected for type [" + type
                            + "] on index [" + path + "]");
        }
    }
}
/**
 * Returns {@code true} if the last indexing cycle failed or if corrupt
 * indexes are currently being skipped for this task (delegates to the stats
 * bean, which also consults the corrupt-index handler).
 */
public boolean isFailing() {
    return indexStats.isFailing();
}
/**
 * Supports moving ("splitting") a set of index definitions from this async
 * task to a newly named async task, handing over the currently referenced
 * checkpoint so the new task resumes from the same repository state.
 * NOTE(review): class name keeps the historical spelling "Spliter".
 */
class IndexTaskSpliter {
    // Pending split request; null when no split has been registered
    private Set<String> paths = null;
    private String newIndexTaskName = null;
    private String lastReferencedCp;
    // Task names already registered via registerAsyncIndexer, to avoid duplicates
    private Set<String> registeredTasks = newHashSet();
    /** Registers a split of the given index definition paths to a new async task name. */
    void registerSplit(Set<String> paths, String newIndexTaskName) {
        log.info(
                "[{}] Registered split of following index definitions {} to new async task {}.",
                name, paths, newIndexTaskName);
        this.paths = newHashSet(paths);
        this.newIndexTaskName = newIndexTaskName;
    }
    /** Executes a pending split, if one was registered; no-op otherwise. */
    void maybeSplit(@Nullable String refCheckpoint, Long lease)
            throws CommitFailedException {
        if (paths == null) {
            return;
        }
        split(refCheckpoint, lease);
    }
    /**
     * Performs the split: transfers the reference checkpoint to the new task,
     * removes it from this task's temp-checkpoint list (so it is not released
     * on the next run), rewrites the 'async' property of the matching index
     * definitions and merges with a concurrency/lease check. The pending
     * request is cleared regardless of whether any definition was updated.
     */
    private void split(@Nullable String refCheckpoint, Long lease) throws CommitFailedException {
        NodeBuilder builder = store.getRoot().builder();
        if (refCheckpoint != null) {
            String tempCpName = getTempCpName(name);
            NodeBuilder async = builder.child(ASYNC);
            // add new reference
            async.setProperty(newIndexTaskName, refCheckpoint);
            // update old 'temp' list: remove refcp so it doesn't get released on next run
            Set<String> temps = newHashSet();
            for (String cp : getStrings(async, tempCpName)) {
                if (cp.equals(refCheckpoint)) {
                    continue;
                }
                temps.add(cp);
            }
            async.setProperty(tempCpName, temps, Type.STRINGS);
            indexStats.setTempCheckpoints(temps);
        }
        // update index defs name => newIndexTaskName
        Set<String> updated = newHashSet();
        for (String path : paths) {
            NodeBuilder c = builder;
            for (String p : elements(path)) {
                c = c.getChildNode(p);
            }
            // only touch definitions that exist and currently belong to this task
            if (c.exists() && name.equals(c.getString("async"))) {
                //TODO Fix this to account for nrt and sync
                c.setProperty("async", newIndexTaskName);
                updated.add(path);
            }
        }
        if (!updated.isEmpty()) {
            mergeWithConcurrencyCheck(store, validatorProviders, builder, refCheckpoint, lease, name);
            log.info(
                    "[{}] Successfully split index definitions {} to async task named {} with referenced checkpoint {}.",
                    name, updated, newIndexTaskName, refCheckpoint);
            lastReferencedCp = refCheckpoint;
        }
        paths = null;
        newIndexTaskName = null;
    }
    public String getLastReferencedCp() {
        return lastReferencedCp;
    }
    /**
     * Registers a brand-new AsyncIndexUpdate task running every
     * {@code delayInSeconds} via the MBean registration (when available).
     * Repeated calls with the same task name are ignored with a warning.
     */
    void registerAsyncIndexer(String newTask, long delayInSeconds) {
        if (registeredTasks.contains(newTask)) {
            // prevent accidental double call
            log.warn("[{}] Task {} is already registered.", name, newTask);
            return;
        }
        if (mbeanRegistration != null) {
            log.info(
                    "[{}] Registering a new indexing task {} running each {} seconds.",
                    name, newTask, delayInSeconds);
            AsyncIndexUpdate task = new AsyncIndexUpdate(newTask, store,
                    provider);
            mbeanRegistration.registerAsyncIndexer(task, delayInSeconds);
            registeredTasks.add(newTask);
        }
    }
}
/**
 * Reads a multi-valued string property from the given builder; yields a
 * fresh empty set when the property is absent.
 */
private static Iterable<String> getStrings(NodeBuilder b, String p) {
    PropertyState ps = b.getProperty(p);
    if (ps == null) {
        // no such property: empty (mutable) set, mirroring the STRINGS shape
        return newHashSet();
    }
    return ps.getValue(Type.STRINGS);
}
/** Returns the helper handling index-definition splits for this task (package-private). */
IndexTaskSpliter getTaskSplitter() {
    return taskSplitter;
}
/** Injects the MBean registration used when spawning new async indexer tasks. */
public void setIndexMBeanRegistration(IndexMBeanRegistration mbeanRegistration) {
    this.mbeanRegistration = mbeanRegistration;
}
/** Returns the name of this async indexing task. */
public String getName() {
    return name;
}
/** Creates the exception signalling that a concurrent update was detected during merge. */
private static CommitFailedException newConcurrentUpdateException() {
    return new CommitFailedException("Async", 1, "Concurrent update detected");
}
}