blob: b8875ff3ad88de216d93b17c8c734bfb7766eb00 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.plugins.document.secondary;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Stopwatch;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.plugins.document.AbstractDocumentNodeState;
import org.apache.jackrabbit.oak.plugins.document.NodeStateDiffer;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.commit.Observer;
import org.apache.jackrabbit.oak.spi.filter.PathFilter;
import org.apache.jackrabbit.oak.spi.state.ApplyDiff;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.apache.jackrabbit.oak.stats.StatsOptions;
import org.apache.jackrabbit.oak.stats.TimerStats;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * {@link Observer} that replays every reported root onto a secondary
 * {@link NodeStore}.
 *
 * <p>For each change the reported root is compared against the current root of
 * the secondary store through a {@code PathFilteringDiff}, the resulting
 * changes are merged into that store, and the merged root is then handed to the
 * configured {@code SecondaryStoreRootObserver}. The time spent is recorded in
 * one of two timers, picked by whether the commit is external or local.
 */
public class SecondaryStoreObserver implements Observer {
    private final Logger log = LoggerFactory.getLogger(getClass());
    private final NodeStore nodeStore;
    private final PathFilter pathFilter;
    private final List<String> metaPropNames;
    private final SecondaryStoreRootObserver secondaryObserver;
    private final NodeStateDiffer differ;
    /** Timer updated for changes whose {@link CommitInfo} is not external. */
    private final TimerStats local;
    /** Timer updated for changes whose {@link CommitInfo} is external. */
    private final TimerStats external;
    /** Flipped to {@code true} after the first successfully merged change. */
    private boolean firstEventProcessed;

    /**
     * Creates the observer and obtains its two timers from the statistics provider.
     *
     * @param nodeStore          store whose root is diffed against and merged into
     * @param metaPropNames      names passed on to {@code PathFilteringDiff} for
     *                           copying meta properties
     * @param differ             differ used when wrapping secondary node states
     * @param pathFilter         filter handed to the {@code PathFilteringDiff}
     * @param statisticsProvider source of the {@code DOCUMENT_CACHE_SEC_LOCAL} and
     *                           {@code DOCUMENT_CACHE_SEC_EXTERNAL} timers
     * @param secondaryObserver  receiver of the merged secondary root
     */
    public SecondaryStoreObserver(NodeStore nodeStore,
                                  List<String> metaPropNames,
                                  NodeStateDiffer differ,
                                  PathFilter pathFilter,
                                  StatisticsProvider statisticsProvider,
                                  SecondaryStoreRootObserver secondaryObserver) {
        this.nodeStore = nodeStore;
        this.metaPropNames = metaPropNames;
        this.differ = differ;
        this.pathFilter = pathFilter;
        this.secondaryObserver = secondaryObserver;
        this.local = statisticsProvider.getTimer("DOCUMENT_CACHE_SEC_LOCAL", StatsOptions.DEFAULT);
        this.external = statisticsProvider.getTimer("DOCUMENT_CACHE_SEC_EXTERNAL", StatsOptions.DEFAULT);
    }

    /**
     * Applies the difference between {@code root} and the secondary store's
     * current root to the secondary store.
     *
     * <p>If the merge fails with a {@link CommitFailedException} a warning is
     * logged; in that case the secondary observer is not notified, no timer is
     * updated and the initial-sync flag stays unset.
     *
     * @param root the new root; it is cast to {@link AbstractDocumentNodeState}
     * @param info commit details, used here only to choose the timer
     */
    @Override
    public void contentChanged(@NotNull NodeState root, @NotNull CommitInfo info) {
        // The diff below also walks areas that are not visible, and there
        // diffManyChildren may become a problem, e.g. for data under the uuid index.
        if (!firstEventProcessed) {
            log.info("Starting initial sync");
        }

        final Stopwatch watch = Stopwatch.createStarted();
        final AbstractDocumentNodeState primaryRoot = (AbstractDocumentNodeState) root;

        final NodeState currentSecondary = nodeStore.getRoot();
        final NodeState baseState = DelegatingDocumentNodeState.wrapIfPossible(currentSecondary, differ);
        final NodeBuilder secondaryBuilder = currentSecondary.builder();
        final ApplyDiff filteringDiff =
                new PathFilteringDiff(secondaryBuilder, pathFilter, metaPropNames, primaryRoot);

        // Meta properties of the root node are copied first ...
        PathFilteringDiff.copyMetaProperties(primaryRoot, secondaryBuilder, metaPropNames);
        // ... then everything else is applied through the diff.
        primaryRoot.compareAgainstBaseState(baseState, filteringDiff);

        try {
            mergeAndNotify(secondaryBuilder, info, watch);
        } catch (CommitFailedException e) {
            // TODO: no recovery yet; the failure is only logged
            log.warn("Commit to secondary store failed", e);
        }
    }

    /**
     * Merges the builder into the secondary store, passes the merged root to the
     * secondary observer, records the elapsed time and, on the first successful
     * run, logs the duration of the initial sync.
     */
    private void mergeAndNotify(NodeBuilder secondaryBuilder, CommitInfo info, Stopwatch watch)
            throws CommitFailedException {
        final NodeState merged =
                nodeStore.merge(secondaryBuilder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
        secondaryObserver.contentChanged(DelegatingDocumentNodeState.wrap(merged, differ));

        final TimerStats stats = timerFor(info);
        stats.update(watch.elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);

        if (!firstEventProcessed) {
            log.info("Time taken for initial sync {}", watch);
            firstEventProcessed = true;
        }
    }

    /** Returns the external timer for external commits, the local timer otherwise. */
    private TimerStats timerFor(CommitInfo info) {
        if (info.isExternal()) {
            return external;
        }
        return local;
    }
}