blob: efc9490f4465a906926f4e94c809e2dbd1839137 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
import com.mongodb.client.MongoDatabase;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.jackrabbit.guava.common.collect.Iterables;
import org.apache.jackrabbit.oak.commons.Compression;
import org.apache.jackrabbit.oak.index.IndexHelper;
import org.apache.jackrabbit.oak.index.IndexerSupport;
import org.apache.jackrabbit.oak.index.indexer.document.CompositeException;
import org.apache.jackrabbit.oak.index.indexer.document.NodeStateEntryTraverserFactory;
import org.apache.jackrabbit.oak.index.indexer.document.flatfile.pipelined.PipelinedStrategy;
import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreSortStrategy;
import org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreUtils;
import org.apache.jackrabbit.oak.plugins.document.DocumentNodeStore;
import org.apache.jackrabbit.oak.plugins.document.RevisionVector;
import org.apache.jackrabbit.oak.plugins.document.mongo.MongoDocumentStore;
import org.apache.jackrabbit.oak.plugins.index.search.IndexDefinition;
import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
import org.apache.jackrabbit.oak.query.NodeStateNodeTypeInfoProvider;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.filter.PathFilter;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static java.util.Collections.unmodifiableSet;
import static org.apache.jackrabbit.guava.common.base.Preconditions.checkState;
import static org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreUtils.OAK_INDEXER_USE_LZ4;
import static org.apache.jackrabbit.oak.index.indexer.document.indexstore.IndexStoreUtils.OAK_INDEXER_USE_ZIP;
/**
 * Builder for {@link FlatFileStore} instances. This class is where the sort strategy used for
 * building the flat file store is selected (see {@link SortStrategyType}).
 */
public class FlatFileNodeStoreBuilder {

    private static final String FLAT_FILE_STORE_DIR_NAME_PREFIX = "flat-fs-";

    /**
     * System property name for sort strategy. If this is true, {@link SortStrategyType#TRAVERSE_WITH_SORT}
     * (i.e. {@link TraverseWithSortStrategy}) is used, else {@link SortStrategyType#PIPELINED}
     * (i.e. {@link PipelinedStrategy}) is used.
     * NOTE - System property {@link #OAK_INDEXER_SORT_STRATEGY_TYPE} takes precedence over this one.
     */
    public static final String OAK_INDEXER_TRAVERSE_WITH_SORT = "oak.indexer.traverseWithSortStrategy";

    /**
     * System property name for sort strategy. This takes precedence over {@link #OAK_INDEXER_TRAVERSE_WITH_SORT}.
     * Allowed values are the values from enum {@link SortStrategyType}
     */
    public static final String OAK_INDEXER_SORT_STRATEGY_TYPE = "oak.indexer.sortStrategyType";

    /**
     * System property to define the existing folder containing the flat file store files
     */
    public static final String OAK_INDEXER_SORTED_FILE_PATH = "oak.indexer.sortedFilePath";

    /**
     * Default value for {@link #PROP_THREAD_POOL_SIZE}
     */
    static final int DEFAULT_NUMBER_OF_DATA_DUMP_THREADS = 8;

    /**
     * System property for specifying number of threads for parallel download when using {@link MultithreadedTraverseWithSortStrategy}
     */
    static final String PROP_THREAD_POOL_SIZE = "oak.indexer.dataDumpThreadPoolSize";

    /**
     * Default value for {@link #PROP_MERGE_THREAD_POOL_SIZE}
     */
    static final int DEFAULT_NUMBER_OF_MERGE_TASK_THREADS = 1;

    /**
     * System property for specifying number of threads for parallel merge when using {@link MultithreadedTraverseWithSortStrategy}
     */
    static final String PROP_MERGE_THREAD_POOL_SIZE = "oak.indexer.mergeTaskThreadPoolSize";

    /**
     * Default value for {@link #PROP_MERGE_TASK_BATCH_SIZE}
     */
    static final int DEFAULT_NUMBER_OF_FILES_PER_MERGE_TASK = 64;

