blob: 92e306572bf031613e88373125e9cd40ef570ba4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.NonWritableChannelException;
import java.nio.channels.SeekableByteChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.beam.runners.dataflow.internal.IsmFormat;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecord;
import org.apache.beam.runners.dataflow.internal.IsmFormat.IsmRecordCoder;
import org.apache.beam.runners.dataflow.internal.IsmFormat.MetadataKeyCoder;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.util.RandomAccessData;
import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState;
import org.apache.beam.runners.dataflow.worker.IsmReaderImpl.CachedTailSeekableByteChannel;
import org.apache.beam.runners.dataflow.worker.IsmReaderImpl.IsmShardKey;
import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfileScope;
import org.apache.beam.runners.dataflow.worker.util.ValueInEmptyWindows;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ExecutorTestUtils.TestReaderObserver;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink.SinkWriter;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.CoderPropertiesTest.NonDeterministicCoder;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WeightedValue;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Predicate;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Predicates;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.Ints;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.primitives.UnsignedBytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/**
 * Tests for {@link IsmReader}, exercised through {@link IsmReaderImpl}, and for {@link
 * CachedTailSeekableByteChannel}.
 */
@RunWith(JUnit4.class)
public class IsmReaderTest {
  private static final long BLOOM_FILTER_SIZE_LIMIT = 10_000;
  /** Block size the test {@link IsmSink} is configured with in {@link #writeElementsToFile}. */
  private static final int TEST_BLOCK_SIZE = 1024;
  /**
   * Coder for records with two key components (a byte[] that may also be the metadata key, and a
   * byte[]) and byte[] values.
   */
  private static final IsmRecordCoder<byte[]> CODER =
      IsmRecordCoder.of(
          1, // number of shard key coders for value records
          1, // number of shard key coders for metadata records
          ImmutableList.<Coder<?>>of(MetadataKeyCoder.of(ByteArrayCoder.of()), ByteArrayCoder.of()),
          ByteArrayCoder.of());
  private static final Coder<String> NON_DETERMINISTIC_CODER = new NonDeterministicCoder();
  private static final byte[] EMPTY = new byte[0];
  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
  @Rule public ExpectedException expectedException = ExpectedException.none();
  private Cache<
          IsmShardKey,
          WeightedValue<NavigableMap<RandomAccessData, WindowedValue<IsmRecord<byte[]>>>>>
      cache;
  private DataflowExecutionContext executionContext;
  private DataflowOperationContext operationContext;
  private SideInputReadCounter sideInputReadCounter;
  private Closeable stateCloseable;

  @Before
  public void setUp() {
    cache =
        CacheBuilder.newBuilder()
            .weigher(Weighers.fixedWeightKeys(1))
            .maximumWeight(10_000)
            .build();
    executionContext =
        BatchModeExecutionContext.forTesting(
            PipelineOptionsFactory.as(DataflowPipelineOptions.class), "testStage");
    DataflowExecutionState state =
        executionContext
            .getExecutionStateRegistry()
            .getState(
                NameContextsForTests.nameContextForTest(), "test", null, NoopProfileScope.NOOP);
    operationContext =
        executionContext.createOperationContext(NameContextsForTests.nameContextForTest());
    stateCloseable = executionContext.getExecutionStateTracker().enterState(state);
    sideInputReadCounter = new DataflowSideInputReadCounter(executionContext, operationContext, 1);
  }

  @After
  public void tearDown() throws IOException {
    // Guard against setUp() having failed before the state was entered, which would otherwise
    // mask the original failure with a NullPointerException.
    if (stateCloseable != null) {
      stateCloseable.close();
    }
  }

  @Test
  public void testReadEmpty() throws Exception {
    writeElementsToFileAndReadInOrder(Collections.<IsmRecord<byte[]>>emptyList());
  }

  @Test
  public void testUsingNonDeterministicShardKeyCoder() throws Exception {
    expectedException.expect(IllegalArgumentException.class);
    expectedException.expectMessage("is expected to be deterministic");
    new IsmReaderImpl<>(
        FileSystems.matchSingleFileSpec(tmpFolder.newFile().getPath()).resourceId(),
        IsmRecordCoder.of(
            1, // number of shard key coders for value records
            0, // number of shard key coders for metadata records
            ImmutableList.<Coder<?>>of(NON_DETERMINISTIC_CODER, ByteArrayCoder.of()),
            ByteArrayCoder.of()),
        cache);
  }

