blob: 7b0d17c79298962320f8d27630ad2d7e90ed7aa8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.jackrabbit.oak.index.indexer.document.flatfile;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import com.google.common.base.Stopwatch;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.jackrabbit.oak.commons.sort.ExternalSort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static com.google.common.base.Charsets.UTF_8;
import static org.apache.commons.io.FileUtils.ONE_GB;
import static org.apache.jackrabbit.oak.commons.IOUtils.humanReadableByteCount;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStoreUtils.createReader;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStoreUtils.createWriter;
import static org.apache.jackrabbit.oak.index.indexer.document.flatfile.FlatFileStoreUtils.sizeOf;
/**
 * Sorts a flat file of node state entries by path using {@link ExternalSort}.
 * <p>
 * Every line of the input file is wrapped in a {@link SimpleNodeStateHolder}. Lines are ordered
 * by comparing their path elements with the supplied path comparator. Sorting runs in two phases:
 * <ol>
 *   <li>the input is split into sorted batch files under the work directory;</li>
 *   <li>those batch files are merged into {@link #getSortedFile()}.</li>
 * </ol>
 * Both phases run with ExternalSort's {@code distinct} flag enabled.
 * <p>
 * Configure the sorter via its setters before calling {@link #sort()}.
 */
public class NodeStateEntrySorter {
    private static final Logger log = LoggerFactory.getLogger(NodeStateEntrySorter.class);

    /** Value passed to ExternalSort as the maximum number of temporary batch files. */
    private static final int DEFAULTMAXTEMPFILES = 1024;

    /** Passed as ExternalSort's {@code distinct} flag in both the batch and the merge phase. */
    private static final boolean DISTINCT = true;

    private final File nodeStateFile;
    private final File workDir;
    private final Charset charset = UTF_8;
    private final Comparator<Iterable<String>> pathComparator;
    private final File sortedFile;
    private boolean useZip;
    private boolean deleteOriginal;
    /** Upper bound on the memory handed to ExternalSort; defaults to 5 GB. */
    private long maxMemory = ONE_GB * 5;
    /**
     * Data length handed to ExternalSort when the input is read through a (possibly compressed)
     * reader, i.e. only when {@link #useZip} is set. Presumably the uncompressed size of the
     * input - TODO confirm with callers. Defaults to 0 when never set.
     */
    private long actualFileSize;

    /**
     * Creates a sorter that writes its output next to {@code nodeStateFile}.
     * <p>
     * The output is named {@code <baseName>-sorted.<extension>}, or {@code <baseName>-sorted}
     * when the input has no extension.
     *
     * @param pathComparator comparator applied to the path elements of each entry
     * @param nodeStateFile  file containing the entries to sort
     * @param workDir        directory for the intermediate batch files
     */
    public NodeStateEntrySorter(Comparator<Iterable<String>> pathComparator, File nodeStateFile, File workDir) {
        this(pathComparator, nodeStateFile, workDir, getSortedFileName(nodeStateFile));
    }

    /**
     * Creates a sorter that writes its output to {@code sortedFile}.
     *
     * @param pathComparator comparator applied to the path elements of each entry; must not be null
     * @param nodeStateFile  file containing the entries to sort; must not be null
     * @param workDir        directory for the intermediate batch files. It is passed unchecked to
     *                       ExternalSort as its temporary directory.
     * @param sortedFile     destination of the sorted output; must not be null
     * @throws NullPointerException if {@code pathComparator}, {@code nodeStateFile} or
     *                              {@code sortedFile} is null
     */
    public NodeStateEntrySorter(Comparator<Iterable<String>> pathComparator, File nodeStateFile, File workDir, File sortedFile) {
        // Fail fast here instead of with an NPE deep inside sort().
        this.pathComparator = Objects.requireNonNull(pathComparator, "pathComparator");
        this.nodeStateFile = Objects.requireNonNull(nodeStateFile, "nodeStateFile");
        this.workDir = workDir;
        this.sortedFile = Objects.requireNonNull(sortedFile, "sortedFile");
    }

    /**
     * Controls compression handling.
     * <p>
     * When set, the input is read via {@code createReader(file, true)} and the output is written
     * via {@code createWriter(file, true)}. The flag is also forwarded to ExternalSort for its
     * temporary files.
     */
    public void setUseZip(boolean useZip) {
        this.useZip = useZip;
    }

    /** When set, the input file is deleted once the batch-sort phase has finished. */
    public void setDeleteOriginal(boolean deleteOriginal) {
        this.deleteOriginal = deleteOriginal;
    }

    /**
     * Sets the upper bound, in GB, on the memory used for sorting.
     * <p>
     * The effective memory is the minimum of this value and the currently available heap.
     */
    public void setMaxMemoryInGB(long maxMemoryInGb) {
        this.maxMemory = maxMemoryInGb * ONE_GB;
    }

    /** Sets the data length handed to ExternalSort when {@link #setUseZip(boolean)} is enabled. */
    public void setActualFileSize(long actualFileSize) {
        this.actualFileSize = actualFileSize;
    }