    /**
     * System property for specifying number of files for batch merge task when using {@link MultithreadedTraverseWithSortStrategy}
     */
    static final String PROP_MERGE_TASK_BATCH_SIZE = "oak.indexer.mergeTaskBatchSize";

    /**
     * Value of this system property indicates max memory that should be used if jmx based memory monitoring is not available.
     */
    public static final String OAK_INDEXER_MAX_SORT_MEMORY_IN_GB = "oak.indexer.maxSortMemoryInGB";
    public static final int OAK_INDEXER_MAX_SORT_MEMORY_IN_GB_DEFAULT = 2;

    /**
     * System property for the dump threshold in MB; see {@link #OAK_INDEXER_DUMP_THRESHOLD_IN_MB_DEFAULT} for the default.
     */
    static final String OAK_INDEXER_DUMP_THRESHOLD_IN_MB = "oak.indexer.dumpThresholdInMB";
    static final int OAK_INDEXER_DUMP_THRESHOLD_IN_MB_DEFAULT = 16;

    private final Logger log = LoggerFactory.getLogger(getClass());
    private List<Long> lastModifiedBreakPoints;
    private final File workDir;
    private final List<File> existingDataDumpDirs = new ArrayList<>();
    private Set<String> preferredPathElements = Collections.emptySet();
    private BlobStore blobStore;
    private NodeStateEntryWriter entryWriter;
    private NodeStateEntryTraverserFactory nodeStateEntryTraverserFactory;
    private long entryCount = 0;
    private File flatFileStoreDir;
    private final MemoryManager memoryManager;
    // Threshold is configured in MB and stored in bytes.
    private long dumpThreshold = Integer.getInteger(OAK_INDEXER_DUMP_THRESHOLD_IN_MB, OAK_INDEXER_DUMP_THRESHOLD_IN_MB_DEFAULT) * FileUtils.ONE_MB;
    private Predicate<String> pathPredicate = path -> true;
    private final Compression algorithm = IndexStoreUtils.compressionAlgorithm();
    private final boolean useTraverseWithSort = Boolean.parseBoolean(System.getProperty(OAK_INDEXER_TRAVERSE_WITH_SORT, "false"));
    private final String sortStrategyTypeString = System.getProperty(OAK_INDEXER_SORT_STRATEGY_TYPE);
    // The explicit strategy type property wins; otherwise fall back to the legacy boolean flag.
    private final SortStrategyType sortStrategyType = sortStrategyTypeString != null
            ? SortStrategyType.valueOf(sortStrategyTypeString)
            : (useTraverseWithSort ? SortStrategyType.TRAVERSE_WITH_SORT : SortStrategyType.PIPELINED);
    private RevisionVector rootRevision = null;
    private DocumentNodeStore nodeStore = null;
    private MongoDocumentStore mongoDocumentStore = null;
    private MongoDatabase mongoDatabase = null;
    private Set<IndexDefinition> indexDefinitions = null;
    private String checkpoint;

    public enum SortStrategyType {
        /**
         * System property {@link #OAK_INDEXER_SORT_STRATEGY_TYPE} if set to this value would result in {@link StoreAndSortStrategy} being used.
         */
        STORE_AND_SORT,
        /**
         * System property {@link #OAK_INDEXER_SORT_STRATEGY_TYPE} if set to this value would result in {@link TraverseWithSortStrategy} being used.
         */
        TRAVERSE_WITH_SORT,
        /**
         * System property {@link #OAK_INDEXER_SORT_STRATEGY_TYPE} if set to this value would result in {@link MultithreadedTraverseWithSortStrategy} being used.
         */
        MULTITHREADED_TRAVERSE_WITH_SORT,
        /**
         * System property {@link #OAK_INDEXER_SORT_STRATEGY_TYPE} if set to this value would result in {@link PipelinedStrategy} being used.
         */
        PIPELINED
    }

