blob: 0956cd2f797a19baaa5e00ff52570ac7bec64fd3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.plugins.document;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Lists.newArrayList;
import static org.apache.jackrabbit.oak.commons.IOUtils.closeQuietly;
import static org.apache.jackrabbit.oak.commons.PropertiesUtil.toLong;
import static org.apache.jackrabbit.oak.plugins.document.DocumentNodeStoreBuilder.DEFAULT_MEMORY_CACHE_SIZE;
import static org.apache.jackrabbit.oak.plugins.document.NodeDocument.MODIFIED_IN_SECS_RESOLUTION;
import static org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentNodeStoreBuilder.newMongoDocumentNodeStoreBuilder;
import static org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentNodeStoreBuilder.newRDBDocumentNodeStoreBuilder;
import static org.apache.jackrabbit.oak.spi.blob.osgi.SplitBlobStoreService.ONLY_STANDALONE_TARGET;
import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;
import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.scheduleWithFixedDelay;
import java.io.Closeable;
import java.io.IOException;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.sql.DataSource;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.base.Strings;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.mongodb.MongoClientURI;
import org.apache.commons.io.FilenameUtils;
import org.apache.jackrabbit.commons.SimpleValueFactory;
import org.apache.jackrabbit.oak.api.Descriptors;
import org.apache.jackrabbit.oak.api.jmx.CacheStatsMBean;
import org.apache.jackrabbit.oak.api.jmx.CheckpointMBean;
import org.apache.jackrabbit.oak.api.jmx.PersistentCacheStatsMBean;
import org.apache.jackrabbit.oak.cache.CacheStats;
import org.apache.jackrabbit.oak.plugins.document.VersionGarbageCollector.VersionGCStats;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentNodeStoreBuilder;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStoreMetrics;
import org.apache.jackrabbit.oak.plugins.document.rdb.RDBDocumentNodeStoreBuilder;
import org.apache.jackrabbit.oak.plugins.document.util.Utils;
import org.apache.jackrabbit.oak.spi.commit.ObserverTracker;
import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
import org.apache.jackrabbit.oak.plugins.blob.BlobGC;
import org.apache.jackrabbit.oak.plugins.blob.BlobGCMBean;
import org.apache.jackrabbit.oak.plugins.blob.BlobGarbageCollector;
import org.apache.jackrabbit.oak.plugins.blob.BlobStoreStats;
import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore;
import org.apache.jackrabbit.oak.plugins.blob.SharedDataStore;
import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobIdTracker;
import org.apache.jackrabbit.oak.plugins.blob.datastore.SharedDataStoreUtils;
import org.apache.jackrabbit.oak.plugins.document.persistentCache.PersistentCacheStats;
import org.apache.jackrabbit.oak.plugins.document.util.MongoConnection;
import org.apache.jackrabbit.oak.plugins.document.util.SystemPropertySupplier;
import org.apache.jackrabbit.oak.spi.cluster.ClusterRepositoryInfo;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.blob.BlobStoreWrapper;
import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore;
import org.apache.jackrabbit.oak.spi.blob.stats.BlobStoreStatsMBean;
import org.apache.jackrabbit.oak.spi.commit.BackgroundObserverMBean;
import org.apache.jackrabbit.oak.spi.gc.DelegatingGCMonitor;
import org.apache.jackrabbit.oak.spi.gc.GCMonitor;
import org.apache.jackrabbit.oak.spi.gc.GCMonitorTracker;
import org.apache.jackrabbit.oak.spi.gc.LoggingGCMonitor;
import org.apache.jackrabbit.oak.spi.state.Clusterable;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.spi.state.NodeStoreProvider;
import org.apache.jackrabbit.oak.spi.state.RevisionGC;
import org.apache.jackrabbit.oak.spi.state.RevisionGCMBean;
import org.apache.jackrabbit.oak.spi.whiteboard.AbstractServiceTracker;
import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.apache.jackrabbit.oak.spi.descriptors.GenericDescriptors;
import org.jetbrains.annotations.NotNull;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleException;
import org.osgi.framework.Constants;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.cm.ConfigurationAdmin;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.ConfigurationPolicy;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The OSGi service to start/stop a DocumentNodeStore instance.
*/
@Component(
configurationPolicy = ConfigurationPolicy.REQUIRE,
configurationPid = {Configuration.PID})
public class DocumentNodeStoreService {
private static final long MB = 1024 * 1024;
static final String DEFAULT_URI = "mongodb://localhost:27017/oak";
static final int DEFAULT_CACHE = (int) (DEFAULT_MEMORY_CACHE_SIZE / MB);
static final int DEFAULT_BLOB_CACHE_SIZE = 16;
static final String DEFAULT_DB = "oak";
static final boolean DEFAULT_SO_KEEP_ALIVE = true;
static final String DEFAULT_PERSISTENT_CACHE = "cache";
static final String DEFAULT_JOURNAL_CACHE = "diff-cache";
static final boolean DEFAULT_CUSTOM_BLOB_STORE = false;
public static final String CONTINUOUS_RGC_EXPR = "*/5 * * * * ?";
public static final String CLASSIC_RGC_EXPR = "0 0 2 * * ?";
public static final long DEFAULT_RGC_TIME_LIMIT_SECS = 3*60*60; // default is 3 hours
public static final double DEFAULT_RGC_DELAY_FACTOR = 0;
private static final String DESCRIPTION = "oak.nodestore.description";
static final long DEFAULT_JOURNAL_GC_INTERVAL_MILLIS = 5*60*1000; // default is 5min
static final long DEFAULT_JOURNAL_GC_MAX_AGE_MILLIS = 24*60*60*1000; // default is 24hours
static final boolean DEFAULT_PREFETCH_EXTERNAL_CHANGES = false;
private static final String DEFAULT_PROP_HOME = "./repository";
static final long DEFAULT_MAX_REPLICATION_LAG = 6 * 60 * 60;
static final boolean DEFAULT_BUNDLING_DISABLED = false;
static final String DEFAULT_VER_GC_EXPRESSION = "";
/**
* Revisions older than this time would be garbage collected
*/
static final long DEFAULT_VER_GC_MAX_AGE = 24 * 60 * 60; //TimeUnit.DAYS.toSeconds(1);
/**
* Blob modified before this time duration would be considered for Blob GC
*/
static final long DEFAULT_BLOB_GC_MAX_AGE = 24 * 60 * 60;
/**
* Default interval for taking snapshots of locally tracked blob ids.
*/
static final long DEFAULT_BLOB_SNAPSHOT_INTERVAL = 12 * 60 * 60;
// property name constants - values can come from framework properties or OSGi config
public static final String CUSTOM_BLOB_STORE = "customBlobStore";
public static final String PROP_REV_RECOVERY_INTERVAL = "lastRevRecoveryJobIntervalInSecs";
public static final String PROP_DS_TYPE = "documentStoreType";
private enum DocumentStoreType {
MONGO, RDB;
static DocumentStoreType fromString(String type) {
if (type == null) {
return MONGO;
}
return valueOf(type.toUpperCase(Locale.ROOT));
}
}
private final Logger log = LoggerFactory.getLogger(this.getClass());
private ServiceRegistration nodeStoreReg;
private Closer closer;
private WhiteboardExecutor executor;
private volatile BlobStore blobStore;
private volatile DataSource dataSource;
private volatile DataSource blobDataSource;
private volatile DocumentNodeStateCache nodeStateCache;
private DocumentNodeStore nodeStore;
private ObserverTracker observerTracker;
private JournalPropertyHandlerFactory journalPropertyHandlerFactory = new JournalPropertyHandlerFactory();
private ComponentContext context;
private Whiteboard whiteboard;
private long deactivationTimestamp = 0;
@Reference
private StatisticsProvider statisticsProvider;
@Reference
private ConfigurationAdmin configurationAdmin;
@Reference(service = Preset.class)
private Preset preset;
private boolean customBlobStore;
private ServiceRegistration blobStoreReg;
private BlobStore defaultBlobStore;
private Configuration config;
@Activate
protected void activate(ComponentContext context, Configuration config) throws Exception {
closer = Closer.create();
this.config = DocumentNodeStoreServiceConfiguration.create(context,
configurationAdmin, preset.configuration, config);
this.context = context;
whiteboard = new OsgiWhiteboard(context.getBundleContext());
executor = new WhiteboardExecutor();
executor.start(whiteboard);
customBlobStore = this.config.customBlobStore();
documentStoreType = DocumentStoreType.fromString(this.config.documentStoreType());
registerNodeStoreIfPossible();
}
private void registerNodeStoreIfPossible() throws IOException {
// disallow attempts to restart (OAK-3420)
if (deactivationTimestamp != 0) {
log.info("DocumentNodeStore was already unregistered ({}ms ago)", System.currentTimeMillis() - deactivationTimestamp);
} else if (context == null) {
log.info("Component still not activated. Ignoring the initialization call");
} else if (customBlobStore && blobStore == null) {
log.info("Custom BlobStore use enabled. DocumentNodeStoreService would be initialized when "
+ "BlobStore would be available");
} else if (documentStoreType == DocumentStoreType.RDB && (dataSource == null || blobDataSource == null)) {
log.info("DataSource use enabled. DocumentNodeStoreService would be initialized when "
+ "DataSource would be available (currently available: nodes: {}, blobs: {})", dataSource, blobDataSource);
} else {
registerNodeStore();
}
}
private void registerNodeStore() throws IOException {
DocumentNodeStoreBuilder<?> mkBuilder;
if (documentStoreType == DocumentStoreType.RDB) {
RDBDocumentNodeStoreBuilder builder = newRDBDocumentNodeStoreBuilder();
configureBuilder(builder);
checkNotNull(dataSource, "DataStore type set [%s] but DataSource reference not initialized", PROP_DS_TYPE);
if (!customBlobStore) {
checkNotNull(blobDataSource, "DataStore type set [%s] but BlobDataSource reference not initialized", PROP_DS_TYPE);
builder.setRDBConnection(dataSource, blobDataSource);
log.info("Connected to datasources {} {}", dataSource, blobDataSource);
} else {
if (blobDataSource != null && blobDataSource != dataSource) {
log.info("Ignoring blobDataSource {} as custom blob store takes precedence.", blobDataSource);
}
builder.setRDBConnection(dataSource);
log.info("Connected to datasource {}", dataSource);
}
mkBuilder = builder;
} else {
String uri = config.mongouri();
String db = config.db();
boolean soKeepAlive = config.socketKeepAlive();
MongoClientURI mongoURI = new MongoClientURI(uri);
String persistentCache = resolvePath(config.persistentCache(), DEFAULT_PERSISTENT_CACHE);
String journalCache = resolvePath(config.journalCache(), DEFAULT_JOURNAL_CACHE);
if (log.isInfoEnabled()) {
// Take care around not logging the uri directly as it
// might contain passwords
log.info("Starting DocumentNodeStore with host={}, db={}, cache size (MB)={}, persistentCache={}, " +
"journalCache={}, blobCacheSize (MB)={}, maxReplicationLagInSecs={}",
mongoURI.getHosts(), db, config.cache(), persistentCache,
journalCache, config.blobCacheSize(), config.maxReplicationLagInSecs());
log.info("Mongo Connection details {}", MongoConnection.toString(mongoURI.getOptions()));
}
MongoDocumentNodeStoreBuilder builder = newMongoDocumentNodeStoreBuilder();
configureBuilder(builder);
builder.setMaxReplicationLag(config.maxReplicationLagInSecs(), TimeUnit.SECONDS);
builder.setSocketKeepAlive(soKeepAlive);
builder.setMongoDB(uri, db, config.blobCacheSize());
mkBuilder = builder;
log.info("Connected to database '{}'", db);
}
if (!customBlobStore){
defaultBlobStore = mkBuilder.getBlobStore();
log.info("Registering the BlobStore with ServiceRegistry");
blobStoreReg = context.getBundleContext().registerService(BlobStore.class.getName(),
defaultBlobStore , null);
}
//Set wrapping blob store after setting the DB
if (isWrappingCustomBlobStore()) {
((BlobStoreWrapper) blobStore).setBlobStore(mkBuilder.getBlobStore());
mkBuilder.setBlobStore(blobStore);
}
// attach GCMonitor
final GCMonitorTracker gcMonitor = new GCMonitorTracker();
gcMonitor.start(whiteboard);
closer.register(asCloseable(gcMonitor));
Logger vgcLogger = LoggerFactory.getLogger(VersionGarbageCollector.class);
GCMonitor loggingGCMonitor;
if (isContinuousRevisionGC()) {
// log less chatty with continuous RevisionGC
loggingGCMonitor = new QuietGCMonitor(vgcLogger);
} else {
loggingGCMonitor = new LoggingGCMonitor(vgcLogger);
}
mkBuilder.setGCMonitor(new DelegatingGCMonitor(
newArrayList(gcMonitor, loggingGCMonitor)));
nodeStore = mkBuilder.build();
// ensure a clusterId is initialized
// and expose it as 'oak.clusterid' repository descriptor
GenericDescriptors clusterIdDesc = new GenericDescriptors();
clusterIdDesc.put(ClusterRepositoryInfo.OAK_CLUSTERID_REPOSITORY_DESCRIPTOR_KEY,
new SimpleValueFactory().createValue(
ClusterRepositoryInfo.getOrCreateId(nodeStore)), true, false);
whiteboard.register(Descriptors.class, clusterIdDesc, Collections.emptyMap());
// If a shared data store register the repo id in the data store
if (SharedDataStoreUtils.isShared(blobStore)) {
String repoId = null;
try {
repoId = ClusterRepositoryInfo.getOrCreateId(nodeStore);
((SharedDataStore) blobStore).setRepositoryId(repoId);
} catch (Exception e) {
throw new IOException("Could not register a unique repositoryId", e);
}
if (blobStore instanceof BlobTrackingStore) {
BlobTrackingStore trackingStore = (BlobTrackingStore) blobStore;
if (trackingStore.getTracker() != null) {
trackingStore.getTracker().close();
}
((BlobTrackingStore) blobStore).addTracker(
BlobIdTracker.build(getRepositoryHome(), repoId,
config.blobTrackSnapshotIntervalInSecs(),
(SharedDataStore) blobStore));
}
}
registerJMXBeans(nodeStore, mkBuilder);
registerLastRevRecoveryJob(nodeStore);
registerJournalGC(nodeStore);
registerVersionGCJob(nodeStore);
registerDocumentStoreMetrics(mkBuilder.getDocumentStore());
if (!isNodeStoreProvider()) {
observerTracker = new ObserverTracker(nodeStore);
observerTracker.start(context.getBundleContext());
}
journalPropertyHandlerFactory.start(whiteboard);
DocumentStore ds = nodeStore.getDocumentStore();
// OAK-2682: time difference detection applied at startup with a default
// max time diff of 2000 millis (2sec)
final long maxDiff = SystemPropertySupplier.create("oak.documentMK.maxServerTimeDiffMillis", 2000L).loggingTo(log).get();
try {
if (maxDiff>=0) {
final long timeDiff = ds.determineServerTimeDifferenceMillis();
log.info("registerNodeStore: server time difference: {}ms (max allowed: {}ms)", timeDiff, maxDiff);
if (Math.abs(timeDiff) > maxDiff) {
throw new AssertionError("Server clock seems off (" + timeDiff + "ms) by more than configured amount ("
+ maxDiff + "ms)");
}
}
} catch (RuntimeException e) { // no checked exception
// in case of a RuntimeException, just log but continue
log.warn("registerNodeStore: got RuntimeException while trying to determine time difference to server: " + e, e);
}
String[] serviceClasses;
if (isNodeStoreProvider()) {
registerNodeStoreProvider(nodeStore);
serviceClasses = new String[]{
DocumentNodeStore.class.getName(),
Clusterable.class.getName()
};
} else {
serviceClasses = new String[]{
NodeStore.class.getName(),
DocumentNodeStore.class.getName(),
Clusterable.class.getName()
};
}
Dictionary<String, Object> props = new Hashtable<String, Object>();
props.put(Constants.SERVICE_PID, DocumentNodeStore.class.getName());
props.put(DESCRIPTION, getMetadata(ds));
// OAK-2844: in order to allow DocumentDiscoveryLiteService to directly
// require a service DocumentNodeStore (instead of having to do an 'instanceof')
// the registration is now done for both NodeStore and DocumentNodeStore here.
nodeStoreReg = context.getBundleContext().registerService(
serviceClasses,
nodeStore, props);
}
private void configureBuilder(DocumentNodeStoreBuilder<?> builder) {
String persistentCache = resolvePath(config.persistentCache(), DEFAULT_PERSISTENT_CACHE);
String journalCache = resolvePath(config.journalCache(), DEFAULT_JOURNAL_CACHE);
builder.setStatisticsProvider(statisticsProvider).
setExecutor(executor).
memoryCacheSize(config.cache() * MB).
memoryCacheDistribution(
config.nodeCachePercentage(),
config.prevDocCachePercentage(),
config.childrenCachePercentage(),
config.diffCachePercentage()).
setCacheSegmentCount(config.cacheSegmentCount()).
setCacheStackMoveDistance(config.cacheStackMoveDistance()).
setBundlingDisabled(config.bundlingDisabled()).
setJournalPropertyHandlerFactory(journalPropertyHandlerFactory).
setLeaseCheckMode(ClusterNodeInfo.DEFAULT_LEASE_CHECK_DISABLED ? LeaseCheckMode.DISABLED : LeaseCheckMode.valueOf(config.leaseCheckMode())).
setLeaseFailureHandler(new LeaseFailureHandler() {
@Override
public void handleLeaseFailure() {
Bundle bundle = context.getBundleContext().getBundle();
String bundleName = bundle.getSymbolicName();
try {
// plan A: try stopping oak-store-document
log.error("handleLeaseFailure: stopping {}...", bundleName);
bundle.stop(Bundle.STOP_TRANSIENT);
log.error("handleLeaseFailure: stopped {}.", bundleName);
// plan A worked, perfect!
} catch (BundleException e) {
log.error("handleLeaseFailure: exception while stopping " + bundleName + ": " + e, e);
// plan B: stop only DocumentNodeStoreService (to stop the background threads)
log.error("handleLeaseFailure: stopping DocumentNodeStoreService...");
context.disableComponent(DocumentNodeStoreService.class.getName());
log.error("handleLeaseFailure: stopped DocumentNodeStoreService");
// plan B succeeded.
}
}
}).
setPrefetchExternalChanges(config.prefetchExternalChanges()).
setUpdateLimit(config.updateLimit()).
setJournalGCMaxAge(config.journalGCMaxAge()).
setNodeCachePathPredicate(createCachePredicate());
if (!Strings.isNullOrEmpty(persistentCache)) {
builder.setPersistentCache(persistentCache);
}
if (!Strings.isNullOrEmpty(journalCache)) {
builder.setJournalCache(journalCache);
}
//Set blobstore before setting the document store
if (customBlobStore && !isWrappingCustomBlobStore()) {
checkNotNull(blobStore, "Use of custom BlobStore enabled via [%s] but blobStore reference not " +
"initialized", CUSTOM_BLOB_STORE);
builder.setBlobStore(blobStore);
}
}
private boolean isWrappingCustomBlobStore() {
return customBlobStore && blobStore instanceof BlobStoreWrapper;
}
private Predicate<Path> createCachePredicate() {
if (config.persistentCacheIncludes().length == 0) {
return Predicates.alwaysTrue();
}
if (Arrays.equals(config.persistentCacheIncludes(), new String[]{"/"})) {
return Predicates.alwaysTrue();
}
Set<Path> paths = new HashSet<>();
for (String p : config.persistentCacheIncludes()) {
p = p != null ? Strings.emptyToNull(p.trim()) : null;
if (p != null) {
paths.add(Path.fromString(p));
}
}
log.info("Configuring persistent cache to only cache nodes under paths {}", paths);
return input -> {
if (input != null) {
for (Path p : paths) {
if (p.isAncestorOf(input)) {
return true;
}
}
}
return false;
};
}
private boolean isNodeStoreProvider() {
return !Strings.isNullOrEmpty(config.role());
}
private boolean isContinuousRevisionGC() {
String expr = getVersionGCExpression();
String[] elements = expr.split("\\s");
// simple heuristic to determine if revision GC runs 'frequently'
return elements.length >= 6 && elements[1].equals("*");
}
private String getVersionGCExpression() {
String defaultExpr = CONTINUOUS_RGC_EXPR;
String expr = config.versionGCExpression();
if (Strings.isNullOrEmpty(expr)) {
expr = defaultExpr;
}
// validate expression
try {
if (!expr.isEmpty()) {
new CronExpression(expr);
}
} catch (ParseException e) {
log.warn("Invalid cron expression, falling back to default '" + defaultExpr + "'", e);
expr = defaultExpr;
}
return expr;
}
private void registerNodeStoreProvider(final NodeStore ns) {
Dictionary<String, Object> props = new Hashtable<>();
props.put(NodeStoreProvider.ROLE, config.role());
nodeStoreReg = context.getBundleContext().registerService(
NodeStoreProvider.class.getName(),
(NodeStoreProvider) () -> ns,
props);
log.info("Registered NodeStoreProvider backed by DocumentNodeStore");
}
@Deactivate
protected void deactivate() {
if (observerTracker != null) {
observerTracker.stop();
}
if (journalPropertyHandlerFactory != null){
journalPropertyHandlerFactory.stop();
}
unregisterNodeStore();
}
@Reference(name = "blobStore",
cardinality = ReferenceCardinality.OPTIONAL,
policy = ReferencePolicy.DYNAMIC,
target = ONLY_STANDALONE_TARGET
)
@SuppressWarnings("UnusedDeclaration")
protected void bindBlobStore(BlobStore blobStore) throws IOException {
if (defaultBlobStore == blobStore){
return;
}
log.info("Initializing DocumentNodeStore with BlobStore [{}]", blobStore);
this.blobStore = blobStore;
registerNodeStoreIfPossible();
}
@SuppressWarnings("UnusedDeclaration")
protected void unbindBlobStore(BlobStore blobStore) {
if (defaultBlobStore == blobStore){
return;
}
this.blobStore = null;
unregisterNodeStore();
}
@Reference(name = "dataSource",
cardinality = ReferenceCardinality.OPTIONAL,
policy = ReferencePolicy.DYNAMIC,
target = "(datasource.name=oak)"
)
@SuppressWarnings("UnusedDeclaration")
protected void bindDataSource(DataSource dataSource) throws IOException {
if (this.dataSource != null) {
log.info("Ignoring bindDataSource [{}] because dataSource [{}] is already bound", dataSource, this.dataSource);
} else {
log.info("Initializing DocumentNodeStore with dataSource [{}]", dataSource);
this.dataSource = dataSource;
registerNodeStoreIfPossible();
}
}
@SuppressWarnings("UnusedDeclaration")
protected void unbindDataSource(DataSource dataSource) {
if (this.dataSource != dataSource) {
log.info("Ignoring unbindDataSource [{}] because dataSource is bound to [{}]", dataSource, this.dataSource);
} else {
log.info("Unregistering DocumentNodeStore because dataSource [{}] was unbound", dataSource);
this.dataSource = null;
unregisterNodeStore();
}
}
private DocumentStoreType documentStoreType;
@Reference(name = "blobDataSource",
cardinality = ReferenceCardinality.OPTIONAL,
policy = ReferencePolicy.DYNAMIC,
target = "(datasource.name=oak)"
)
@SuppressWarnings("UnusedDeclaration")
protected void bindBlobDataSource(DataSource dataSource) throws IOException {
if (this.blobDataSource != null) {
log.info("Ignoring bindBlobDataSource [{}] because blobDataSource [{}] is already bound", dataSource,
this.blobDataSource);
} else {
log.info("Initializing DocumentNodeStore with blobDataSource [{}]", dataSource);
this.blobDataSource = dataSource;
registerNodeStoreIfPossible();
}
}
@SuppressWarnings("UnusedDeclaration")
protected void unbindBlobDataSource(DataSource dataSource) {
if (this.blobDataSource != dataSource) {
log.info("Ignoring unbindBlobDataSource [{}] because dataSource is bound to [{}]", dataSource, this.blobDataSource);
} else {
log.info("Unregistering DocumentNodeStore because blobDataSource [{}] was unbound", dataSource);
this.blobDataSource = null;
unregisterNodeStore();
}
}
@Reference(name = "nodeStateCache",
cardinality = ReferenceCardinality.OPTIONAL,
policy = ReferencePolicy.DYNAMIC
)
@SuppressWarnings("UnusedDeclaration")
protected void bindNodeStateCache(DocumentNodeStateCache nodeStateCache) throws IOException {
if (nodeStore != null){
log.info("Registered DocumentNodeStateCache [{}] with DocumentNodeStore", nodeStateCache);
nodeStore.setNodeStateCache(nodeStateCache);
}
}
@SuppressWarnings("UnusedDeclaration")
protected void unbindNodeStateCache(DocumentNodeStateCache nodeStateCache) {
if (nodeStore != null){
nodeStore.setNodeStateCache(DocumentNodeStateCache.NOOP);
}
}
private void unregisterNodeStore() {
deactivationTimestamp = System.currentTimeMillis();
closeQuietly(closer);
if (nodeStoreReg != null) {
nodeStoreReg.unregister();
nodeStoreReg = null;
}
//If we exposed our BlobStore then unregister it *after*
//NodeStore service. This ensures that if any other component
//like SecondaryStoreCache depends on this then it remains active
//untill DocumentNodeStore get deactivated
if (blobStoreReg != null){
blobStoreReg.unregister();
blobStoreReg = null;
}
if (nodeStore != null) {
nodeStore.dispose();
nodeStore = null;
}
if (executor != null) {
executor.stop();
executor = null;
}
}
private void registerJMXBeans(final DocumentNodeStore store, DocumentNodeStoreBuilder<?> mkBuilder) throws
IOException {
registerCacheStatsMBean(store.getNodeCacheStats());
registerCacheStatsMBean(store.getNodeChildrenCacheStats());
for (CacheStats cs : store.getDiffCacheStats()) {
registerCacheStatsMBean(cs);
}
DocumentStore ds = store.getDocumentStore();
if (ds.getCacheStats() != null) {
for (CacheStats cacheStats : ds.getCacheStats()) {
registerCacheStatsMBean(cacheStats);
}
}
if (!isNodeStoreProvider()) {
addRegistration(
registerMBean(whiteboard,
CheckpointMBean.class,
new DocumentCheckpointMBean(store),
CheckpointMBean.TYPE,
"Document node store checkpoint management")
);
}
addRegistration(
registerMBean(whiteboard,
DocumentNodeStoreMBean.class,
store.getMBean(),
DocumentNodeStoreMBean.TYPE,
"Document node store management")
);
if (mkBuilder.getBlobStoreCacheStats() != null) {
registerCacheStatsMBean(mkBuilder.getBlobStoreCacheStats());
}
if (mkBuilder.getDocumentStoreStatsCollector() instanceof DocumentStoreStatsMBean) {
addRegistration(
registerMBean(whiteboard,
DocumentStoreStatsMBean.class,
(DocumentStoreStatsMBean) mkBuilder.getDocumentStoreStatsCollector(),
DocumentStoreStatsMBean.TYPE,
"DocumentStore Statistics")
);
}
// register persistent cache stats
Map<String, PersistentCacheStats> persistenceCacheStats = mkBuilder.getPersistenceCacheStats();
for (PersistentCacheStats pcs: persistenceCacheStats.values()) {
addRegistration(
registerMBean(whiteboard,
PersistentCacheStatsMBean.class,
pcs,
PersistentCacheStatsMBean.TYPE,
pcs.getName())
);
}
final long versionGcMaxAgeInSecs = config.versionGcMaxAgeInSecs();
final long blobGcMaxAgeInSecs = config.blobGcMaxAgeInSecs();
if (store.getBlobStore() instanceof GarbageCollectableBlobStore) {
BlobGarbageCollector gc = store.createBlobGarbageCollector(blobGcMaxAgeInSecs,
ClusterRepositoryInfo.getOrCreateId(nodeStore),
whiteboard, statisticsProvider);
addRegistration(registerMBean(whiteboard, BlobGCMBean.class, new BlobGC(gc, executor),
BlobGCMBean.TYPE, "Document node store blob garbage collection"));
}
Runnable startGC = new RevisionGCJob(store, versionGcMaxAgeInSecs, 0, DEFAULT_RGC_DELAY_FACTOR);
Runnable cancelGC = () -> store.getVersionGarbageCollector().cancel();
Supplier<String> status = () -> store.getVersionGarbageCollector().getStatus();
RevisionGC revisionGC = new RevisionGC(startGC, cancelGC, status, executor);
addRegistration(registerMBean(whiteboard, RevisionGCMBean.class, revisionGC,
RevisionGCMBean.TYPE, "Document node store revision garbage collection"));
addRegistration(registerMBean(whiteboard, RevisionGCStatsMBean.class,
store.getVersionGarbageCollector().getRevisionGCStats(),
RevisionGCStatsMBean.TYPE,
"Document node store revision garbage collection statistics"));
BlobStoreStats blobStoreStats = mkBuilder.getBlobStoreStats();
if (!customBlobStore && blobStoreStats != null) {
addRegistration(registerMBean(whiteboard,
BlobStoreStatsMBean.class,
blobStoreStats,
BlobStoreStatsMBean.TYPE,
ds.getClass().getSimpleName()));
}
if (!mkBuilder.isBundlingDisabled()){
addRegistration(registerMBean(whiteboard,
BackgroundObserverMBean.class,
store.getBundlingConfigHandler().getMBean(),
BackgroundObserverMBean.TYPE,
"BundlingConfigObserver"));
}
}
private void registerCacheStatsMBean(CacheStats cacheStats) {
addRegistration(registerMBean(whiteboard, CacheStatsMBean.class,
cacheStats, CacheStatsMBean.TYPE, cacheStats.getName()));
}
private void registerLastRevRecoveryJob(final DocumentNodeStore nodeStore) {
long leaseTime = toLong(context.getProperties().get(PROP_REV_RECOVERY_INTERVAL),
ClusterNodeInfo.DEFAULT_LEASE_UPDATE_INTERVAL_MILLIS);
addRegistration(scheduleWithFixedDelay(whiteboard,
new LastRevRecoveryJob(nodeStore), TimeUnit.MILLISECONDS.toSeconds(leaseTime),
false/*runOnSingleClusterNode*/, true /*use dedicated pool*/));
}
private void registerJournalGC(final DocumentNodeStore nodeStore) {
addRegistration(scheduleWithFixedDelay(whiteboard,
new JournalGCJob(nodeStore),
jobPropertiesFor(JournalGCJob.class),
TimeUnit.MILLISECONDS.toSeconds(config.journalGCInterval()),
true/*runOnSingleClusterNode*/, true /*use dedicated pool*/));
}
private void registerVersionGCJob(final DocumentNodeStore nodeStore) {
String expr = getVersionGCExpression();
if (expr.isEmpty()) {
return;
}
Map<String, Object> props = jobPropertiesFor(RevisionGCJob.class);
props.put("scheduler.expression", expr);
long versionGcMaxAgeInSecs = config.versionGcMaxAgeInSecs();
long versionGCTimeLimitInSecs = config.versionGCTimeLimitInSecs();
double versionGCDelayFactor = config.versionGCDelayFactor();
addRegistration(scheduleWithFixedDelay(whiteboard,
new RevisionGCJob(nodeStore, versionGcMaxAgeInSecs,
versionGCTimeLimitInSecs, versionGCDelayFactor),
props, MODIFIED_IN_SECS_RESOLUTION, true, true));
}
private void registerDocumentStoreMetrics(DocumentStore store) {
if (store instanceof MongoDocumentStore) {
addRegistration(scheduleWithFixedDelay(whiteboard,
new MongoDocumentStoreMetrics((MongoDocumentStore) store, statisticsProvider),
jobPropertiesFor(MongoDocumentStoreMetrics.class),
TimeUnit.MINUTES.toSeconds(1), false, true));
}
}
private String resolvePath(String value, String defaultValue) {
String path = value;
if (Strings.isNullOrEmpty(value)) {
path = defaultValue;
}
if ("-".equals(path)) {
// disable this path configuration
return "";
}
// resolve as relative to repository.home
return FilenameUtils.concat(getRepositoryHome(), path);
}
private String getRepositoryHome() {
String repoHome = config.repository_home();
if (Strings.isNullOrEmpty(repoHome)) {
repoHome = DEFAULT_PROP_HOME;
}
return repoHome;
}
private static String[] getMetadata(DocumentStore ds) {
Map<String, String> meta = new HashMap<String, String>(ds.getMetadata());
meta.put("nodeStoreType", "document");
String[] result = new String[meta.size()];
int i = 0;
for (Map.Entry<String, String> e : meta.entrySet()) {
result[i++] = e.getKey() + "=" + e.getValue();
}
return result;
}
private void addRegistration(@NotNull Registration reg) {
closer.register(asCloseable(reg));
}
private static Closeable asCloseable(@NotNull final Registration reg) {
checkNotNull(reg);
return new Closeable() {
@Override
public void close() throws IOException {
reg.unregister();
}
};
}
private static Closeable asCloseable(@NotNull final AbstractServiceTracker t) {
checkNotNull(t);
return new Closeable() {
@Override
public void close() throws IOException {
t.stop();
}
};
}
static final class RevisionGCJob implements Runnable, Supplier<String> {
// log as VersionGarbageCollector
private static final Logger LOGGER = LoggerFactory.getLogger(VersionGarbageCollector.class);
// log every hour
private static final long LOG_INTERVAL = TimeUnit.HOURS.toMillis(1);
private final DocumentNodeStore nodeStore;
private final long versionGCMaxAgeInSecs;
private final long versionGCTimeLimitInSecs;
private final double versionGCDelayFactor;
private volatile Object lastResult = "";
private long lastLogTime;
private VersionGCStats stats;
RevisionGCJob(DocumentNodeStore ns,
long versionGcMaxAgeInSecs,
long versionGCTimeLimitInSecs,
double versionGCDelayFactor) {
this.nodeStore = ns;
this.versionGCMaxAgeInSecs = versionGcMaxAgeInSecs;
this.versionGCTimeLimitInSecs = versionGCTimeLimitInSecs;
this.versionGCDelayFactor = versionGCDelayFactor;
resetStats();
}
@Override
public void run() {
VersionGarbageCollector gc = nodeStore.getVersionGarbageCollector();
gc.setOptions(gc.getOptions().withMaxDuration(TimeUnit.SECONDS, versionGCTimeLimitInSecs).withDelayFactor(versionGCDelayFactor));
try {
VersionGCStats s = gc.gc(versionGCMaxAgeInSecs, TimeUnit.SECONDS);
stats.addRun(s);
lastResult = s.toString();
} catch (Exception e) {
lastResult = e;
LOGGER.warn("Error occurred while executing the Version Garbage Collector", e);
} finally {
maybeLogStats();
}
}
/**
* Returns the result of the last revision GC run. This method throws
* an {@link UncheckedExecutionException} if the last run failed with an
* exception.
*
* @return result of the last revision GC run.
*/
@Override
public String get() throws UncheckedExecutionException {
if (lastResult instanceof Exception) {
throw new UncheckedExecutionException((Exception) lastResult);
}
return String.valueOf(lastResult);
}
private void resetStats() {
lastLogTime = nodeStore.getClock().getTime();
stats = new VersionGCStats();
}
private void maybeLogStats() {
if (nodeStore.getClock().getTime() > lastLogTime + LOG_INTERVAL) {
LOGGER.info("Garbage collector stats since {}: {}",
Utils.timestampToString(lastLogTime), stats);
resetStats();
}
}
}
private static final class JournalGCJob implements Runnable {
private final DocumentNodeStore nodeStore;
JournalGCJob(DocumentNodeStore ns) {
this.nodeStore = ns;
}
@Override
public void run() {
nodeStore.getJournalGarbageCollector().gc();
}
}
private static final class LastRevRecoveryJob implements Runnable {
private final DocumentNodeStore nodeStore;
LastRevRecoveryJob(DocumentNodeStore ns) {
this.nodeStore = ns;
}
@Override
public void run() {
nodeStore.getLastRevRecoveryAgent().performRecoveryIfNeeded();
}
}
private static Map<String, Object> jobPropertiesFor(Class clazz) {
Map<String, Object> props = new HashMap<>();
props.put("scheduler.name", clazz.getName());
return props;
}
@Component(
service = Preset.class,
configurationPid = Configuration.PRESET_PID)
public static class Preset {
Configuration configuration;
@Activate
void activate(Configuration configuration) {
this.configuration = configuration;
}
}
}