    /**
     * Sorts the input file into {@link #getSortedFile()}.
     * <p>
     * If {@link #setDeleteOriginal(boolean)} was enabled, the input file is deleted between the
     * batch phase and the merge phase.
     *
     * @throws IOException if reading, writing or deleting any of the involved files fails
     */
    public void sort() throws IOException {
        long estimatedMemory = estimateAvailableMemory();
        long memory = Math.min(estimatedMemory, maxMemory);
        log.info("Sorting with memory {} (estimated {})", humanReadableByteCount(memory), humanReadableByteCount(estimatedMemory));

        Stopwatch w = Stopwatch.createStarted();
        Comparator<NodeStateHolder> comparator =
                (e1, e2) -> pathComparator.compare(e1.getPathElements(), e2.getPathElements());
        // ExternalSort may pass null (e.g. end of input), so both converters are null-tolerant.
        Function<String, NodeStateHolder> lineToHolder =
                line -> line == null ? null : new SimpleNodeStateHolder(line);
        Function<NodeStateHolder, String> holderToLine =
                holder -> holder == null ? null : holder.getLine();

        List<File> sortedFiles = sortInBatch(memory, comparator, lineToHolder, holderToLine);
        log.info("Batch sorting done in {} with {} files of size {} to merge", w, sortedFiles.size(),
                humanReadableByteCount(sizeOf(sortedFiles)));

        // At this point all entries live in the batch files, so the original is no longer read.
        // Deleting it before the merge presumably frees disk space for the merged output.
        if (deleteOriginal) {
            log.info("Removing the original file {}", nodeStateFile.getAbsolutePath());
            FileUtils.forceDelete(nodeStateFile);
        }

        Stopwatch w2 = Stopwatch.createStarted();
        mergeSortedFiles(comparator, lineToHolder, holderToLine, sortedFiles);
        log.info("Merging of sorted files completed in {}", w2);
        log.info("Sorting completed in {}", w);
    }

    /** Merges the sorted batch files into {@link #sortedFile}. */
    private void mergeSortedFiles(Comparator<NodeStateHolder> comparator, Function<String, NodeStateHolder> func1,
                                  Function<NodeStateHolder, String> func2, List<File> sortedFiles) throws IOException {
        try (BufferedWriter writer = createWriter(sortedFile, useZip)) {
            ExternalSort.mergeSortedFiles(sortedFiles,
                    writer,
                    comparator,
                    charset,
                    DISTINCT,
                    useZip,
                    func2,
                    func1
            );
        }
    }

    /**
     * Splits the input into sorted batch files under {@link #workDir}.
     * <p>
     * For compressed input the file is opened through {@code createReader}, and
     * {@link #actualFileSize} is supplied as the data length. Otherwise ExternalSort is given the
     * file directly.
     *
     * @return the sorted batch files to merge
     */
    private List<File> sortInBatch(long memory, Comparator<NodeStateHolder> comparator,
                                   Function<String, NodeStateHolder> func1,
                                   Function<NodeStateHolder, String> func2) throws IOException {
        if (useZip) {
            try (BufferedReader reader = createReader(nodeStateFile, useZip)) {
                return ExternalSort.sortInBatch(reader,
                        actualFileSize,
                        comparator,
                        DEFAULTMAXTEMPFILES,
                        memory,
                        charset,
                        workDir,
                        DISTINCT,
                        0,
                        useZip,
                        func2,
                        func1
                );
            }
        }
        return ExternalSort.sortInBatch(nodeStateFile,
                comparator,
                DEFAULTMAXTEMPFILES,
                memory,
                charset,
                workDir,
                DISTINCT,
                0,
                useZip,
                func2,
                func1
        );
    }

    /** Returns the file that {@link #sort()} writes the sorted entries to. */
    public File getSortedFile() {
        return sortedFile;
    }

    /**
     * Derives the default output file, placed in the same directory as {@code file}.
     * <p>
     * {@code entries.json} maps to {@code entries-sorted.json}, and {@code entries} maps to
     * {@code entries-sorted}.
     */
    private static File getSortedFileName(File file) {
        String name = Objects.requireNonNull(file, "nodeStateFile").getName();
        String extension = FilenameUtils.getExtension(name);
        String baseName = FilenameUtils.getBaseName(name);
        // Only add the "." separator when there is an extension, to avoid names like "entries-sorted.".
        String sortedName = extension.isEmpty()
                ? baseName + "-sorted"
                : baseName + "-sorted." + extension;
        return new File(file.getParentFile(), sortedName);
    }

    /**
     * Calls the garbage collector and then returns the memory the JVM could still hand out.
     * <p>
     * The result is computed as {@code maxMemory - (totalMemory - freeMemory)}. Running the GC
     * first avoids under-reporting when the GC has not yet reclaimed unreachable objects.
     *
     * @return estimated available memory in bytes
     */
    private static long estimateAvailableMemory() {
        System.gc();
        // http://stackoverflow.com/questions/12807797/java-get-available-memory
        Runtime r = Runtime.getRuntime();
        long allocatedMemory = r.totalMemory() - r.freeMemory();
        return r.maxMemory() - allocatedMemory;
    }
}