blob: 50e1eef49b4c6c6c1da314e451ab8978f3b939e4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.plugins.index;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean;
import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
import org.apache.jackrabbit.oak.plugins.index.property.jmx.PropertyIndexAsyncReindex;
import org.apache.jackrabbit.oak.plugins.index.property.jmx.PropertyIndexAsyncReindexMBean;
import org.apache.jackrabbit.oak.plugins.observation.ChangeCollectorProvider;
import org.apache.jackrabbit.oak.spi.commit.ValidatorProvider;
import org.apache.jackrabbit.oak.spi.state.Clusterable;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.spi.whiteboard.CompositeRegistration;
import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.apache.jackrabbit.oak.stats.StatsOptions;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.metatype.annotations.AttributeDefinition;
import org.osgi.service.metatype.annotations.Designate;
import org.osgi.service.metatype.annotations.ObjectClassDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
@Component(
configurationPolicy = ConfigurationPolicy.REQUIRE,
service = {})
@Designate(ocd = AsyncIndexerService.Configuration.class)
public class AsyncIndexerService {
@ObjectClassDefinition(
name = "Apache Jackrabbit Oak Async Indexer Service",
description = "Configures the async indexer services which performs periodic indexing of repository content"
)
@interface Configuration {
@AttributeDefinition(
cardinality = 1024,
name = "Async Indexer Configs",
description = "Async indexer configs in the form of <name>:<interval in secs> e.g. \"async:5\""
)
String[] asyncConfigs() default {"async:5"};
@AttributeDefinition(
name = "Lease time out",
description = "Lease timeout in minutes. AsyncIndexer would wait for this timeout period before breaking " +
"async indexer lease"
)
int leaseTimeOutMinutes() default 15;
@AttributeDefinition(
name = "Failing Index Timeout (s)",
description = "Time interval in seconds after which a failing index is considered as corrupted and " +
"ignored from further indexing untill reindex. To disable this set it to 0"
)
long failingIndexTimeoutSeconds() default 30 * 60;
@AttributeDefinition(
name = "Error warn interval (s)",
description = "Time interval in seconds after which a warning log would be logged for skipped indexes. " +
"This is done to avoid flooding the log in case of corrupted index."
)
long errorWarnIntervalSeconds() default 15 * 60;
}
private static final char CONFIG_SEP = ':';
private final Logger log = LoggerFactory.getLogger(getClass());
private final WhiteboardIndexEditorProvider indexEditorProvider = new WhiteboardIndexEditorProvider();
@Reference
private NodeStore nodeStore;
@Reference(target = "(type=" + ChangeCollectorProvider.TYPE + ")")
private ValidatorProvider validatorProvider;
@Reference
private StatisticsProvider statisticsProvider;
private IndexMBeanRegistration indexRegistration;
private final Closer closer = Closer.create();
private WhiteboardExecutor executor;
@Activate
public void activate(BundleContext bundleContext, Configuration config) {
List<AsyncConfig> asyncIndexerConfig = getAsyncConfig(config.asyncConfigs());
Whiteboard whiteboard = new OsgiWhiteboard(bundleContext);
indexRegistration = new IndexMBeanRegistration(whiteboard);
indexEditorProvider.start(whiteboard);
executor = new WhiteboardExecutor();
executor.start(whiteboard);
long leaseTimeOutMin = config.leaseTimeOutMinutes();
if (!(nodeStore instanceof Clusterable)){
leaseTimeOutMin = 0;
log.info("Detected non clusterable setup. Lease checking would be disabled for async indexing");
}
TrackingCorruptIndexHandler corruptIndexHandler = createCorruptIndexHandler(config);
for (AsyncConfig c : asyncIndexerConfig) {
AsyncIndexUpdate task = new AsyncIndexUpdate(c.name, nodeStore, indexEditorProvider,
statisticsProvider, false);
task.setCorruptIndexHandler(corruptIndexHandler);
task.setValidatorProviders(Collections.singletonList(validatorProvider));
task.setLeaseTimeOut(TimeUnit.MINUTES.toMillis(leaseTimeOutMin));
indexRegistration.registerAsyncIndexer(task, c.timeIntervalInSecs);
closer.register(task);
}
registerAsyncReindexSupport(whiteboard);
log.info("Configured async indexers {} ", asyncIndexerConfig);
log.info("Lease time: {} mins and AsyncIndexUpdate configured with {}", leaseTimeOutMin, validatorProvider.getClass().getName());
}
private void registerAsyncReindexSupport(Whiteboard whiteboard) {
// async reindex
String name = IndexConstants.ASYNC_REINDEX_VALUE;
AsyncIndexUpdate task = new AsyncIndexUpdate(name, nodeStore, indexEditorProvider, statisticsProvider, true);
PropertyIndexAsyncReindex asyncPI = new PropertyIndexAsyncReindex(task, executor);
final Registration reg = new CompositeRegistration(
registerMBean(whiteboard, PropertyIndexAsyncReindexMBean.class, asyncPI,
PropertyIndexAsyncReindexMBean.TYPE, "async"),
registerMBean(whiteboard, IndexStatsMBean.class, task.getIndexStats(), IndexStatsMBean.TYPE, name));
closer.register(new Closeable() {
@Override
public void close() throws IOException {
reg.unregister();
}
});
}
@Deactivate
public void deactivate() throws IOException {
if (indexRegistration != null) {
indexRegistration.unregister();
}
if (executor != null) {
executor.stop();
executor = null;
}
//Close the task *after* unregistering the jobs
closer.close();
}
//~-------------------------------------------< internal >
private TrackingCorruptIndexHandler createCorruptIndexHandler(Configuration config) {
long failingIndexTimeoutSeconds = config.failingIndexTimeoutSeconds();
long errorWarnIntervalSeconds = config.errorWarnIntervalSeconds();
TrackingCorruptIndexHandler corruptIndexHandler = new TrackingCorruptIndexHandler();
corruptIndexHandler.setCorruptInterval(failingIndexTimeoutSeconds, TimeUnit.SECONDS);
corruptIndexHandler.setErrorWarnInterval(errorWarnIntervalSeconds, TimeUnit.SECONDS);
corruptIndexHandler.setMeterStats(statisticsProvider.getMeter(TrackingCorruptIndexHandler.CORRUPT_INDEX_METER_NAME, StatsOptions.METRICS_ONLY));
if (failingIndexTimeoutSeconds <= 0){
log.info("[failingIndexTimeoutSeconds] is set to {}. Auto corrupt index isolation handling is disabled, warning log would be " +
"logged every {} s", failingIndexTimeoutSeconds, errorWarnIntervalSeconds);
} else {
log.info("Auto corrupt index isolation handling is enabled. Any async index which fails for {}s would " +
"be marked as corrupted and would be skipped from further indexing. A warning log would be " +
"logged every {} s", failingIndexTimeoutSeconds, errorWarnIntervalSeconds);
}
return corruptIndexHandler;
}
static List<AsyncConfig> getAsyncConfig(String[] configs) {
List<AsyncConfig> result = Lists.newArrayList();
for (String config : configs) {
int idOfEq = config.indexOf(CONFIG_SEP);
checkArgument(idOfEq > 0, "Invalid config provided [%s]", Arrays.toString(configs));
String name = config.substring(0, idOfEq).trim();
long interval = Long.parseLong(config.substring(idOfEq + 1));
result.add(new AsyncConfig(name, interval));
}
return result;
}
static class AsyncConfig {
final String name;
final long timeIntervalInSecs;
private AsyncConfig(String name, long timeIntervalInSecs) {
this.name = AsyncIndexUpdate.checkValidName(name);
this.timeIntervalInSecs = timeIntervalInSecs;
}
@Override
public String toString() {
return "AsyncConfig{" +
"name='" + name + '\'' +
", timeIntervalInSecs=" + timeIntervalInSecs +
'}';
}
}
}