    /**
     * Creates a builder that uses the given memory manager.
     *
     * @param workDir       directory under which the flat file store directory is created
     * @param memoryManager memory manager handed to {@link MultithreadedTraverseWithSortStrategy}
     */
    public FlatFileNodeStoreBuilder(File workDir, MemoryManager memoryManager) {
        this.workDir = workDir;
        this.memoryManager = memoryManager;
    }

    /**
     * Creates a builder that uses a {@link DefaultMemoryManager}.
     *
     * @param workDir directory under which the flat file store directory is created
     */
    public FlatFileNodeStoreBuilder(File workDir) {
        this.workDir = workDir;
        this.memoryManager = new DefaultMemoryManager();
    }

    /**
     * Sets the last-modified break points; only passed on to {@link MultithreadedTraverseWithSortStrategy}.
     */
    public FlatFileNodeStoreBuilder withLastModifiedBreakPoints(List<Long> lastModifiedBreakPoints) {
        this.lastModifiedBreakPoints = lastModifiedBreakPoints;
        return this;
    }

    /**
     * Sets the blob store used by the entry reader/writer, the resulting stores and the sort strategies.
     */
    public FlatFileNodeStoreBuilder withBlobStore(BlobStore blobStore) {
        this.blobStore = blobStore;
        return this;
    }

    /**
     * Overrides the dump threshold that is otherwise derived from the
     * {@value #OAK_INDEXER_DUMP_THRESHOLD_IN_MB} system property.
     *
     * @param dumpThreshold threshold value, used as given (the system-property default is converted to bytes)
     */
    public FlatFileNodeStoreBuilder withDumpThreshold(long dumpThreshold) {
        this.dumpThreshold = dumpThreshold;
        return this;
    }

    /**
     * Sets the preferred path elements passed to the sort strategy and to the resulting stores.
     */
    public FlatFileNodeStoreBuilder withPreferredPathElements(Set<String> preferredPathElements) {
        this.preferredPathElements = preferredPathElements;
        return this;
    }

    /**
     * Registers an existing data dump directory; {@code null} values are ignored.
     */
    public FlatFileNodeStoreBuilder addExistingDataDumpDir(File existingDataDumpDir) {
        if (existingDataDumpDir != null) {
            this.existingDataDumpDirs.add(existingDataDumpDir);
        }
        return this;
    }

    /**
     * Sets the traverser factory used by the non-pipelined sort strategies.
     */
    public FlatFileNodeStoreBuilder withNodeStateEntryTraverserFactory(NodeStateEntryTraverserFactory factory) {
        this.nodeStateEntryTraverserFactory = factory;
        return this;
    }

    /**
     * Sets the path predicate passed to the sort strategy. Defaults to a predicate accepting every path.
     */
    public FlatFileNodeStoreBuilder withPathPredicate(Predicate<String> pathPredicate) {
        this.pathPredicate = pathPredicate;
        return this;
    }

    /**
     * Sets the index definitions whose path filters are passed to {@link PipelinedStrategy}.
     * Must be called before building when the {@link SortStrategyType#PIPELINED} strategy is in effect.
     */
    public FlatFileNodeStoreBuilder withIndexDefinitions(Set<IndexDefinition> indexDefinitions) {
        this.indexDefinitions = indexDefinitions;
        return this;
    }

    /**
     * Sets the root revision passed to {@link PipelinedStrategy}.
     */
    public FlatFileNodeStoreBuilder withRootRevision(RevisionVector rootRevision) {
        this.rootRevision = rootRevision;
        return this;
    }

    /**
     * Sets the document node store passed to {@link PipelinedStrategy}.
     */
    public FlatFileNodeStoreBuilder withNodeStore(DocumentNodeStore nodeStore) {
        this.nodeStore = nodeStore;
        return this;
    }

    /**
     * Sets the Mongo document store passed to {@link PipelinedStrategy}.
     */
    public FlatFileNodeStoreBuilder withMongoDocumentStore(MongoDocumentStore mongoDocumentStore) {
        this.mongoDocumentStore = mongoDocumentStore;
        return this;
    }