  @Test
  public void testUsingNonDeterministicNonShardKeyCoder() throws Exception {
    expectedException.expect(IllegalArgumentException.class);
    expectedException.expectMessage("is expected to be deterministic");
    new IsmReaderImpl<>(
        FileSystems.matchSingleFileSpec(tmpFolder.newFile().getPath()).resourceId(),
        IsmRecordCoder.of(
            1, // number of shard key coders for value records
            0, // number of shard key coders for metadata records
            ImmutableList.<Coder<?>>of(ByteArrayCoder.of(), NON_DETERMINISTIC_CODER),
            ByteArrayCoder.of()),
        cache);
  }

  @Test
  public void testIsEmpty() throws Exception {
    File tmpFile = tmpFolder.newFile();
    List<IsmRecord<byte[]>> data = new ArrayList<>();
    writeElementsToFile(data, tmpFile);
    IsmReader<byte[]> reader =
        new IsmReaderImpl<byte[]>(
            FileSystems.matchSingleFileSpec(tmpFile.getAbsolutePath()).resourceId(), CODER, cache);
    assertFalse(reader.isInitialized());
    assertTrue(reader.isEmpty());
    assertTrue(reader.isInitialized());
  }

  @Test
  public void testRead() throws Exception {
    Random random = new Random(23498321490L);
    for (int i : Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) {
      int minElements = (int) Math.pow(2, i);
      int valueSize = 128;
      // Generates between 2^i (inclusive) and 2^(i + 1) (exclusive) secondary keys.
      writeElementsToFileAndReadInOrder(
          dataGenerator(
              8 /* number of primary keys */,
              minElements + random.nextInt(minElements) /* number of secondary keys */,
              8 /* max key size */,
              valueSize));
    }
  }

  @Test
  public void testReadThatProducesIndexEntries() throws Exception {
    Random random = new Random(23498323891L);
    int minElements = (int) Math.pow(2, 6);
    int valueSize = 128;
    // Since we are generating more than 2 blocks worth of data, we are guaranteed that
    // at least one index entry is generated per shard.
    checkState(minElements * valueSize > 2 * TEST_BLOCK_SIZE);
    writeElementsToFileAndReadInOrder(
        dataGenerator(
            8 /* number of primary keys */,
            minElements + random.nextInt(minElements) /* number of secondary keys */,
            8 /* max key size */,
            valueSize /* max value size */));
  }

