| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.lucene.util; |
| |
| |
| import java.io.IOException; |
| import java.nio.charset.StandardCharsets; |
| import java.nio.file.Path; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Comparator; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.lucene.codecs.CodecUtil; |
| import org.apache.lucene.index.CorruptIndexException; |
| import org.apache.lucene.store.ChecksumIndexInput; |
| import org.apache.lucene.store.CorruptingIndexOutput; |
| import org.apache.lucene.store.Directory; |
| import org.apache.lucene.store.FilterDirectory; |
| import org.apache.lucene.store.IOContext; |
| import org.apache.lucene.store.IndexInput; |
| import org.apache.lucene.store.IndexOutput; |
| import org.apache.lucene.util.OfflineSorter.BufferSize; |
| import org.apache.lucene.util.OfflineSorter.ByteSequencesWriter; |
| import org.apache.lucene.util.OfflineSorter.SortInfo; |
| |
| /** |
| * Tests for on-disk merge sorting. |
| */ |
| public class TestOfflineSorter extends LuceneTestCase { |
| private Path tempDir; |
| |
| @Override |
| public void setUp() throws Exception { |
| super.setUp(); |
| tempDir = createTempDir("mergesort"); |
| } |
| |
| @Override |
| public void tearDown() throws Exception { |
| if (tempDir != null) { |
| IOUtils.rm(tempDir); |
| } |
| super.tearDown(); |
| } |
| |
| public void testEmpty() throws Exception { |
| try (Directory dir = newDirectory()) { |
| checkSort(dir, new OfflineSorter(dir, "foo"), new byte [][] {}); |
| } |
| } |
| |
| public void testSingleLine() throws Exception { |
| try (Directory dir = newDirectory()) { |
| checkSort(dir, new OfflineSorter(dir, "foo"), new byte [][] { |
| "Single line only.".getBytes(StandardCharsets.UTF_8) |
| }); |
| } |
| } |
| |
| private ExecutorService randomExecutorServiceOrNull() { |
| if (random().nextBoolean()) { |
| return null; |
| } else { |
| return new ThreadPoolExecutor(1, TestUtil.nextInt(random(), 2, 6), Long.MAX_VALUE, TimeUnit.MILLISECONDS, |
| new LinkedBlockingQueue<Runnable>(), |
| new NamedThreadFactory("TestIndexSearcher")); |
| } |
| } |
| |
| @Slow |
| public void testIntermediateMerges() throws Exception { |
| // Sort 20 mb worth of data with 1mb buffer, binary merging. |
| try (Directory dir = newDirectory()) { |
| ExecutorService exec = randomExecutorServiceOrNull(); |
| SortInfo info = checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 2, -1, exec, TestUtil.nextInt(random(), 1, 4)), |
| generateRandom((int)OfflineSorter.MB * 20)); |
| if (exec != null) { |
| exec.shutdownNow(); |
| } |
| assertTrue(info.mergeRounds > 10); |
| } |
| } |
| |
| @Slow |
| public void testSmallRandom() throws Exception { |
| // Sort 20 mb worth of data with 1mb buffer. |
| try (Directory dir = newDirectory()) { |
| ExecutorService exec = randomExecutorServiceOrNull(); |
| SortInfo sortInfo = checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), OfflineSorter.MAX_TEMPFILES, -1, exec, TestUtil.nextInt(random(), 1, 4)), |
| generateRandom((int)OfflineSorter.MB * 20)); |
| if (exec != null) { |
| exec.shutdownNow(); |
| } |
| assertEquals(3, sortInfo.mergeRounds); |
| } |
| } |
| |
| @Nightly |
| public void testLargerRandom() throws Exception { |
| // Sort 100MB worth of data with 15mb buffer. |
| try (Directory dir = newFSDirectory(createTempDir())) { |
| ExecutorService exec = randomExecutorServiceOrNull(); |
| checkSort(dir, new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(16), OfflineSorter.MAX_TEMPFILES, -1, exec, TestUtil.nextInt(random(), 1, 4)), |
| generateRandom((int)OfflineSorter.MB * 100)); |
| if (exec != null) { |
| exec.shutdownNow(); |
| } |
| } |
| } |
| |
| private byte[][] generateRandom(int howMuchDataInBytes) { |
| ArrayList<byte[]> data = new ArrayList<>(); |
| while (howMuchDataInBytes > 0) { |
| byte[] current = new byte[random().nextInt(256)]; |
| random().nextBytes(current); |
| data.add(current); |
| howMuchDataInBytes -= current.length; |
| } |
| byte [][] bytes = data.toArray(new byte[data.size()][]); |
| return bytes; |
| } |
| |
| // Generates same data every time: |
| private byte[][] generateFixed(int howMuchDataInBytes) { |
| ArrayList<byte[]> data = new ArrayList<>(); |
| int length = 256; |
| byte counter = 0; |
| while (howMuchDataInBytes > 0) { |
| byte[] current = new byte[length]; |
| for(int i=0;i<current.length;i++) { |
| current[i] = counter; |
| counter++; |
| } |
| data.add(current); |
| howMuchDataInBytes -= current.length; |
| |
| length--; |
| if (length <= 128) { |
| length = 256; |
| } |
| } |
| byte [][] bytes = data.toArray(new byte[data.size()][]); |
| return bytes; |
| } |
| |
| static final Comparator<byte[]> unsignedByteOrderComparator = new Comparator<byte[]>() { |
| @Override |
| public int compare(byte[] left, byte[] right) { |
| final int max = Math.min(left.length, right.length); |
| for (int i = 0, j = 0; i < max; i++, j++) { |
| int diff = (left[i] & 0xff) - (right[j] & 0xff); |
| if (diff != 0) { |
| return diff; |
| } |
| } |
| return left.length - right.length; |
| } |
| }; |
| |
| /** |
| * Check sorting data on an instance of {@link OfflineSorter}. |
| */ |
| private SortInfo checkSort(Directory dir, OfflineSorter sorter, byte[][] data) throws IOException { |
| |
| IndexOutput unsorted = dir.createTempOutput("unsorted", "tmp", IOContext.DEFAULT); |
| writeAll(unsorted, data); |
| |
| IndexOutput golden = dir.createTempOutput("golden", "tmp", IOContext.DEFAULT); |
| Arrays.sort(data, unsignedByteOrderComparator); |
| writeAll(golden, data); |
| |
| String sorted = sorter.sort(unsorted.getName()); |
| //System.out.println("Input size [MB]: " + unsorted.length() / (1024 * 1024)); |
| //System.out.println(sortInfo); |
| assertFilesIdentical(dir, golden.getName(), sorted); |
| |
| return sorter.sortInfo; |
| } |
| |
| /** |
| * Make sure two files are byte-byte identical. |
| */ |
| private void assertFilesIdentical(Directory dir, String golden, String sorted) throws IOException { |
| long numBytes = dir.fileLength(golden); |
| assertEquals(numBytes, dir.fileLength(sorted)); |
| |
| byte[] buf1 = new byte[64 * 1024]; |
| byte[] buf2 = new byte[64 * 1024]; |
| try ( |
| IndexInput in1 = dir.openInput(golden, IOContext.READONCE); |
| IndexInput in2 = dir.openInput(sorted, IOContext.READONCE) |
| ) { |
| long left = numBytes; |
| while (left > 0) { |
| int chunk = (int) Math.min(buf1.length, left); |
| left -= chunk; |
| in1.readBytes(buf1, 0, chunk); |
| in2.readBytes(buf2, 0, chunk); |
| for (int i = 0; i < chunk; i++) { |
| assertEquals(buf1[i], buf2[i]); |
| } |
| } |
| } |
| } |
| |
| /** NOTE: closes the provided {@link IndexOutput} */ |
| private void writeAll(IndexOutput out, byte[][] data) throws IOException { |
| try (ByteSequencesWriter w = new OfflineSorter.ByteSequencesWriter(out)) { |
| for (byte [] datum : data) { |
| w.write(datum); |
| } |
| CodecUtil.writeFooter(out); |
| } |
| } |
| |
| public void testRamBuffer() { |
| int numIters = atLeast(10000); |
| for (int i = 0; i < numIters; i++) { |
| BufferSize.megabytes(1+random().nextInt(2047)); |
| } |
| BufferSize.megabytes(2047); |
| BufferSize.megabytes(1); |
| |
| expectThrows(IllegalArgumentException.class, () -> { |
| BufferSize.megabytes(2048); |
| }); |
| |
| expectThrows(IllegalArgumentException.class, () -> { |
| BufferSize.megabytes(0); |
| }); |
| |
| expectThrows(IllegalArgumentException.class, () -> { |
| BufferSize.megabytes(-1); |
| }); |
| } |
| |
| public void testThreadSafety() throws Exception { |
| Thread[] threads = new Thread[TestUtil.nextInt(random(), 4, 10)]; |
| final AtomicBoolean failed = new AtomicBoolean(); |
| final int iters = atLeast(200); |
| try (Directory dir = newDirectory()) { |
| for(int i=0;i<threads.length;i++) { |
| final int threadID = i; |
| threads[i] = new Thread() { |
| @Override |
| public void run() { |
| try { |
| for(int iter=0;iter<iters && failed.get() == false;iter++) { |
| checkSort(dir, new OfflineSorter(dir, "foo_" + threadID + "_" + iter), generateRandom(1024)); |
| } |
| } catch (Throwable th) { |
| failed.set(true); |
| throw new RuntimeException(th); |
| } |
| } |
| }; |
| threads[i].start(); |
| } |
| for(Thread thread : threads) { |
| thread.join(); |
| } |
| } |
| |
| assertFalse(failed.get()); |
| } |
| |
| /** Make sure corruption on the incoming (unsorted) file is caught, even if the corruption didn't confuse OfflineSorter! */ |
| public void testBitFlippedOnInput1() throws Exception { |
| |
| try (Directory dir0 = newMockDirectory()) { |
| |
| Directory dir = new FilterDirectory(dir0) { |
| @Override |
| public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException { |
| IndexOutput out = in.createTempOutput(prefix, suffix, context); |
| if (prefix.equals("unsorted")) { |
| return new CorruptingIndexOutput(dir0, 22, out); |
| } else { |
| return out; |
| } |
| } |
| }; |
| |
| IndexOutput unsorted = dir.createTempOutput("unsorted", "tmp", IOContext.DEFAULT); |
| writeAll(unsorted, generateFixed(10*1024)); |
| |
| CorruptIndexException e = expectThrows(CorruptIndexException.class, () -> { |
| new OfflineSorter(dir, "foo").sort(unsorted.getName()); |
| }); |
| assertTrue(e.getMessage().contains("checksum failed (hardware problem?)")); |
| } |
| } |
| |
| /** Make sure corruption on the incoming (unsorted) file is caught, if the corruption did confuse OfflineSorter! */ |
| public void testBitFlippedOnInput2() throws Exception { |
| |
| try (Directory dir0 = newMockDirectory()) { |
| |
| Directory dir = new FilterDirectory(dir0) { |
| @Override |
| public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException { |
| IndexOutput out = in.createTempOutput(prefix, suffix, context); |
| if (prefix.equals("unsorted")) { |
| return new CorruptingIndexOutput(dir0, 22, out) { |
| @Override |
| protected void corruptFile() throws IOException { |
| String newTempName; |
| try(IndexOutput tmpOut = dir0.createTempOutput("tmp", "tmp", IOContext.DEFAULT); |
| IndexInput in = dir0.openInput(out.getName(), IOContext.DEFAULT)) { |
| newTempName = tmpOut.getName(); |
| // Replace length at the end with a too-long value: |
| short v = in.readShort(); |
| assertEquals(256, v); |
| tmpOut.writeShort(Short.MAX_VALUE); |
| tmpOut.copyBytes(in, in.length()-Short.BYTES); |
| } |
| |
| // Delete original and copy corrupt version back: |
| dir0.deleteFile(out.getName()); |
| dir0.copyFrom(dir0, newTempName, out.getName(), IOContext.DEFAULT); |
| dir0.deleteFile(newTempName); |
| } |
| }; |
| } else { |
| return out; |
| } |
| } |
| }; |
| |
| IndexOutput unsorted = dir.createTempOutput("unsorted", "tmp", IOContext.DEFAULT); |
| writeAll(unsorted, generateFixed(5*1024)); |
| |
| // This corruption made OfflineSorter fail with its own exception, but we verify and throw a CorruptIndexException |
| // instead when checksums don't match. |
| CorruptIndexException e = expectThrows(CorruptIndexException.class, () -> { |
| new OfflineSorter(dir, "foo").sort(unsorted.getName()); |
| }); |
| assertTrue(e.getMessage().contains("checksum failed (hardware problem?)")); |
| } |
| } |
| |
| /** Make sure corruption on a temp file (partition) is caught, even if the corruption didn't confuse OfflineSorter! */ |
| public void testBitFlippedOnPartition1() throws Exception { |
| |
| try (Directory dir0 = newMockDirectory()) { |
| |
| Directory dir = new FilterDirectory(dir0) { |
| |
| boolean corrupted; |
| |
| @Override |
| public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException { |
| IndexOutput out = in.createTempOutput(prefix, suffix, context); |
| if (corrupted == false && suffix.equals("sort")) { |
| corrupted = true; |
| return new CorruptingIndexOutput(dir0, 544677, out); |
| } else { |
| return out; |
| } |
| } |
| }; |
| |
| IndexOutput unsorted = dir.createTempOutput("unsorted", "tmp", IOContext.DEFAULT); |
| writeAll(unsorted, generateFixed((int) (OfflineSorter.MB * 3))); |
| |
| CorruptIndexException e = expectThrows(CorruptIndexException.class, () -> { |
| new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 10, -1, null, 0).sort(unsorted.getName()); |
| }); |
| assertTrue(e.getMessage().contains("checksum failed (hardware problem?)")); |
| } |
| } |
| |
| /** Make sure corruption on a temp file (partition) is caught, if the corruption did confuse OfflineSorter! */ |
| public void testBitFlippedOnPartition2() throws Exception { |
| |
| try (Directory dir0 = newMockDirectory()) { |
| |
| Directory dir = new FilterDirectory(dir0) { |
| |
| boolean corrupted; |
| |
| @Override |
| public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException { |
| IndexOutput out = in.createTempOutput(prefix, suffix, context); |
| if (corrupted == false && suffix.equals("sort")) { |
| corrupted = true; |
| return new CorruptingIndexOutput(dir0, 544677, out) { |
| @Override |
| protected void corruptFile() throws IOException { |
| String newTempName; |
| try(IndexOutput tmpOut = dir0.createTempOutput("tmp", "tmp", IOContext.DEFAULT); |
| IndexInput in = dir0.openInput(out.getName(), IOContext.DEFAULT)) { |
| newTempName = tmpOut.getName(); |
| tmpOut.copyBytes(in, 1025905); |
| short v = in.readShort(); |
| assertEquals(254, v); |
| tmpOut.writeShort(Short.MAX_VALUE); |
| tmpOut.copyBytes(in, in.length()-1025905-Short.BYTES); |
| } |
| |
| // Delete original and copy corrupt version back: |
| dir0.deleteFile(out.getName()); |
| dir0.copyFrom(dir0, newTempName, out.getName(), IOContext.DEFAULT); |
| dir0.deleteFile(newTempName); |
| } |
| }; |
| } else { |
| return out; |
| } |
| } |
| }; |
| |
| IndexOutput unsorted = dir.createTempOutput("unsorted", "tmp", IOContext.DEFAULT); |
| writeAll(unsorted, generateFixed((int) (OfflineSorter.MB * 3))); |
| |
| CorruptIndexException e = expectThrows(CorruptIndexException.class, () -> { |
| new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(1), 10, -1, null, 0).sort(unsorted.getName()); |
| }); |
| assertTrue(e.getMessage().contains("checksum failed (hardware problem?)")); |
| } |
| } |
| |
| @Nightly |
| public void testFixedLengthHeap() throws Exception { |
| // Make sure the RAM accounting is correct, i.e. if we are sorting fixed width |
| // ints (4 bytes) then the heap used is really only 4 bytes per value: |
| Directory dir = newDirectory(); |
| IndexOutput out = dir.createTempOutput("unsorted", "tmp", IOContext.DEFAULT); |
| try (ByteSequencesWriter w = new OfflineSorter.ByteSequencesWriter(out)) { |
| byte[] bytes = new byte[Integer.BYTES]; |
| for (int i=0;i<1024*1024;i++) { |
| random().nextBytes(bytes); |
| w.write(bytes); |
| } |
| CodecUtil.writeFooter(out); |
| } |
| |
| ExecutorService exec = randomExecutorServiceOrNull(); |
| OfflineSorter sorter = new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(4), OfflineSorter.MAX_TEMPFILES, Integer.BYTES, exec, TestUtil.nextInt(random(), 1, 4)); |
| sorter.sort(out.getName()); |
| if (exec != null) { |
| exec.shutdownNow(); |
| } |
| // 1 MB of ints with 4 MH heap allowed should have been sorted in a single heap partition: |
| assertEquals(0, sorter.sortInfo.mergeRounds); |
| dir.close(); |
| } |
| |
| public void testFixedLengthLiesLiesLies() throws Exception { |
| // Make sure OfflineSorter catches me if I lie about the fixed value length: |
| Directory dir = newDirectory(); |
| IndexOutput out = dir.createTempOutput("unsorted", "tmp", IOContext.DEFAULT); |
| try (ByteSequencesWriter w = new OfflineSorter.ByteSequencesWriter(out)) { |
| byte[] bytes = new byte[Integer.BYTES]; |
| random().nextBytes(bytes); |
| w.write(bytes); |
| CodecUtil.writeFooter(out); |
| } |
| |
| OfflineSorter sorter = new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(4), OfflineSorter.MAX_TEMPFILES, Long.BYTES, null, 0); |
| IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> { |
| sorter.sort(out.getName()); |
| }); |
| assertEquals("value length is 4 but is supposed to always be 8", e.getMessage()); |
| dir.close(); |
| } |
| |
| // OfflineSorter should not call my BytesSequencesReader.next() again after it already returned null: |
| public void testOverNexting() throws Exception { |
| Directory dir = newDirectory(); |
| IndexOutput out = dir.createTempOutput("unsorted", "tmp", IOContext.DEFAULT); |
| try (ByteSequencesWriter w = new OfflineSorter.ByteSequencesWriter(out)) { |
| byte[] bytes = new byte[Integer.BYTES]; |
| random().nextBytes(bytes); |
| w.write(bytes); |
| CodecUtil.writeFooter(out); |
| } |
| |
| new OfflineSorter(dir, "foo", OfflineSorter.DEFAULT_COMPARATOR, BufferSize.megabytes(4), OfflineSorter.MAX_TEMPFILES, Integer.BYTES, null, 0) { |
| @Override |
| protected ByteSequencesReader getReader(ChecksumIndexInput in, String name) throws IOException { |
| ByteSequencesReader other = super.getReader(in, name); |
| |
| return new ByteSequencesReader(in, name) { |
| |
| private boolean alreadyEnded; |
| |
| @Override |
| public BytesRef next() throws IOException { |
| // if we returned null already, OfflineSorter should not call next() again |
| assertFalse(alreadyEnded); |
| BytesRef result = other.next(); |
| if (result == null) { |
| alreadyEnded = true; |
| } |
| return result; |
| } |
| |
| @Override |
| public void close() throws IOException { |
| other.close(); |
| } |
| }; |
| } |
| }.sort(out.getName()); |
| dir.close(); |
| } |
| |
| public void testInvalidFixedLength() throws Exception { |
| IllegalArgumentException e; |
| e = expectThrows(IllegalArgumentException.class, |
| () -> { |
| new OfflineSorter(null, "foo", OfflineSorter.DEFAULT_COMPARATOR, |
| BufferSize.megabytes(1), OfflineSorter.MAX_TEMPFILES, 0, null, 0); |
| }); |
| assertEquals("valueLength must be 1 .. 32767; got: 0", e.getMessage()); |
| e = expectThrows(IllegalArgumentException.class, |
| () -> { |
| new OfflineSorter(null, "foo", OfflineSorter.DEFAULT_COMPARATOR, |
| BufferSize.megabytes(1), OfflineSorter.MAX_TEMPFILES, Integer.MAX_VALUE, null, 0); |
| }); |
| assertEquals("valueLength must be 1 .. 32767; got: 2147483647", e.getMessage()); |
| } |
| } |