    /**
     * Sets the checkpoint passed to the sort strategy.
     */
    public FlatFileNodeStoreBuilder withCheckpoint(String checkpoint) {
        this.checkpoint = checkpoint;
        return this;
    }

    /**
     * Sets the Mongo database passed to {@link PipelinedStrategy}.
     */
    public FlatFileNodeStoreBuilder withMongoDatabase(MongoDatabase mongoDatabase) {
        this.mongoDatabase = mongoDatabase;
        return this;
    }

    /**
     * Builds a single {@link FlatFileStore} backed by the first sorted store file, which is either
     * read from the directory given via {@link #OAK_INDEXER_SORTED_FILE_PATH} or freshly created
     * with the selected sort strategy.
     *
     * @return the flat file store
     * @throws IOException        if the sorted store file cannot be created
     * @throws CompositeException presumably propagated from the sort strategy
     */
    public FlatFileStore build() throws IOException, CompositeException {
        logFlags();
        entryWriter = new NodeStateEntryWriter(blobStore);
        IndexStoreFiles indexStoreFiles = createdSortedStoreFiles();
        File metadataFile = indexStoreFiles.metadataFile;
        FlatFileStore store = new FlatFileStore(blobStore, indexStoreFiles.storeFiles.get(0), metadataFile,
                new NodeStateEntryReader(blobStore),
                unmodifiableSet(preferredPathElements), algorithm);
        // The entry count is only known when the store file was created by a sort strategy in this run.
        if (entryCount > 0) {
            store.setEntryCount(entryCount);
        }
        return store;
    }

    /**
     * Builds one {@link FlatFileStore} per store file. If the available store files are not all
     * already split, the first file is split with a {@link FlatFileSplitter} beforehand.
     * <p>
     * NOTE - the {@code indexDefinitions} parameter is only used for splitting; the definitions used
     * by {@link PipelinedStrategy} are the ones set via {@link #withIndexDefinitions(Set)}.
     *
     * @param indexHelper      provides the work directory for the split files
     * @param indexerSupport   provides the node state for the checkpoint
     * @param indexDefinitions index definitions handed to the splitter
     * @return list of flat file stores, one per (split) store file
     * @throws IOException        if the sorted store file cannot be created or split
     * @throws CompositeException presumably propagated from the sort strategy
     */
    public List<FlatFileStore> buildList(IndexHelper indexHelper, IndexerSupport indexerSupport,
                                         Set<IndexDefinition> indexDefinitions) throws IOException, CompositeException {
        logFlags();
        entryWriter = new NodeStateEntryWriter(blobStore);
        IndexStoreFiles indexStoreFiles = createdSortedStoreFiles();
        List<File> fileList = indexStoreFiles.storeFiles;
        File metadataFile = indexStoreFiles.metadataFile;
        long start = System.currentTimeMillis();
        // If not already split, split otherwise skip splitting
        if (!fileList.stream().allMatch(FlatFileSplitter.IS_SPLIT)) {
            // Named differently from the 'nodeStore' field to avoid shadowing it.
            NodeStore checkpointNodeStore = new MemoryNodeStore(indexerSupport.retrieveNodeStateForCheckpoint());
            FlatFileSplitter splitter = new FlatFileSplitter(fileList.get(0), indexHelper.getWorkDir(),
                    new NodeStateNodeTypeInfoProvider(checkpointNodeStore.getRoot()), new NodeStateEntryReader(blobStore),
                    indexDefinitions);
            fileList = splitter.split();
            log.info("Split flat file to result files '{}' is done, took {} ms", fileList, System.currentTimeMillis() - start);
        }
        List<FlatFileStore> storeList = new ArrayList<>();
        for (File flatFileItem : fileList) {
            FlatFileStore store = new FlatFileStore(blobStore, flatFileItem, metadataFile, new NodeStateEntryReader(blobStore),
                    unmodifiableSet(preferredPathElements), algorithm);
            storeList.add(store);
        }
        return storeList;
    }

