blob: 10c3d7735e9afc0f6a4ba3ebbd97cca996fcfdc2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jackrabbit.oak.commons.sort;
import org.apache.jackrabbit.guava.common.base.Preconditions;
import org.apache.jackrabbit.oak.commons.Compression;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Function;
/**
* Variation of ExternalSort that stores the lines read from intermediate files as byte arrays to avoid the conversion
* from byte[] to String and then back.
*/
public class ExternalSortByteArray {
    private static final int DEFAULT_BUFFER_SIZE = 16 * 1024;

    /**
     * Merges already-sorted input files into {@code fbw}, reading each input through a 16 KiB buffer.
     * Equivalent to calling the overload that takes a {@code readBufferSize} with {@code 16 * 1024}.
     * The input files are deleted when this method returns, whether or not the merge succeeded.
     *
     * @throws IOException if reading, writing or deleting a file fails
     */
    public static <T> void mergeSortedFilesBinary(List<Path> files, BufferedOutputStream fbw, final Comparator<T> cmp,
                                                  boolean distinct, Compression algorithm,
                                                  Function<T, byte[]> typeToByteArray, Function<byte[], T> byteArrayToType)
            throws IOException {
        mergeSortedFilesBinary(files, fbw, cmp, distinct, algorithm, typeToByteArray, byteArrayToType, DEFAULT_BUFFER_SIZE);
    }

    /**
     * Merges the given files, each of which must already be sorted according to {@code cmp}, into {@code fbw}.
     * Each input is decompressed with {@code algorithm} and split into lines on '\n'. Each line (without its
     * '\n') is converted with {@code byteArrayToType}. Each emitted element is written as the output of
     * {@code typeToByteArray} followed by '\n'.
     * <p>
     * The input files are deleted when this method returns, whether or not the merge succeeded.
     * {@code fbw} is written to but neither flushed nor closed.
     *
     * @param files           sorted input files; deleted on return
     * @param fbw             destination stream
     * @param cmp             ordering the inputs are sorted by
     * @param distinct        if {@code true}, an element comparing equal to the last written element is skipped
     * @param algorithm       compression used by the input files
     * @param typeToByteArray serializes an element for output (the trailing '\n' is added by this method)
     * @param byteArrayToType deserializes one input line; returning {@code null} ends reading of that input
     * @param readBufferSize  size in bytes of each per-file read buffer; must be greater than 1024
     * @throws IOException if reading, writing or deleting a file fails
     */
    public static <T> void mergeSortedFilesBinary(List<Path> files, BufferedOutputStream fbw, final Comparator<T> cmp,
                                                  boolean distinct, Compression algorithm,
                                                  Function<T, byte[]> typeToByteArray, Function<byte[], T> byteArrayToType,
                                                  int readBufferSize)
            throws IOException {
        List<BinaryFileBuffer<T>> bfbs = new ArrayList<>(files.size());
        try {
            for (Path f : files) {
                bfbs.add(openBuffer(f, algorithm, byteArrayToType, readBufferSize));
            }
            mergeBinary(fbw, cmp, distinct, bfbs, typeToByteArray);
        } finally {
            for (BinaryFileBuffer<T> buffer : bfbs) {
                try {
                    buffer.close();
                } catch (Exception ignored) {
                    // Best effort: a close failure must not mask the merge outcome or skip file deletion.
                }
            }
            for (Path f : files) {
                Files.deleteIfExists(f);
            }
        }
    }

    /**
     * Opens {@code file}, wraps it with {@code algorithm}'s decompressing stream and builds a
     * {@link BinaryFileBuffer} over it. If any step fails, the streams opened so far are closed before the
     * failure is rethrown. Otherwise a stream not yet tracked by the caller would leak its file handle.
     */
    private static <T> BinaryFileBuffer<T> openBuffer(Path file, Compression algorithm,
                                                      Function<byte[], T> byteArrayToType, int readBufferSize)
            throws IOException {
        InputStream raw = Files.newInputStream(file);
        InputStream in = null;
        try {
            in = algorithm.getInputStream(raw);
            return new BinaryFileBuffer<>(in, byteArrayToType, readBufferSize);
        } catch (Throwable t) {
            // Close both: the wrapper may not exist yet, and closing the raw stream twice is harmless.
            closeSuppressed(in, t);
            closeSuppressed(raw, t);
            throw t;
        }
    }

