| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.lucene.util; |
| |
| import java.io.IOException; |
| import java.nio.charset.StandardCharsets; |
| import java.nio.file.Path; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Comparator; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import org.apache.lucene.codecs.CodecUtil; |
| import org.apache.lucene.index.CorruptIndexException; |
| import org.apache.lucene.store.ChecksumIndexInput; |
| import org.apache.lucene.store.CorruptingIndexOutput; |
| import org.apache.lucene.store.Directory; |
| import org.apache.lucene.store.FilterDirectory; |
| import org.apache.lucene.store.IOContext; |
| import org.apache.lucene.store.IndexInput; |
| import org.apache.lucene.store.IndexOutput; |
| import org.apache.lucene.util.OfflineSorter.BufferSize; |
| import org.apache.lucene.util.OfflineSorter.ByteSequencesWriter; |
| import org.apache.lucene.util.OfflineSorter.SortInfo; |
| |
| /** Tests for on-disk merge sorting. */ |
| public class TestOfflineSorter extends LuceneTestCase { |
| private Path tempDir; |
| |
| @Override |
| public void setUp() throws Exception { |
| super.setUp(); |
| tempDir = createTempDir("mergesort"); |
| } |
| |
| @Override |
| public void tearDown() throws Exception { |
| if (tempDir != null) { |
| IOUtils.rm(tempDir); |
| } |
| super.tearDown(); |
| } |
| |
| public void testEmpty() throws Exception { |
| try (Directory dir = newDirectory()) { |
| checkSort(dir, new OfflineSorter(dir, "foo"), new byte[][] {}); |
| } |
| } |
| |
| public void testSingleLine() throws Exception { |
| try (Directory dir = newDirectory()) { |
| checkSort( |
| dir, |
| new OfflineSorter(dir, "foo"), |
| new byte[][] {"Single line only.".getBytes(StandardCharsets.UTF_8)}); |
| } |
| } |
| |
| private ExecutorService randomExecutorServiceOrNull() { |
| if (random().nextBoolean()) { |
| return null; |
| } else { |
| int maxThreads = TEST_NIGHTLY ? TestUtil.nextInt(random(), 2, 6) : 2; |
| return new ThreadPoolExecutor( |
| 1, |
| maxThreads, |
| Long.MAX_VALUE, |
| TimeUnit.MILLISECONDS, |
| new LinkedBlockingQueue<Runnable>(), |
| new NamedThreadFactory("TestOfflineSorter")); |
| } |
| } |
| |
| @Slow |
| public void testIntermediateMerges() throws Exception { |
| // Sort 20 mb worth of data with 1mb buffer, binary merging. |
| try (Directory dir = newFSDirectory(createTempDir())) { |
| ExecutorService exec = randomExecutorServiceOrNull(); |
| SortInfo info = |
| checkSort( |
| dir, |
| new OfflineSorter( |
| dir, |
| "foo", |
| OfflineSorter.DEFAULT_COMPARATOR, |
| BufferSize.megabytes(1), |
| 2, |
| -1, |
| exec, |
| TestUtil.nextInt(random(), 1, 4)), |
| generateRandom((int) OfflineSorter.MB * 20)); |
| if (exec != null) { |
| exec.shutdownNow(); |
| } |
| assertTrue(info.mergeRounds > 10); |
| } |
| } |
| |
| @Slow |
| public void testSmallRandom() throws Exception { |
| // Sort 20 mb worth of data with 1mb buffer. |
| try (Directory dir = newFSDirectory(createTempDir())) { |
| ExecutorService exec = randomExecutorServiceOrNull(); |
| SortInfo sortInfo = |
| checkSort( |
| dir, |
| new OfflineSorter( |
| dir, |
| "foo", |
| OfflineSorter.DEFAULT_COMPARATOR, |
| BufferSize.megabytes(1), |
| OfflineSorter.MAX_TEMPFILES, |
| -1, |
| exec, |
| TestUtil.nextInt(random(), 1, 4)), |
| generateRandom((int) OfflineSorter.MB * 20)); |
| if (exec != null) { |
| exec.shutdownNow(); |
| } |
| assertEquals(3, sortInfo.mergeRounds); |
| } |
| } |
| |
| @Nightly |
| public void testLargerRandom() throws Exception { |
| // Sort 100MB worth of data with 15mb buffer. |
| try (Directory dir = newFSDirectory(createTempDir())) { |
| ExecutorService exec = randomExecutorServiceOrNull(); |
| checkSort( |
| dir, |
| new OfflineSorter( |
| dir, |
| "foo", |
| OfflineSorter.DEFAULT_COMPARATOR, |
| BufferSize.megabytes(16), |
| OfflineSorter.MAX_TEMPFILES, |
| -1, |
| exec, |
| TestUtil.nextInt(random(), 1, 4)), |
| generateRandom((int) OfflineSorter.MB * 100)); |
| if (exec != null) { |
| exec.shutdownNow(); |
| } |
| } |
| } |
| |
| private byte[][] generateRandom(int howMuchDataInBytes) { |
| ArrayList<byte[]> data = new ArrayList<>(); |
| while (howMuchDataInBytes > 0) { |
| byte[] current = new byte[random().nextInt(256)]; |
| random().nextBytes(current); |
| data.add(current); |
| howMuchDataInBytes -= current.length; |
| } |
| byte[][] bytes = data.toArray(new byte[data.size()][]); |
| return bytes; |
| } |
| |
| // Generates same data every time: |
| private byte[][] generateFixed(int howMuchDataInBytes) { |
| ArrayList<byte[]> data = new ArrayList<>(); |
| int length = 256; |
| byte counter = 0; |
| while (howMuchDataInBytes > 0) { |
| byte[] current = new byte[length]; |
| for (int i = 0; i < current.length; i++) { |
| current[i] = counter; |
| counter++; |
| } |
| data.add(current); |
| howMuchDataInBytes -= current.length; |
| |
| length--; |
| if (length <= 128) { |
| length = 256; |
| } |
| } |
| byte[][] bytes = data.toArray(new byte[data.size()][]); |
| return bytes; |
| } |
| |
| static final Comparator<byte[]> unsignedByteOrderComparator = |
| new Comparator<byte[]>() { |
| @Override |
| public int compare(byte[] left, byte[] right) { |
| final int max = Math.min(left.length, right.length); |
| for (int i = 0, j = 0; i < max; i++, j++) { |
| int diff = (left[i] & 0xff) - (right[j] & 0xff); |
| if (diff != 0) { |
| return diff; |
| } |
| } |
| return left.length - right.length; |
| } |
| }; |
| |
| /** Check sorting data on an instance of {@link OfflineSorter}. */ |
| private SortInfo checkSort(Directory dir, OfflineSorter sorter, byte[][] data) |
| throws IOException { |
| |
| IndexOutput unsorted = dir.createTempOutput("unsorted", "tmp", IOContext.DEFAULT); |
| writeAll(unsorted, data); |
| |
| IndexOutput golden = dir.createTempOutput("golden", "tmp", IOContext.DEFAULT); |
| Arrays.sort(data, unsignedByteOrderComparator); |
| writeAll(golden, data); |
| |
| String sorted = sorter.sort(unsorted.getName()); |
| // System.out.println("Input size [MB]: " + unsorted.length() / (1024 * 1024)); |
| // System.out.println(sortInfo); |
| assertFilesIdentical(dir, golden.getName(), sorted); |
| |
| return sorter.sortInfo; |
| } |
| |
| /** Make sure two files are byte-byte identical. */ |
| private void assertFilesIdentical(Directory dir, String golden, String sorted) |
| throws IOException { |
| long numBytes = dir.fileLength(golden); |
| assertEquals(numBytes, dir.fileLength(sorted)); |
| |
| byte[] buf1 = new byte[64 * 1024]; |
| byte[] buf2 = new byte[64 * 1024]; |
| try (IndexInput in1 = dir.openInput(golden, IOContext.READONCE); |
| IndexInput in2 = dir.openInput(sorted, IOContext.READONCE)) { |
| long left = numBytes; |
| while (left > 0) { |
| int chunk = (int) Math.min(buf1.length, left); |
| left -= chunk; |
| in1.readBytes(buf1, 0, chunk); |
| in2.readBytes(buf2, 0, chunk); |
| for (int i = 0; i < chunk; i++) { |
| assertEquals(buf1[i], buf2[i]); |
| } |
| } |
| } |
| } |
| |
| /** NOTE: closes the provided {@link IndexOutput} */ |
| private void writeAll(IndexOutput out, byte[][] data) throws IOException { |
| try (ByteSequencesWriter w = new OfflineSorter.ByteSequencesWriter(out)) { |
| for (byte[] datum : data) { |
| w.write(datum); |
| } |
| CodecUtil.writeFooter(out); |
| } |
| } |
| |
| public void testRamBuffer() { |
| int numIters = atLeast(10000); |
| for (int i = 0; i < numIters; i++) { |
| BufferSize.megabytes(1 + random().nextInt(2047)); |
| } |
| BufferSize.megabytes(2047); |
| BufferSize.megabytes(1); |
| |
| expectThrows( |
| IllegalArgumentException.class, |
| () -> { |
| BufferSize.megabytes(2048); |
| }); |
| |
| expectThrows( |
| IllegalArgumentException.class, |
| () -> { |
| BufferSize.megabytes(0); |
| }); |
| |
| expectThrows( |
| IllegalArgumentException.class, |
| () -> { |
| BufferSize.megabytes(-1); |
| }); |
| } |
| |
| public void testThreadSafety() throws Exception { |
| Thread[] threads = new Thread[TestUtil.nextInt(random(), 4, 10)]; |
| final AtomicBoolean failed = new AtomicBoolean(); |
| final int iters = atLeast(200); |
| try (Directory dir = newDirectory()) { |
| for (int i = 0; i < threads.length; i++) { |
| final int threadID = i; |
| threads[i] = |
| new Thread() { |
| @Override |
| public void run() { |
| try { |
| for (int iter = 0; iter < iters && failed.get() == false; iter++) { |
| checkSort( |
| dir, |
| new OfflineSorter(dir, "foo_" + threadID + "_" + iter), |
| generateRandom(1024)); |
| } |
| } catch (Throwable th) { |
| failed.set(true); |
| throw new RuntimeException(th); |
| } |
| } |
| }; |
| threads[i].start(); |
| } |
| for (Thread thread : threads) { |
| thread.join(); |
| } |
| } |
| |
| assertFalse(failed.get()); |
| } |
| |
| /** |
| * Make sure corruption on the incoming (unsorted) file is caught, even if the corruption didn't |
| * confuse OfflineSorter! |
| */ |
| public void testBitFlippedOnInput1() throws Exception { |
| |
| try (Directory dir0 = newMockDirectory()) { |
| |
| Directory dir = |
| new FilterDirectory(dir0) { |
| @Override |
| public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) |
| throws IOException { |
| IndexOutput out = in.createTempOutput(prefix, suffix, context); |
| if (prefix.equals("unsorted")) { |
| return new CorruptingIndexOutput(dir0, 22, out); |
| } else { |
| return out; |
| } |
| } |
| }; |
| |
| IndexOutput unsorted = dir.createTempOutput("unsorted", "tmp", IOContext.DEFAULT); |
| writeAll(unsorted, generateFixed(10 * 1024)); |
| |
| CorruptIndexException e = |
| expectThrows( |
| CorruptIndexException.class, |
| () -> { |
| new OfflineSorter(dir, "foo").sort(unsorted.getName()); |
| }); |
| assertTrue(e.getMessage().contains("checksum failed (hardware problem?)")); |
| } |
| } |
| |
| /** |
| * Make sure corruption on the incoming (unsorted) file is caught, if the corruption did confuse |
| * OfflineSorter! |
| */ |
| public void testBitFlippedOnInput2() throws Exception { |
| |
| try (Directory dir0 = newMockDirectory()) { |
| |
| Directory dir = |
| new FilterDirectory(dir0) { |
| @Override |
| public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) |
| throws IOException { |
| IndexOutput out = in.createTempOutput(prefix, suffix, context); |
| if (prefix.equals("unsorted")) { |
| return new CorruptingIndexOutput(dir0, 22, out) { |
| @Override |
| protected void corruptFile() throws IOException { |
| String newTempName; |
| try (IndexOutput tmpOut = |
| dir0.createTempOutput("tmp", "tmp", IOContext.DEFAULT); |
| IndexInput in = dir0.openInput(out.getName(), IOContext.DEFAULT)) { |
| newTempName = tmpOut.getName(); |
| // Replace length at the end with a too-long value: |
| short v = in.readShort(); |
| assertEquals(256, v); |
| tmpOut.writeShort(Short.MAX_VALUE); |
| tmpOut.copyBytes(in, in.length() - Short.BYTES); |
| } |
| |
| // Delete original and copy corrupt version back: |
| dir0.deleteFile(out.getName()); |
| dir0.copyFrom(dir0, newTempName, out.getName(), IOContext.DEFAULT); |
| dir0.deleteFile(newTempName); |
| } |
| }; |
| } else { |
| return out; |
| } |
| } |
| }; |
| |
| IndexOutput unsorted = dir.createTempOutput("unsorted", "tmp", IOContext.DEFAULT); |
| writeAll(unsorted, generateFixed(5 * 1024)); |
| |
| // This corruption made OfflineSorter fail with its own exception, but we verify and throw a |
| // CorruptIndexException |
| // instead when checksums don't match. |
| CorruptIndexException e = |
| expectThrows( |
| CorruptIndexException.class, |
| () -> { |
| new OfflineSorter(dir, "foo").sort(unsorted.getName()); |
| }); |
| assertTrue(e.getMessage().contains("checksum failed (hardware problem?)")); |
| } |
| } |
| |
| /** |
| * Make sure corruption on a temp file (partition) is caught, even if the corruption didn't |
| * confuse OfflineSorter! |
| */ |
| public void testBitFlippedOnPartition1() throws Exception { |
| |
| try (Directory dir0 = newMockDirectory()) { |
| |
| Directory dir = |
| new FilterDirectory(dir0) { |
| |
| boolean corrupted; |
| |
| @Override |
| public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) |
| throws IOException { |
| IndexOutput out = in.createTempOutput(prefix, suffix, context); |
| if (corrupted == false && suffix.equals("sort")) { |
| corrupted = true; |
| return new CorruptingIndexOutput(dir0, 544677, out); |
| } else { |
| return out; |
| } |
| } |
| }; |
| |
| IndexOutput unsorted = dir.createTempOutput("unsorted", "tmp", IOContext.DEFAULT); |
| writeAll(unsorted, generateFixed((int) (OfflineSorter.MB * 3))); |
| |
| CorruptIndexException e = |
| expectThrows( |
| CorruptIndexException.class, |
| () -> { |
| new OfflineSorter( |
| dir, |
| "foo", |
| OfflineSorter.DEFAULT_COMPARATOR, |
| BufferSize.megabytes(1), |
| 10, |
| -1, |
| null, |
| 0) |
| .sort(unsorted.getName()); |
| }); |
| assertTrue(e.getMessage().contains("checksum failed (hardware problem?)")); |
| } |
| } |
| |
| /** |
| * Make sure corruption on a temp file (partition) is caught, if the corruption did confuse |
| * OfflineSorter! |
| */ |
| public void testBitFlippedOnPartition2() throws Exception { |
| |
| try (Directory dir0 = newMockDirectory()) { |
| |
| Directory dir = |
| new FilterDirectory(dir0) { |
| |
| boolean corrupted; |
| |
| @Override |
| public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) |
| throws IOException { |
| IndexOutput out = in.createTempOutput(prefix, suffix, context); |
| if (corrupted == false && suffix.equals("sort")) { |
| corrupted = true; |
| return new CorruptingIndexOutput(dir0, 544677, out) { |
| @Override |
| protected void corruptFile() throws IOException { |
| String newTempName; |
| try (IndexOutput tmpOut = |
| dir0.createTempOutput("tmp", "tmp", IOContext.DEFAULT); |
| IndexInput in = dir0.openInput(out.getName(), IOContext.DEFAULT)) { |
| newTempName = tmpOut.getName(); |
| tmpOut.copyBytes(in, 1025905); |
| short v = in.readShort(); |
| assertEquals(254, v); |
| tmpOut.writeShort(Short.MAX_VALUE); |
| tmpOut.copyBytes(in, in.length() - 1025905 - Short.BYTES); |
| } |
| |
| // Delete original and copy corrupt version back: |
| dir0.deleteFile(out.getName()); |
| dir0.copyFrom(dir0, newTempName, out.getName(), IOContext.DEFAULT); |
| dir0.deleteFile(newTempName); |
| } |
| }; |
| } else { |
| return out; |
| } |
| } |
| }; |
| |
| IndexOutput unsorted = dir.createTempOutput("unsorted", "tmp", IOContext.DEFAULT); |
| writeAll(unsorted, generateFixed((int) (OfflineSorter.MB * 3))); |
| |
| CorruptIndexException e = |
| expectThrows( |
| CorruptIndexException.class, |
| () -> { |
| new OfflineSorter( |
| dir, |
| "foo", |
| OfflineSorter.DEFAULT_COMPARATOR, |
| BufferSize.megabytes(1), |
| 10, |
| -1, |
| null, |
| 0) |
| .sort(unsorted.getName()); |
| }); |
| assertTrue(e.getMessage().contains("checksum failed (hardware problem?)")); |
| } |
| } |
| |
| @Nightly |
| public void testFixedLengthHeap() throws Exception { |
| // Make sure the RAM accounting is correct, i.e. if we are sorting fixed width |
| // ints (4 bytes) then the heap used is really only 4 bytes per value: |
| Directory dir = newDirectory(); |
| IndexOutput out = dir.createTempOutput("unsorted", "tmp", IOContext.DEFAULT); |
| try (ByteSequencesWriter w = new OfflineSorter.ByteSequencesWriter(out)) { |
| byte[] bytes = new byte[Integer.BYTES]; |
| for (int i = 0; i < 1024 * 1024; i++) { |
| random().nextBytes(bytes); |
| w.write(bytes); |
| } |
| CodecUtil.writeFooter(out); |
| } |
| |
| ExecutorService exec = randomExecutorServiceOrNull(); |
| OfflineSorter sorter = |
| new OfflineSorter( |
| dir, |
| "foo", |
| OfflineSorter.DEFAULT_COMPARATOR, |
| BufferSize.megabytes(4), |
| OfflineSorter.MAX_TEMPFILES, |
| Integer.BYTES, |
| exec, |
| TestUtil.nextInt(random(), 1, 4)); |
| sorter.sort(out.getName()); |
| if (exec != null) { |
| exec.shutdownNow(); |
| } |
| // 1 MB of ints with 4 MH heap allowed should have been sorted in a single heap partition: |
| assertEquals(0, sorter.sortInfo.mergeRounds); |
| dir.close(); |
| } |
| |
| public void testFixedLengthLiesLiesLies() throws Exception { |
| // Make sure OfflineSorter catches me if I lie about the fixed value length: |
| Directory dir = newDirectory(); |
| IndexOutput out = dir.createTempOutput("unsorted", "tmp", IOContext.DEFAULT); |
| try (ByteSequencesWriter w = new OfflineSorter.ByteSequencesWriter(out)) { |
| byte[] bytes = new byte[Integer.BYTES]; |
| random().nextBytes(bytes); |
| w.write(bytes); |
| CodecUtil.writeFooter(out); |
| } |
| |
| OfflineSorter sorter = |
| new OfflineSorter( |
| dir, |
| "foo", |
| OfflineSorter.DEFAULT_COMPARATOR, |
| BufferSize.megabytes(4), |
| OfflineSorter.MAX_TEMPFILES, |
| Long.BYTES, |
| null, |
| 0); |
| IllegalArgumentException e = |
| expectThrows( |
| IllegalArgumentException.class, |
| () -> { |
| sorter.sort(out.getName()); |
| }); |
| assertEquals("value length is 4 but is supposed to always be 8", e.getMessage()); |
| dir.close(); |
| } |
| |
| // OfflineSorter should not call my BytesSequencesReader.next() again after it already returned |
| // null: |
| public void testOverNexting() throws Exception { |
| Directory dir = newDirectory(); |
| IndexOutput out = dir.createTempOutput("unsorted", "tmp", IOContext.DEFAULT); |
| try (ByteSequencesWriter w = new OfflineSorter.ByteSequencesWriter(out)) { |
| byte[] bytes = new byte[Integer.BYTES]; |
| random().nextBytes(bytes); |
| w.write(bytes); |
| CodecUtil.writeFooter(out); |
| } |
| |
| new OfflineSorter( |
| dir, |
| "foo", |
| OfflineSorter.DEFAULT_COMPARATOR, |
| BufferSize.megabytes(4), |
| OfflineSorter.MAX_TEMPFILES, |
| Integer.BYTES, |
| null, |
| 0) { |
| @Override |
| protected ByteSequencesReader getReader(ChecksumIndexInput in, String name) |
| throws IOException { |
| ByteSequencesReader other = super.getReader(in, name); |
| |
| return new ByteSequencesReader(in, name) { |
| |
| private boolean alreadyEnded; |
| |
| @Override |
| public BytesRef next() throws IOException { |
| // if we returned null already, OfflineSorter should not call next() again |
| assertFalse(alreadyEnded); |
| BytesRef result = other.next(); |
| if (result == null) { |
| alreadyEnded = true; |
| } |
| return result; |
| } |
| |
| @Override |
| public void close() throws IOException { |
| other.close(); |
| } |
| }; |
| } |
| }.sort(out.getName()); |
| dir.close(); |
| } |
| |
| public void testInvalidFixedLength() throws Exception { |
| IllegalArgumentException e; |
| e = |
| expectThrows( |
| IllegalArgumentException.class, |
| () -> { |
| new OfflineSorter( |
| null, |
| "foo", |
| OfflineSorter.DEFAULT_COMPARATOR, |
| BufferSize.megabytes(1), |
| OfflineSorter.MAX_TEMPFILES, |
| 0, |
| null, |
| 0); |
| }); |
| assertEquals("valueLength must be 1 .. 32767; got: 0", e.getMessage()); |
| e = |
| expectThrows( |
| IllegalArgumentException.class, |
| () -> { |
| new OfflineSorter( |
| null, |
| "foo", |
| OfflineSorter.DEFAULT_COMPARATOR, |
| BufferSize.megabytes(1), |
| OfflineSorter.MAX_TEMPFILES, |
| Integer.MAX_VALUE, |
| null, |
| 0); |
| }); |
| assertEquals("valueLength must be 1 .. 32767; got: 2147483647", e.getMessage()); |
| } |
| } |