    /**
     * Returns the existing list of store files if it can read from system property OAK_INDEXER_SORTED_FILE_PATH which
     * defines the existing folder where the flat file store files are present. Will throw an exception if it cannot
     * read or the path in the system property is not a directory.
     * If the system property OAK_INDEXER_SORTED_FILE_PATH is undefined, or the directory contains no relevant
     * files, it initializes the flat file store.
     *
     * @return pair of "list of flat files" and metadata file
     * @throws IOException        if the store directory or the sorted files cannot be created
     * @throws CompositeException presumably propagated from the sort strategy
     */
    private IndexStoreFiles createdSortedStoreFiles() throws IOException, CompositeException {
        // Check system property defined path
        String sortedFilePath = System.getProperty(OAK_INDEXER_SORTED_FILE_PATH);
        if (StringUtils.isNotBlank(sortedFilePath)) {
            File sortedDir = new File(sortedFilePath);
            log.info("Attempting to read from provided sorted files directory [{}] (via system property '{}')",
                    sortedDir.getAbsolutePath(), OAK_INDEXER_SORTED_FILE_PATH);
            // List of storefiles, List of metadatafile
            IndexStoreFiles storeFiles = getIndexStoreFiles(sortedDir);
            if (storeFiles != null) {
                return storeFiles;
            }
        }
        // Initialize the flat file store again
        createStoreDir();
        IndexStoreSortStrategy strategy = createSortStrategy(flatFileStoreDir);
        File result = strategy.createSortedStoreFile();
        File metadata = strategy.createMetadataFile();
        entryCount = strategy.getEntryCount();
        return new IndexStoreFiles(Collections.singletonList(result), metadata);
    }

    /**
     * Simple holder for the list of store files and the (possibly {@code null}) metadata file.
     */
    private static class IndexStoreFiles {
        private final List<File> storeFiles;
        private final File metadataFile;

        public IndexStoreFiles(List<File> storeFiles, File metadataFile) {
            this.storeFiles = storeFiles;
            this.metadataFile = metadataFile;
        }
    }

    /**
     * Looks up the sorted store files and the metadata file in the given directory.
     *
     * @param sortedDir directory expected to contain the flat file store files
     * @return the files found, with a {@code null} metadata file if none exists, or {@code null}
     * if the directory contains no store file
     * @throws IllegalArgumentException if the directory does not exist, is not readable or is not a directory
     * @throws IllegalStateException    if more than one metadata file is present
     */
    private IndexStoreFiles getIndexStoreFiles(File sortedDir) {
        if (sortedDir.exists() && sortedDir.canRead() && sortedDir.isDirectory()) {
            File[] storeFiles = sortedDir.listFiles(
                    (dir, name) -> name.endsWith(IndexStoreUtils.getSortedStoreFileName(algorithm)));
            File[] metadataFiles = sortedDir.listFiles(
                    (dir, name) -> name.endsWith(IndexStoreUtils.getMetadataFileName(algorithm)));
            if (storeFiles != null && storeFiles.length != 0) {
                // Not throwing error for backward compatibility
                if (metadataFiles == null || metadataFiles.length == 0) {
                    log.error("Unable to find metadata file in directory:{}", sortedDir.getAbsolutePath());
                    return new IndexStoreFiles(Arrays.asList(storeFiles), null);
                } else {
                    // Guava's Preconditions substitutes only %s placeholders (not SLF4J style {}).
                    checkState(metadataFiles.length == 1, "Multiple metadata files available at path:%s, metadataFiles:%s", sortedDir.getAbsolutePath(),
                            Arrays.asList(metadataFiles));
                    return new IndexStoreFiles(Arrays.asList(storeFiles), metadataFiles[0]);
                }
            }
        } else {
            String msg = String.format("Cannot read sorted files directory at [%s]", sortedDir.getAbsolutePath());
            throw new IllegalArgumentException(msg);
        }
        return null;
    }