  @Test
  public void testReadRandomOrder() throws Exception {
    Random random = new Random(2348238943L);
    for (int i : Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8)) {
      int minElements = (int) Math.pow(2, i);
      int valueSize = 128;
      // Generates between 2^i (inclusive) and 2^(i + 1) (exclusive) secondary keys.
      writeElementsToFileAndReadInRandomOrder(
          dataGenerator(
              7 /* number of primary keys */,
              minElements + random.nextInt(minElements) /* number of secondary keys */,
              8 /* max key size */,
              valueSize /* max value size */));
    }
  }

  @Test
  public void testGetLastWithPrefix() throws Exception {
    Random random = new Random(2348238943L);
    for (int i : Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) {
      int minElements = (int) Math.pow(2, i);
      int valueSize = 128;
      // Generates between 2^i (inclusive) and 2^(i + 1) (exclusive) secondary keys.
      writeElementsToFileAndFindLastElementPerPrimaryKey(
          dataGenerator(
              7 /* number of primary keys */,
              minElements + random.nextInt(minElements) /* number of secondary keys */,
              8 /* max key size */,
              valueSize /* max value size */));
    }
  }

  @Test
  public void testReadMissingKeys() throws Exception {
    File tmpFile = tmpFolder.newFile();
    List<IsmRecord<byte[]>> data = new ArrayList<>();
    data.add(IsmRecord.<byte[]>of(ImmutableList.of(EMPTY, new byte[] {0x04}), EMPTY));
    data.add(IsmRecord.<byte[]>of(ImmutableList.of(EMPTY, new byte[] {0x08}), EMPTY));
    writeElementsToFile(data, tmpFile);
    IsmReader<byte[]> reader =
        new IsmReaderImpl<byte[]>(
            FileSystems.matchSingleFileSpec(tmpFile.getAbsolutePath()).resourceId(), CODER, cache);
    // Check that we got false with a key before all keys contained in the file.
    assertFalse(reader.overKeyComponents(ImmutableList.of(EMPTY, new byte[] {0x02})).start());
    // Check that we got false with a key between two other keys contained in the file.
    assertFalse(reader.overKeyComponents(ImmutableList.of(EMPTY, new byte[] {0x06})).start());
    // Check that we got false with a key that is after all keys contained in the file.
    assertFalse(reader.overKeyComponents(ImmutableList.of(EMPTY, new byte[] {0x10})).start());
  }

  @Test
  public void testReadMissingKeysBypassingBloomFilter() throws Exception {
    File tmpFile = tmpFolder.newFile();
    List<IsmRecord<byte[]>> data = new ArrayList<>();
    data.add(IsmRecord.<byte[]>of(ImmutableList.of(EMPTY, new byte[] {0x04}), EMPTY));
    data.add(IsmRecord.<byte[]>of(ImmutableList.of(EMPTY, new byte[] {0x08}), EMPTY));
    writeElementsToFile(data, tmpFile);
    IsmReader<byte[]> reader =
        new IsmReaderImpl<byte[]>(
            FileSystems.matchSingleFileSpec(tmpFile.getAbsolutePath()).resourceId(), CODER, cache) {
          // We use this override to get around the Bloom filter saying that the key doesn't exist.
          @Override
          boolean bloomFilterMightContain(RandomAccessData keyBytes) {
            return true;
          }
        };
    // Check that we got false with a key before all keys contained in the file.
    assertFalse(reader.overKeyComponents(ImmutableList.of(EMPTY, new byte[] {0x02})).start());
    // Check that we got false with a key between two other keys contained in the file.
    assertFalse(reader.overKeyComponents(ImmutableList.of(EMPTY, new byte[] {0x06})).start());
    // Check that we got false with a key that is after all keys contained in the file.
    assertFalse(reader.overKeyComponents(ImmutableList.of(EMPTY, new byte[] {0x10})).start());
  }

  @Test
  public void testReadKeyThatEncodesToEmptyByteArray() throws Exception {
    File tmpFile = tmpFolder.newFile();
    IsmRecordCoder<Void> coder =
        IsmRecordCoder.of(1, 0, ImmutableList.<Coder<?>>of(VoidCoder.of()), VoidCoder.of());
    IsmSink<Void> sink =
        new IsmSink<>(
            FileSystems.matchNewResource(tmpFile.getPath(), false), coder, BLOOM_FILTER_SIZE_LIMIT);
    IsmRecord<Void> element = IsmRecord.of(Arrays.asList((Void) null), (Void) null);
    try (SinkWriter<WindowedValue<IsmRecord<Void>>> writer = sink.writer()) {
      writer.add(new ValueInEmptyWindows<>(element));
    }
    // A dedicated cache is needed because the shared field is typed for byte[] records.
    Cache<
            IsmShardKey,
            WeightedValue<NavigableMap<RandomAccessData, WindowedValue<IsmRecord<Void>>>>>
        voidCache =
            CacheBuilder.newBuilder()
                .weigher(Weighers.fixedWeightKeys(1))
                .maximumWeight(10_000)
                .build();
    IsmReader<Void> reader =
        new IsmReaderImpl<>(
            FileSystems.matchSingleFileSpec(tmpFile.getAbsolutePath()).resourceId(),
            coder,
            voidCache);
    IsmReader<Void>.IsmPrefixReaderIterator iterator = reader.iterator();
    assertTrue(iterator.start());
    assertEquals(
        coder.structuralValue(element), coder.structuralValue(iterator.getCurrent().getValue()));
  }

  @Test
  public void testInitializationForSmallFilesIsCached() throws Exception {
    File tmpFile = tmpFolder.newFile();
    IsmShardKey expectedShardKey =
        new IsmShardKey(tmpFile.getAbsolutePath(), new RandomAccessData(0), 0, 13);
    List<IsmRecord<byte[]>> data = new ArrayList<>();
    data.add(IsmRecord.<byte[]>of(ImmutableList.of(EMPTY, new byte[] {0x04}), new byte[] {0x04}));
    data.add(IsmRecord.<byte[]>of(ImmutableList.of(EMPTY, new byte[] {0x08}), new byte[] {0x08}));
    writeElementsToFile(data, tmpFile);
    IsmReader<byte[]> reader =
        new IsmReaderImpl<byte[]>(
            FileSystems.matchSingleFileSpec(tmpFile.getAbsolutePath()).resourceId(), CODER, cache);
    // Validate that reader and cache are in initial state
    assertFalse(reader.isInitialized());
    assertEquals(0, cache.size());
    // Force initialization
    reader.overKeyComponents(ImmutableList.of());
    // Validate reader is initialized and expected entry is cached
    assertTrue(reader.isInitialized());
    WeightedValue<NavigableMap<RandomAccessData, WindowedValue<IsmRecord<byte[]>>>> block =
        cache.getIfPresent(expectedShardKey);
    assertNotNull(block);
    assertArrayEquals(
        new byte[] {0x04}, block.getValue().firstEntry().getValue().getValue().getValue());
    assertArrayEquals(
        new byte[] {0x08}, block.getValue().lastEntry().getValue().getValue().getValue());
  }

  @Test
  public void testInitializationForLargeFilesIsNotCached() throws Exception {
    File tmpFile = tmpFolder.newFile();
    List<IsmRecord<byte[]>> data = new ArrayList<>();
    // Use enough data records which are smaller than the cache limit to exceed the 1 MB
    // footer read buffer used by IsmReader to optimize small file handling.
    for (int i = 0; i < IsmReaderImpl.MAX_SHARD_INDEX_AND_FOOTER_SIZE / TEST_BLOCK_SIZE + 1; ++i) {
      data.add(
          IsmRecord.<byte[]>of(
              ImmutableList.of(EMPTY, Ints.toByteArray(i)), new byte[TEST_BLOCK_SIZE]));
    }
    writeElementsToFile(data, tmpFile);
    IsmReader<byte[]> reader =
        new IsmReaderImpl<byte[]>(
            FileSystems.matchSingleFileSpec(tmpFile.getAbsolutePath()).resourceId(), CODER, cache);
    assertFalse(reader.isInitialized());
    assertEquals(0, cache.size());
    reader.overKeyComponents(ImmutableList.of());
    assertTrue(reader.isInitialized());
    assertEquals(0, cache.size());
  }

  /**
   * Writes the input elements, in iteration order, to the specified file using an {@link IsmSink}
   * whose block size is {@link #TEST_BLOCK_SIZE}.
   */
  static void writeElementsToFile(Iterable<IsmRecord<byte[]>> elements, File tmpFile)
      throws Exception {
    IsmSink<byte[]> sink =
        new IsmSink<byte[]>(
            FileSystems.matchNewResource(tmpFile.getPath(), false),
            CODER,
            BLOOM_FILTER_SIZE_LIMIT) {
          @Override
          long getBlockSize() {
            return TEST_BLOCK_SIZE;
          }
        };
    try (SinkWriter<WindowedValue<IsmRecord<byte[]>>> writer = sink.writer()) {
      for (IsmRecord<byte[]> element : elements) {
        writer.add(new ValueInEmptyWindows<>(element));
      }
    }
  }

  /**
   * Writes elements to an Ism file using an IsmSink. Then reads them back with an IsmReader,
   * verifying that exactly the written values are read back, in the same order, and that the
   * reader observer saw at least as many bytes as each value (or metadata) contains.
   */
  private void writeElementsToFileAndReadInOrder(Iterable<IsmRecord<byte[]>> elements)
      throws Exception {
    File tmpFile = tmpFolder.newFile();
    writeElementsToFile(elements, tmpFile);
    IsmReader<byte[]> reader =
        new IsmReaderImpl<>(
            FileSystems.matchSingleFileSpec(tmpFile.getAbsolutePath()).resourceId(), CODER, cache);
    assertFalse(reader.isInitialized());
    TestReaderObserver observer = new TestReaderObserver(reader);
    reader.addObserver(observer);
    Iterator<IsmRecord<byte[]>> elementsIterator = elements.iterator();
    try (NativeReader.NativeReaderIterator<WindowedValue<IsmRecord<byte[]>>> iterator =
        reader.iterator()) {
      boolean more = iterator.start();
      assertTrue(reader.isInitialized());
      // Compare pairwise only while both the reader and the expected data have elements left.
      while (more && elementsIterator.hasNext()) {
        IsmRecord<byte[]> expected = elementsIterator.next();
        IsmRecord<byte[]> actual = iterator.getCurrent().getValue();
        assertIsmEquals(actual, expected);
        final int expectedLength;
        if (IsmFormat.isMetadataKey(expected.getKeyComponents())) {
          expectedLength = expected.getMetadata().length;
        } else {
          expectedLength = expected.getValue().length;
        }
        // Verify that the observer saw at least as many bytes as the size of the value.
        assertTrue(
            expectedLength <= observer.getActualSizes().get(observer.getActualSizes().size() - 1));
        more = iterator.advance();
      }
      // If the reader is still positioned on an element, that element was never written. Checking
      // "more" here (rather than advancing again) makes sure this surplus element is not skipped.
      if (more) {
        fail("Read more elements than expected, did not expect: " + iterator.getCurrent());
      } else if (elementsIterator.hasNext()) {
        fail("Read fewer elements than expected, expected: " + elementsIterator.next());
      }
      // Advancing an exhausted iterator must keep reporting that there are no more elements.
      assertFalse(iterator.advance());
      // Verify that we see a {@link NoSuchElementException} if we attempt to go further.
      try {
        iterator.getCurrent();
        fail("Expected a NoSuchElementException to have been thrown.");
      } catch (NoSuchElementException expected) {
        // Expected: the iterator is exhausted.
      }
    }
  }

  /**
   * Asserts that {@code actual} has the same key components as {@code expected}, and the same
   * metadata (for metadata records) or value (for all other records).
   */
  private static void assertIsmEquals(IsmRecord<byte[]> actual, IsmRecord<byte[]> expected) {
    assertEquals(expected.getKeyComponents().size(), actual.getKeyComponents().size());
    for (int i = 0; i < expected.getKeyComponents().size(); ++i) {
      // Identical references are accepted without the byte[] cast. This presumably covers the
      // metadata key from IsmFormat.getMetadataKey(), which need not be a byte[].
      if (actual.getKeyComponent(i) != expected.getKeyComponent(i)) {
        assertArrayEquals((byte[]) expected.getKeyComponent(i), (byte[]) actual.getKeyComponent(i));
      }
    }
    if (IsmFormat.isMetadataKey(expected.getKeyComponents())) {
      assertArrayEquals(expected.getMetadata(), actual.getMetadata());
    } else {
      assertArrayEquals(expected.getValue(), actual.getValue());
    }
  }

  /**
   * A predicate which filters elements on whether the second key's last byte is odd or even. Allows
   * for a stable partitioning of generated data.
   */
  private static class EvenFilter implements Predicate<IsmRecord<byte[]>> {
    private static final EvenFilter INSTANCE = new EvenFilter();

    @Override
    public boolean apply(IsmRecord<byte[]> input) {
      byte[] secondKey = (byte[]) input.getKeyComponent(1);
      // Negative odd bytes yield -1 here, so they are correctly classified as odd.
      return secondKey[secondKey.length - 1] % 2 == 0;
    }
  }

  /**
   * Writes the elements whose second key ends in an odd byte to an Ism file using an IsmSink. Then
   * looks them up with an IsmReader in random order, and checks that the elements whose second key
   * ends in an even byte (which were never written) are not found.
   */
  private void writeElementsToFileAndReadInRandomOrder(Iterable<IsmRecord<byte[]>> elements)
      throws Exception {
    File tmpFile = tmpFolder.newFile();
    List<IsmRecord<byte[]>> oddSecondaryKeys =
        new ArrayList<>(
            ImmutableList.copyOf(Iterables.filter(elements, Predicates.not(EvenFilter.INSTANCE))));
    List<IsmRecord<byte[]>> evenSecondaryKeys =
        new ArrayList<>(ImmutableList.copyOf(Iterables.filter(elements, EvenFilter.INSTANCE)));
    writeElementsToFile(oddSecondaryKeys, tmpFile);
    IsmReader<byte[]> reader =
        new IsmReaderImpl<>(
            FileSystems.matchSingleFileSpec(tmpFile.getAbsolutePath()).resourceId(), CODER, cache);
    // Test using start() on a prefix reader iterator created for each full key.
    Collections.shuffle(oddSecondaryKeys);
    for (IsmRecord<byte[]> expectedNext : oddSecondaryKeys) {
      IsmReader<byte[]>.IsmPrefixReaderIterator iterator =
          reader.overKeyComponents(expectedNext.getKeyComponents());
      assertTrue(iterator.start());
      assertIsmEquals(iterator.getCurrent().getValue(), expectedNext);
    }
    Collections.shuffle(oddSecondaryKeys);
    // Test using get() on a single prefix reader iterator over the empty prefix.
    IsmReader<byte[]>.IsmPrefixReaderIterator readerIterator =
        reader.overKeyComponents(ImmutableList.of());
    for (IsmRecord<byte[]> expectedNext : oddSecondaryKeys) {
      assertIsmEquals(readerIterator.get(expectedNext.getKeyComponents()).getValue(), expectedNext);
    }
    // Test that start() reports missing keys on a prefix reader iterator created for each key.
    Collections.shuffle(evenSecondaryKeys);
    for (IsmRecord<byte[]> missingNext : evenSecondaryKeys) {
      assertFalse(reader.overKeyComponents(missingNext.getKeyComponents()).start());
    }
    Collections.shuffle(evenSecondaryKeys);
    // Test that get() returns null for missing keys on a prefix reader over the empty prefix.
    readerIterator = reader.overKeyComponents(ImmutableList.of());
    for (IsmRecord<byte[]> missingNext : evenSecondaryKeys) {
      assertNull(readerIterator.get(missingNext.getKeyComponents()));
    }
  }

  /**
   * Writes the elements whose second key ends in an odd byte to an Ism file, then verifies that
   * {@code getLast()} over each primary key prefix returns the greatest written record (according to
   * {@link IsmRecordKeyComparator}) with that primary key.
   */
  private void writeElementsToFileAndFindLastElementPerPrimaryKey(
      Iterable<IsmRecord<byte[]>> elements) throws Exception {
    File tmpFile = tmpFolder.newFile();
    Iterable<IsmRecord<byte[]>> oddValues =
        Iterables.filter(elements, Predicates.not(EvenFilter.INSTANCE));
    Iterable<IsmRecord<byte[]>> evenValues = Iterables.filter(elements, EvenFilter.INSTANCE);
    writeElementsToFile(oddValues, tmpFile);
    IsmReader<byte[]> reader =
        new IsmReaderImpl<>(
            FileSystems.matchSingleFileSpec(tmpFile.getAbsolutePath()).resourceId(), CODER, cache);
    // Maps each encoded primary key to the written records with that primary key, ordered by
    // IsmRecordKeyComparator.
    SortedMap<byte[], NavigableSet<IsmRecord<byte[]>>> sortedByPrimaryKey =
        new TreeMap<>(UnsignedBytes.lexicographicalComparator());
    for (IsmRecord<byte[]> element : oddValues) {
      sortedByPrimaryKey
          .computeIfAbsent(
              encodePrimaryKey(element),
              key -> new TreeSet<IsmRecord<byte[]>>(new IsmRecordKeyComparator<byte[]>(CODER)))
          .add(element);
    }
    // The returned value should have the element as a prefix of itself.
    for (IsmRecord<byte[]> element : oddValues) {
      assertIsmEquals(
          reader
              .overKeyComponents(ImmutableList.of(element.getKeyComponent(0)))
              .getLast()
              .getValue(),
          sortedByPrimaryKey.get(encodePrimaryKey(element)).last());
    }
    // The returned value should always have the element as a prefix of itself or not exist.
    for (IsmRecord<byte[]> element : evenValues) {
      IsmReader<byte[]>.IsmPrefixReaderIterator readerIterator =
          reader.overKeyComponents(ImmutableList.of(element.getKeyComponent(0)));
      WindowedValue<IsmRecord<byte[]>> lastWindowedValue = readerIterator.getLast();
      if (lastWindowedValue != null) {
        assertIsmEquals(
            lastWindowedValue.getValue(), sortedByPrimaryKey.get(encodePrimaryKey(element)).last());
      }
    }
  }

  /** Encodes the first key component of {@code record} with the key component coder of CODER. */
  private static byte[] encodePrimaryKey(IsmRecord<byte[]> record) throws IOException {
    return CoderUtils.encodeToByteArray(CODER.getKeyComponentCoder(0), record.getKeyComponent(0));
  }

  /**
   * Orders {@link IsmRecord}s by the unsigned lexicographical order of the key bytes produced by
   * {@link IsmRecordCoder#encodeAndHash}.
   */
  static class IsmRecordKeyComparator<V> implements Comparator<IsmRecord<V>> {
    private final IsmRecordCoder<V> coder;

    IsmRecordKeyComparator(IsmRecordCoder<V> coder) {
      this.coder = coder;
    }

    @Override
    public int compare(IsmRecord<V> first, IsmRecord<V> second) {
      RandomAccessData firstKeyBytes = new RandomAccessData();
      coder.encodeAndHash(first.getKeyComponents(), firstKeyBytes);
      RandomAccessData secondKeyBytes = new RandomAccessData();
      coder.encodeAndHash(second.getKeyComponents(), secondKeyBytes);
      return RandomAccessData.UNSIGNED_LEXICOGRAPHICAL_COMPARATOR.compare(
          firstKeyBytes, secondKeyBytes);
    }
  }

  /**
   * Specifies the minimum key size so that we can produce a random byte array with enough of a
   * prefix to be able to create successively larger secondary keys.
   */
  private static final int MIN_KEY_SIZE = 4;
  /** Specifies the fraction of keys that are metadata records when using the data generator. */
  private static final double PERCENT_METADATA_RECORDS = 0.01;

  /**
   * Creates a map from Ism shard to a sorted set of IsmRecords.
   *
   * <p>Random primary keys are generated until the map holds {@code numberOfPrimaryKeys} distinct
   * shards. Shards created for metadata records count toward that total. Each generated primary key
   * gets {@code minNumberOfSecondaryKeys} strictly increasing secondary keys, and each of these
   * becomes a metadata record with probability {@link #PERCENT_METADATA_RECORDS}.
   *
   * @param maxKeySize exclusive upper bound on key lengths; must be greater than {@link
   *     #MIN_KEY_SIZE}
   * @param maxValueSize exclusive upper bound on value lengths; must be positive
   */
  private Map<Integer, SortedSet<IsmRecord<byte[]>>> dataGeneratorPerShard(
      final int numberOfPrimaryKeys,
      final int minNumberOfSecondaryKeys,
      final int maxKeySize,
      final int maxValueSize) {
    // Random.nextInt(maxKeySize - MIN_KEY_SIZE) below requires a strictly positive bound.
    checkState(
        maxKeySize > MIN_KEY_SIZE,
        "maxKeySize must be greater than %s but was %s",
        MIN_KEY_SIZE,
        maxKeySize);
    final Random random = new Random(minNumberOfSecondaryKeys);
    Map<Integer, SortedSet<IsmRecord<byte[]>>> shardToRecordMap = new HashMap<>();
    while (shardToRecordMap.size() < numberOfPrimaryKeys) {
      // Generate the next primary key
      byte[] primaryKey = new byte[random.nextInt(maxKeySize - MIN_KEY_SIZE) + MIN_KEY_SIZE];
      random.nextBytes(primaryKey);
      int shardId = CODER.hash(ImmutableList.of(primaryKey));
      // Register the shard immediately (even if it ends up empty) since it counts toward the
      // loop's termination condition.
      SortedSet<IsmRecord<byte[]>> primaryShardRecords =
          shardToRecordMap.computeIfAbsent(
              shardId,
              id -> new TreeSet<IsmRecord<byte[]>>(new IsmRecordKeyComparator<byte[]>(CODER)));
      // Generate the requested number of secondary keys using the newly generated primary key.
      byte[] secondaryKey = new byte[maxKeySize];
      for (int j = 0; j < minNumberOfSecondaryKeys; ++j) {
        secondaryKey = generateNextSecondaryKey(random, maxKeySize, secondaryKey);
        // Generate the value bytes.
        byte[] value = new byte[random.nextInt(maxValueSize)];
        random.nextBytes(value);
        if (random.nextFloat() < PERCENT_METADATA_RECORDS) {
          IsmRecord<byte[]> ismRecord =
              IsmRecord.meta(ImmutableList.of(IsmFormat.getMetadataKey(), secondaryKey), value);
          int metadataShardId = CODER.hash(ismRecord.getKeyComponents());
          shardToRecordMap
              .computeIfAbsent(
                  metadataShardId,
                  id -> new TreeSet<IsmRecord<byte[]>>(new IsmRecordKeyComparator<byte[]>(CODER)))
              .add(ismRecord);
        } else {
          primaryShardRecords.add(
              IsmRecord.<byte[]>of(ImmutableList.of(primaryKey, secondaryKey), value));
        }
      }
    }
    return shardToRecordMap;
  }

  /**
   * Generates a random secondary key with a length in {@code [MIN_KEY_SIZE, maxKeySize)} that is
   * strictly greater, in unsigned lexicographical byte order, than {@code previousSecondaryKey}.
   */
  private byte[] generateNextSecondaryKey(
      Random random, int maxKeySize, byte[] previousSecondaryKey) {
    byte[] currentSecondaryKey = new byte[random.nextInt(maxKeySize - MIN_KEY_SIZE) + MIN_KEY_SIZE];
    int matchingPrefix =
        Math.min(
            currentSecondaryKey.length, random.nextInt(maxKeySize - MIN_KEY_SIZE) + MIN_KEY_SIZE);
    byte[] randomSuffix = new byte[currentSecondaryKey.length - matchingPrefix];
    random.nextBytes(randomSuffix);
    System.arraycopy(
        previousSecondaryKey,
        0,
        currentSecondaryKey,
        0,
        Math.min(currentSecondaryKey.length, previousSecondaryKey.length));
    System.arraycopy(randomSuffix, 0, currentSecondaryKey, matchingPrefix, randomSuffix.length);
    matchingPrefix -= 1;
    // Find the first byte which is less than 255 at the end of the matching portion, carrying
    // over (zeroing) trailing 0xFF bytes. NOTE: if every prefix byte were 0xFF, matchingPrefix
    // would go negative and this would throw ArrayIndexOutOfBoundsException.
    while ((currentSecondaryKey[matchingPrefix] & 0xFF) == 0xFF) {
      currentSecondaryKey[matchingPrefix] = 0;
      matchingPrefix -= 1;
    }
    // Increment the last byte of the matching prefix to make sure this key is
    // larger than the previous key.
    currentSecondaryKey[matchingPrefix] = (byte) ((currentSecondaryKey[matchingPrefix] & 0xFF) + 1);
    return currentSecondaryKey;
  }

  /**
   * Creates an iterable of IsmRecords grouped by shard id, and in ascending order per shard. The
   * records are generated eagerly, so iterating the result repeatedly yields the same records.
   */
  private Iterable<IsmRecord<byte[]>> dataGenerator(
      final int numberOfPrimaryKeys,
      final int numberOfSecondaryKeys,
      final int maxKeySize,
      final int maxValueSize) {
    FluentIterable<IsmRecord<byte[]>> records =
        FluentIterable.from(
                dataGeneratorPerShard(
                        numberOfPrimaryKeys, numberOfSecondaryKeys, maxKeySize, maxValueSize)
                    .entrySet())
            .transformAndConcat(Map.Entry::getValue);
    return records;
  }

  @Test
  public void testCachedTailSeekableByteChannelThrowsOnTruncate() throws Exception {
    try (SeekableByteChannel channel = new CachedTailSeekableByteChannel(0, new byte[0])) {
      expectedException.expect(NonWritableChannelException.class);
      channel.truncate(0);
    }
  }

  @Test
  public void testCachedTailSeekableByteChannelThrowsOnWrite() throws Exception {
    try (SeekableByteChannel channel = new CachedTailSeekableByteChannel(0, new byte[0])) {
      expectedException.expect(NonWritableChannelException.class);
      channel.write(ByteBuffer.wrap(new byte[0]));
    }
  }

  @Test
  public void testCachedTailSeekableByteChannelRead() throws Exception {
    final int offset = 10;
    try (SeekableByteChannel channel =
        new CachedTailSeekableByteChannel(offset, new byte[] {0, 1, 2})) {
      ByteBuffer buffer = ByteBuffer.allocate(1);
      channel.position(offset);
      assertEquals(1, channel.read(buffer));
      assertEquals(0, buffer.get(0));
      assertEquals(offset + 1, channel.position());
      buffer.clear();
      assertEquals(1, channel.read(buffer));
      assertEquals(1, buffer.get(0));
      assertEquals(offset + 2, channel.position());
      buffer.clear();
      assertEquals(1, channel.read(buffer));
      assertEquals(2, buffer.get(0));
      assertEquals(offset + 3, channel.position());
      buffer.clear();
      // Reposition the stream and do a read
      channel.position(offset + 1);
      assertEquals(1, channel.read(buffer));
      assertEquals(1, buffer.get(0));
      assertEquals(offset + 2, channel.position());
      buffer.clear();
      assertEquals(1, channel.read(buffer));
      assertEquals(2, buffer.get(0));
      assertEquals(offset + 3, channel.position());
      buffer.clear();
      // This read is expected to return EOF
      assertEquals(-1, channel.read(buffer));
      buffer.clear();
      // Reposition the stream to EOF and do a read, expected to return EOF
      channel.position(offset + 3);
      assertEquals(-1, channel.read(buffer));
      buffer.clear();
    }
  }

  @Test
  public void testCachedTailSeekableByteChannelSeekBeforeBounds() throws Exception {
    try (SeekableByteChannel channel = new CachedTailSeekableByteChannel(1, new byte[0])) {
      // Seek to only valid position
      channel.position(1);
      expectedException.expect(IllegalArgumentException.class);
      channel.position(0);
    }
  }

  @Test
  public void testCachedTailSeekableByteChannelSeekBeyondBounds() throws Exception {
    try (SeekableByteChannel channel = new CachedTailSeekableByteChannel(1, new byte[0])) {
      // Seek to only valid position
      channel.position(1);
      expectedException.expect(IllegalArgumentException.class);
      channel.position(2);
    }
  }
}