    /**
     * Closes {@code stream} if it is non-null, recording any close failure as suppressed on {@code primary}.
     */
    private static void closeSuppressed(InputStream stream, Throwable primary) {
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        } catch (IOException | RuntimeException e) {
            primary.addSuppressed(e);
        }
    }

    /**
     * K-way merge of {@code buffers} into {@code fbw}, using a priority queue keyed on each buffer's head element.
     * Buffers are closed as soon as they are exhausted.
     *
     * @return the number of elements consumed from the inputs, including duplicates skipped when
     *         {@code distinct} is set
     */
    private static <T> int mergeBinary(BufferedOutputStream fbw, final Comparator<T> cmp, boolean distinct,
                                       List<BinaryFileBuffer<T>> buffers, Function<T, byte[]> typeToByteArray)
            throws IOException {
        PriorityQueue<BinaryFileBuffer<T>> pq = new PriorityQueue<>(
                Math.max(1, buffers.size()),
                (i, j) -> cmp.compare(i.peek(), j.peek())
        );
        for (BinaryFileBuffer<T> bfb : buffers) {
            if (!bfb.empty()) {
                pq.add(bfb);
            }
        }
        int rowcounter = 0;
        T lastLine = null;
        while (!pq.isEmpty()) {
            BinaryFileBuffer<T> bfb = pq.poll();
            T r = bfb.pop();
            // Skip duplicate lines
            if (!distinct || lastLine == null || cmp.compare(r, lastLine) != 0) {
                fbw.write(typeToByteArray.apply(r));
                fbw.write('\n');
                lastLine = r;
            }
            ++rowcounter;
            if (bfb.empty()) {
                bfb.close();
            } else {
                pq.add(bfb); // re-insert so it is re-ordered by its new head element
            }
        }
        return rowcounter;
    }

    /**
     * Reads lines from an input stream as raw byte arrays and keeps the converted head element available
     * through {@link #peek()}.
     * <p>
     * WARNING: Uses '\n' as a line separator, it will not work with other line separators. A '\r' that
     * precedes the '\n' is kept as part of the line.
     */
    private static class BinaryFileBuffer<T> {
        private final InputStream fbr;
        private final Function<byte[], T> byteArrayToType;
        private T cache;
        private boolean empty;
        // Reassembles a line whose bytes span more than one fill of {@code buffer}.
        private final ByteArrayOutputStream lineAccumulator = new ByteArrayOutputStream();
        private final byte[] buffer;
        private int bufferPos = 0;
        private int bufferLimit = 0;

        public BinaryFileBuffer(InputStream r, Function<byte[], T> byteArrayToType, int bufferSize)
                throws IOException {
            Preconditions.checkArgument(bufferSize > 1024, "Buffer size must be greater than 1024 bytes");
            this.fbr = r;
            this.byteArrayToType = byteArrayToType;
            this.buffer = new byte[bufferSize];
            reload();
        }

        public boolean empty() {
            return this.empty;
        }

        /**
         * Advances to the next line and converts it. The buffer becomes empty at end of stream, or when the
         * converter returns {@code null}.
         */
        private void reload() throws IOException {
            try {
                byte[] line = readLine();
                // Do not hand the end-of-stream marker to the converter: it may not accept null, and a converter
                // mapping null to a non-null value would make this buffer never report empty.
                this.cache = line == null ? null : byteArrayToType.apply(line);
                this.empty = this.cache == null;
            } catch (EOFException eof) {
                this.empty = true;
                this.cache = null;
            }
        }

        private boolean bufferIsEmpty() {
            return bufferPos >= bufferLimit;
        }

        /*
         * Reads the next line from the source input as a byte array, excluding the terminating '\n'. This is
         * adapted from BufferedReader#readLine() but without converting the line to a String. As with
         * BufferedReader, an empty line yields an empty array, a final line without a trailing '\n' is still
         * returned, and null is returned only at end of stream.
         */
        private byte[] readLine() throws IOException {
            lineAccumulator.reset();
            for (; ; ) {
                if (bufferIsEmpty()) {
                    bufferLimit = fbr.read(buffer);
                    bufferPos = 0;
                }
                if (bufferIsEmpty()) { /* EOF */
                    // Nothing more could be read: return the partial last line, if any.
                    return lineAccumulator.size() == 0 ? null : lineAccumulator.toByteArray();
                }
                int startByte = bufferPos;
                while (!bufferIsEmpty()) {
                    byte c = buffer[bufferPos];
                    bufferPos++;
                    if (c == '\n') { /* EOL */
                        int lineSegmentSize = bufferPos - startByte - 1; // exclude \n
                        if (lineAccumulator.size() == 0) {
                            // The whole line is in the buffer: copy it out directly, skipping the accumulator.
                            // An empty line yields an empty array; it must not be mistaken for end of stream.
                            byte[] line = new byte[lineSegmentSize];
                            System.arraycopy(buffer, startByte, line, 0, lineSegmentSize);
                            return line;
                        }
                        // The start of the line is already accumulated; append the remainder and return it.
                        lineAccumulator.write(buffer, startByte, lineSegmentSize);
                        return lineAccumulator.toByteArray();
                    }
                }
                // Reached the end of the buffer mid-line: keep what we have and read more from the source.
                lineAccumulator.write(buffer, startByte, bufferPos - startByte);
            }
        }

        public void close() throws IOException {
            this.fbr.close();
        }

        public T peek() {
            if (empty()) {
                return null;
            }
            return this.cache;
        }

        public T pop() throws IOException {
            T answer = peek();
            reload();
            return answer;
        }
    }
}