    /**
     * Creates the sort strategy matching the configured {@link SortStrategyType}.
     *
     * @param dir directory in which the strategy stores its files
     * @return the sort strategy
     * @throws IOException          declared for strategy construction
     * @throws NullPointerException if the pipelined strategy is selected and no index definitions were set
     */
    IndexStoreSortStrategy createSortStrategy(File dir) throws IOException {
        switch (sortStrategyType) {
            case STORE_AND_SORT:
                log.info("Using StoreAndSortStrategy.");
                log.warn("StoreAndSortStrategy is deprecated and will be removed in the near future. Use PipelinedStrategy instead.");
                return new StoreAndSortStrategy(nodeStateEntryTraverserFactory, preferredPathElements, entryWriter, dir,
                        algorithm, pathPredicate, checkpoint);
            case TRAVERSE_WITH_SORT:
                log.info("Using TraverseWithSortStrategy");
                log.warn("TraverseWithSortStrategy is deprecated and will be removed in the near future. Use PipelinedStrategy instead.");
                return new TraverseWithSortStrategy(nodeStateEntryTraverserFactory, preferredPathElements, entryWriter, dir,
                        algorithm, pathPredicate, checkpoint);
            case MULTITHREADED_TRAVERSE_WITH_SORT:
                log.info("Using MultithreadedTraverseWithSortStrategy");
                log.warn("MultithreadedTraverseWithSortStrategy is deprecated and will be removed in the near future. Use PipelinedStrategy instead.");
                return new MultithreadedTraverseWithSortStrategy(nodeStateEntryTraverserFactory, lastModifiedBreakPoints, preferredPathElements,
                        blobStore, dir, existingDataDumpDirs, algorithm, memoryManager, dumpThreshold, pathPredicate, checkpoint);
            case PIPELINED:
                log.info("Using PipelinedStrategy");
                // Fail with a descriptive message instead of an anonymous NPE; the field is null unless
                // withIndexDefinitions() was called (the buildList() parameter does not set it).
                java.util.Objects.requireNonNull(indexDefinitions,
                        "Index definitions are required by PipelinedStrategy; set them via withIndexDefinitions()");
                List<PathFilter> pathFilters = indexDefinitions.stream().map(IndexDefinition::getPathFilter).collect(Collectors.toList());
                return new PipelinedStrategy(mongoDocumentStore, mongoDatabase, nodeStore, rootRevision,
                        preferredPathElements, blobStore, dir, algorithm, pathPredicate, pathFilters, checkpoint);
        }
        throw new IllegalStateException("Not a valid sort strategy value " + sortStrategyType);
    }

    /**
     * Logs the effective configuration together with the system property controlling each value.
     */
    private void logFlags() {
        log.info("Preferred path elements are {}", Iterables.toString(preferredPathElements));
        log.info("Compression enabled while sorting : {} ({})", IndexStoreUtils.compressionEnabled(), OAK_INDEXER_USE_ZIP);
        log.info("LZ4 enabled for compression algorithm : {} ({})", IndexStoreUtils.useLZ4(), OAK_INDEXER_USE_LZ4);
        // Report the property that takes precedence when selecting the strategy.
        log.info("Sort strategy : {} ({})", sortStrategyType, OAK_INDEXER_SORT_STRATEGY_TYPE);
    }

    /**
     * Creates a fresh temporary directory for the flat file store below the work directory
     * and remembers it as the flat file store dir.
     *
     * @return the newly created directory
     * @throws IOException if the directory cannot be created
     */
    File createStoreDir() throws IOException {
        flatFileStoreDir = Files.createTempDirectory(workDir.toPath(), FLAT_FILE_STORE_DIR_NAME_PREFIX).toFile();
        return flatFileStoreDir;
    }

    /**
     * Returns the flat file store dir.
     * NOTE - Only works after flat file store dir has been built (i.e. after a call to {@link #build()})
     *
     * @return flat file store dir or <code>null</code> if it has not been built
     */
    public File getFlatFileStoreDir() {
        return flatFileStoreDir;
    }
}