| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.jackrabbit.oak.commons.sort; |
| |
| import java.io.BufferedReader; |
| import java.io.BufferedWriter; |
| import java.io.Closeable; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileNotFoundException; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.InputStreamReader; |
| import java.io.OutputStreamWriter; |
| import java.io.Reader; |
| import java.nio.charset.Charset; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.Iterator; |
| import java.util.List; |
| |
| import com.google.common.base.Charsets; |
| import com.google.common.collect.Lists; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.apache.commons.io.LineIterator; |
| import org.apache.jackrabbit.oak.commons.FileIOUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import static org.apache.jackrabbit.oak.commons.sort.EscapeUtils.escapeLineBreak; |
| import static org.apache.jackrabbit.oak.commons.sort.EscapeUtils.unescapeLineBreaks; |
| |
| /** |
| * Utility class to store a list of string and perform sort on that. For small size |
| * the list would be maintained in memory. If the size crosses the required threshold then |
| * the sorting would be performed externally |
| */ |
| public class StringSort implements Iterable<String>, Closeable { |
| private final Logger log = LoggerFactory.getLogger(getClass()); |
| public static final int BATCH_SIZE = 2048; |
| |
| private final int overflowToDiskThreshold; |
| private final Comparator<String> comparator; |
| |
| private final List<String> ids = Lists.newArrayList(); |
| private long size; |
| |
| private final List<String> inMemBatch = Lists.newArrayList(); |
| |
| private boolean useFile; |
| private PersistentState persistentState; |
| |
| public StringSort(int overflowToDiskThreshold, Comparator<String> comparator) { |
| this.overflowToDiskThreshold = overflowToDiskThreshold; |
| this.comparator = comparator; |
| } |
| |
| public void add(String id) throws IOException { |
| if (useFile) { |
| addToBatch(id); |
| } else { |
| ids.add(id); |
| if (ids.size() >= overflowToDiskThreshold) { |
| flushToFile(ids); |
| useFile = true; |
| log.debug("In memory buffer crossed the threshold of {}. " + |
| "Switching to filesystem [{}] to manage the state", overflowToDiskThreshold, persistentState); |
| } |
| } |
| size++; |
| } |
| |
| public void sort() throws IOException { |
| if (useFile) { |
| //Flush the last batch |
| flushToFile(inMemBatch); |
| persistentState.sort(); |
| } else { |
| Collections.sort(ids, comparator); |
| } |
| } |
| |
| public Iterator<String> getIds() throws IOException { |
| if (useFile) { |
| return persistentState.getIterator(); |
| } else { |
| return ids.iterator(); |
| } |
| } |
| |
| public long getSize() { |
| return size; |
| } |
| |
| public boolean isEmpty() { |
| return size == 0; |
| } |
| |
| public boolean usingFile() { |
| return useFile; |
| } |
| |
| @Override |
| public void close() throws IOException { |
| if (persistentState != null) { |
| persistentState.close(); |
| } |
| } |
| |
| @Override |
| public Iterator<String> iterator() { |
| try { |
| return getIds(); |
| } catch (IOException e) { |
| throw new IllegalStateException(e); |
| } |
| } |
| |
| //--------------------------< internal >------------------------------------ |
| |
| private void addToBatch(String id) throws IOException { |
| inMemBatch.add(id); |
| if (inMemBatch.size() >= BATCH_SIZE) { |
| flushToFile(inMemBatch); |
| } |
| } |
| |
| private void flushToFile(List<String> ids) throws IOException { |
| BufferedWriter w = getPersistentState().getWriter(); |
| for (String id : ids) { |
| w.write(escapeLineBreak(id)); |
| w.newLine(); |
| } |
| ids.clear(); |
| } |
| |
| private PersistentState getPersistentState() { |
| //Lazily initialize the persistent state |
| if (persistentState == null) { |
| persistentState = new PersistentState(comparator); |
| } |
| return persistentState; |
| } |
| |
| private static class PersistentState implements Closeable { |
| /** |
| * Maximum loop count when creating temp directories. |
| */ |
| private static final int TEMP_DIR_ATTEMPTS = 10000; |
| |
| private final Charset charset = Charsets.UTF_8; |
| private final File workDir; |
| private final Comparator<String> comparator; |
| private File idFile; |
| private File sortedFile; |
| private BufferedWriter writer; |
| private List<CloseableIterator> openedIterators = Lists.newArrayList(); |
| |
| public PersistentState(Comparator<String> comparator) { |
| this(comparator, createTempDir("oak-sorter-")); |
| } |
| |
| public PersistentState(Comparator<String> comparator, File workDir) { |
| this.workDir = workDir; |
| this.comparator = FileIOUtils.lineBreakAwareComparator(comparator); |
| } |
| |
| public BufferedWriter getWriter() throws FileNotFoundException { |
| if (idFile == null) { |
| idFile = new File(workDir, "strings.txt"); |
| sortedFile = new File(workDir, "strings-sorted.txt"); |
| writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(idFile), charset)); |
| } |
| return writer; |
| } |
| |
| public void sort() throws IOException { |
| closeWriter(); |
| |
| List<File> sortedFiles = ExternalSort.sortInBatch(idFile, |
| comparator, //Comparator to use |
| ExternalSort.DEFAULTMAXTEMPFILES, |
| ExternalSort.DEFAULT_MAX_MEM_BYTES, |
| charset, //charset |
| workDir, //temp directory where intermediate files are created |
| true //distinct |
| ); |
| |
| ExternalSort.mergeSortedFiles(sortedFiles, |
| sortedFile, |
| comparator, |
| charset, |
| true |
| ); |
| } |
| |
| public Iterator<String> getIterator() throws IOException { |
| CloseableIterator itr = new CloseableIterator( |
| new BufferedReader(new InputStreamReader(new FileInputStream(sortedFile), charset))); |
| openedIterators.add(itr); |
| return itr; |
| } |
| |
| @Override |
| public String toString() { |
| return "PersistentState : workDir=" + workDir.getAbsolutePath(); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| List<Closeable> closer = new ArrayList<Closeable>(); |
| try { |
| // Closing is done in LIFO manner! |
| closer.add(0, new Closeable() { |
| @Override |
| public void close() throws IOException { |
| FileUtils.deleteDirectory(workDir); |
| } |
| }); |
| closer.add(0, writer); |
| for (CloseableIterator citr : openedIterators) { |
| closer.add(0, citr); |
| } |
| } finally { |
| closeAll(closer); |
| } |
| } |
| |
| private void closeWriter() throws IOException { |
| writer.close(); |
| } |
| |
| /** |
| * Taken from com.google.common.io.Files#createTempDir() |
| * Modified to provide a prefix |
| */ |
| private static File createTempDir(String prefix) { |
| File baseDir = new File(System.getProperty("java.io.tmpdir")); |
| String baseName = System.currentTimeMillis() + "-"; |
| |
| for (int counter = 0; counter < TEMP_DIR_ATTEMPTS; counter++) { |
| File tempDir = new File(baseDir, prefix + baseName + counter); |
| if (tempDir.mkdir()) { |
| return tempDir; |
| } |
| } |
| throw new IllegalStateException("Failed to create directory within " |
| + TEMP_DIR_ATTEMPTS + " attempts (tried " |
| + baseName + "0 to " + baseName + (TEMP_DIR_ATTEMPTS - 1) + ')'); |
| } |
| |
| // inspired by Guava Closer, see |
| // https://google.github.io/guava/releases/19.0/api/docs/com/google/common/io/Closer.html |
| private static void closeAll(List<Closeable> closer) throws IOException { |
| IOException ioex = null; |
| for (Closeable c : closer) { |
| try { |
| c.close(); |
| } catch (IOException mostlyIgnored) { |
| if (ioex == null) { |
| ioex = mostlyIgnored; |
| } |
| } |
| |
| if (ioex != null) { |
| throw ioex; |
| } |
| } |
| } |
| } |
| |
| private static class CloseableIterator extends LineIterator implements Closeable { |
| public CloseableIterator(Reader reader) throws IllegalArgumentException { |
| super(reader); |
| } |
| |
| @Override |
| public String next() { |
| return unescapeLineBreaks(super.next()); |
| } |
| } |
| } |