/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PushbackInputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Random;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.AvroDefault;
import org.apache.avro.reflect.Nullable;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.AvroSource.AvroMetadata;
import org.apache.beam.sdk.io.AvroSource.AvroReader;
import org.apache.beam.sdk.io.AvroSource.AvroReader.Seeker;
import org.apache.beam.sdk.io.BlockBasedSource.BlockBasedReader;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
import org.hamcrest.Matchers;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for AvroSource. */
@RunWith(JUnit4.class)
public class AvroSourceTest {
@Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
@Rule public ExpectedException expectedException = ExpectedException.none();
private enum SyncBehavior {
SYNC_REGULAR, // Sync at regular, user defined intervals
SYNC_RANDOM, // Sync at random intervals
SYNC_DEFAULT // Sync at default intervals (i.e., no manual syncing).
}
private static final int DEFAULT_RECORD_COUNT = 1000;
/**
* Generates an input Avro file containing the given records in the temporary directory and
* returns the full path of the file.
*/
private <T> String generateTestFile(
String filename,
List<T> elems,
SyncBehavior syncBehavior,
int syncInterval,
AvroCoder<T> coder,
String codec)
throws IOException {
Random random = new Random(0);
File tmpFile = tmpFolder.newFile(filename);
String path = tmpFile.toString();
FileOutputStream os = new FileOutputStream(tmpFile);
DatumWriter<T> datumWriter =
coder.getType().equals(GenericRecord.class)
? new GenericDatumWriter<>(coder.getSchema())
: new ReflectDatumWriter<>(coder.getSchema());
try (DataFileWriter<T> writer = new DataFileWriter<>(datumWriter)) {
writer.setCodec(CodecFactory.fromString(codec));
writer.create(coder.getSchema(), os);
int recordIndex = 0;
int syncIndex = syncBehavior == SyncBehavior.SYNC_RANDOM ? random.nextInt(syncInterval) : 0;
for (T elem : elems) {
writer.append(elem);
recordIndex++;
switch (syncBehavior) {
case SYNC_REGULAR:
if (recordIndex == syncInterval) {
recordIndex = 0;
writer.sync();
}
break;
case SYNC_RANDOM:
if (recordIndex == syncIndex) {
recordIndex = 0;
writer.sync();
syncIndex = random.nextInt(syncInterval);
}
break;
case SYNC_DEFAULT:
default:
}
}
}
return path;
}
@Test
public void testReadWithDifferentCodecs() throws Exception {
// Test reading files generated using all codecs.
String[] codecs = {
DataFileConstants.NULL_CODEC,
DataFileConstants.BZIP2_CODEC,
DataFileConstants.DEFLATE_CODEC,
DataFileConstants.SNAPPY_CODEC,
DataFileConstants.XZ_CODEC,
};
// As Avro's default block size is 64KB, write 64K records to ensure at least one full block.
// We could write fewer than 64K records by assuming each record is at least B bytes, but then
// the test could silently stop exercising the failure condition from BEAM-422.
List<Bird> expected = createRandomRecords(1 << 16);
for (String codec : codecs) {
String filename =
generateTestFile(
codec, expected, SyncBehavior.SYNC_DEFAULT, 0, AvroCoder.of(Bird.class), codec);
AvroSource<Bird> source = AvroSource.from(filename).withSchema(Bird.class);
List<Bird> actual = SourceTestUtils.readFromSource(source, null);
assertThat(expected, containsInAnyOrder(actual.toArray()));
}
}
@Test
public void testSplitAtFraction() throws Exception {
// A reduced dataset is enough here.
List<FixedRecord> expected = createFixedRecords(DEFAULT_RECORD_COUNT);
// Create an AvroSource where each block is 1/10th of the total set of records.
String filename =
generateTestFile(
"tmp.avro",
expected,
SyncBehavior.SYNC_REGULAR,
DEFAULT_RECORD_COUNT / 10 /* max records per block */,
AvroCoder.of(FixedRecord.class),
DataFileConstants.NULL_CODEC);
File file = new File(filename);
AvroSource<FixedRecord> source = AvroSource.from(filename).withSchema(FixedRecord.class);
List<? extends BoundedSource<FixedRecord>> splits = source.split(file.length() / 3, null);
for (BoundedSource<FixedRecord> subSource : splits) {
int items = SourceTestUtils.readFromSource(subSource, null).size();
// Shouldn't split while unstarted.
SourceTestUtils.assertSplitAtFractionFails(subSource, 0, 0.0, null);
SourceTestUtils.assertSplitAtFractionFails(subSource, 0, 0.7, null);
SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent(subSource, 1, 0.7, null);
SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent(
subSource, DEFAULT_RECORD_COUNT / 100, 0.7, null);
SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent(
subSource, DEFAULT_RECORD_COUNT / 10, 0.1, null);
SourceTestUtils.assertSplitAtFractionFails(
subSource, DEFAULT_RECORD_COUNT / 10 + 1, 0.1, null);
SourceTestUtils.assertSplitAtFractionFails(subSource, DEFAULT_RECORD_COUNT / 3, 0.3, null);
SourceTestUtils.assertSplitAtFractionFails(subSource, items, 0.9, null);
SourceTestUtils.assertSplitAtFractionFails(subSource, items, 1.0, null);
SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent(subSource, items, 0.999, null);
}
}
@Test
public void testGetProgressFromUnstartedReader() throws Exception {
List<FixedRecord> records = createFixedRecords(DEFAULT_RECORD_COUNT);
String filename =
generateTestFile(
"tmp.avro",
records,
SyncBehavior.SYNC_DEFAULT,
1000,
AvroCoder.of(FixedRecord.class),
DataFileConstants.NULL_CODEC);
File file = new File(filename);
AvroSource<FixedRecord> source = AvroSource.from(filename).withSchema(FixedRecord.class);
try (BoundedSource.BoundedReader<FixedRecord> reader = source.createReader(null)) {
assertEquals(Double.valueOf(0.0), reader.getFractionConsumed());
}
List<? extends BoundedSource<FixedRecord>> splits = source.split(file.length() / 3, null);
for (BoundedSource<FixedRecord> subSource : splits) {
try (BoundedSource.BoundedReader<FixedRecord> reader = subSource.createReader(null)) {
assertEquals(Double.valueOf(0.0), reader.getFractionConsumed());
}
}
}
@Test
public void testProgress() throws Exception {
// 5 records, 2 per block.
List<FixedRecord> records = createFixedRecords(5);
String filename =
generateTestFile(
"tmp.avro",
records,
SyncBehavior.SYNC_REGULAR,
2,
AvroCoder.of(FixedRecord.class),
DataFileConstants.NULL_CODEC);
AvroSource<FixedRecord> source = AvroSource.from(filename).withSchema(FixedRecord.class);
try (BoundedSource.BoundedReader<FixedRecord> readerOrig = source.createReader(null)) {
assertThat(readerOrig, Matchers.instanceOf(BlockBasedReader.class));
BlockBasedReader<FixedRecord> reader = (BlockBasedReader<FixedRecord>) readerOrig;
// Before starting
assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
assertEquals(0, reader.getSplitPointsConsumed());
assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
// First 2 records are in the same block.
assertTrue(reader.start());
assertTrue(reader.isAtSplitPoint());
assertEquals(0, reader.getSplitPointsConsumed());
assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
// continued
assertTrue(reader.advance());
assertFalse(reader.isAtSplitPoint());
assertEquals(0, reader.getSplitPointsConsumed());
assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
// Second block -> parallelism consumed becomes 1.
assertTrue(reader.advance());
assertTrue(reader.isAtSplitPoint());
assertEquals(1, reader.getSplitPointsConsumed());
assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
// continued
assertTrue(reader.advance());
assertFalse(reader.isAtSplitPoint());
assertEquals(1, reader.getSplitPointsConsumed());
assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
// Third and final block -> parallelism consumed becomes 2, remaining becomes 1.
assertTrue(reader.advance());
assertTrue(reader.isAtSplitPoint());
assertEquals(2, reader.getSplitPointsConsumed());
assertEquals(1, reader.getSplitPointsRemaining());
// Done
assertFalse(reader.advance());
assertEquals(3, reader.getSplitPointsConsumed());
assertEquals(0, reader.getSplitPointsRemaining());
assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
}
}
@Test
public void testProgressEmptySource() throws Exception {
// 0 records, 2 per block.
List<FixedRecord> records = Collections.emptyList();
String filename =
generateTestFile(
"tmp.avro",
records,
SyncBehavior.SYNC_REGULAR,
2,
AvroCoder.of(FixedRecord.class),
DataFileConstants.NULL_CODEC);
AvroSource<FixedRecord> source = AvroSource.from(filename).withSchema(FixedRecord.class);
try (BoundedSource.BoundedReader<FixedRecord> readerOrig = source.createReader(null)) {
assertThat(readerOrig, Matchers.instanceOf(BlockBasedReader.class));
BlockBasedReader<FixedRecord> reader = (BlockBasedReader<FixedRecord>) readerOrig;
// before starting
assertEquals(0.0, reader.getFractionConsumed(), 1e-6);
assertEquals(0, reader.getSplitPointsConsumed());
assertEquals(BoundedReader.SPLIT_POINTS_UNKNOWN, reader.getSplitPointsRemaining());
// confirm empty
assertFalse(reader.start());
// after reading empty source
assertEquals(0, reader.getSplitPointsConsumed());
assertEquals(0, reader.getSplitPointsRemaining());
assertEquals(1.0, reader.getFractionConsumed(), 1e-6);
}
}
@Test
public void testGetCurrentFromUnstartedReader() throws Exception {
List<FixedRecord> records = createFixedRecords(DEFAULT_RECORD_COUNT);
String filename =
generateTestFile(
"tmp.avro",
records,
SyncBehavior.SYNC_DEFAULT,
1000,
AvroCoder.of(FixedRecord.class),
DataFileConstants.NULL_CODEC);
AvroSource<FixedRecord> source = AvroSource.from(filename).withSchema(FixedRecord.class);
try (BlockBasedSource.BlockBasedReader<FixedRecord> reader =
(BlockBasedSource.BlockBasedReader<FixedRecord>) source.createReader(null)) {
assertEquals(null, reader.getCurrentBlock());
expectedException.expect(NoSuchElementException.class);
expectedException.expectMessage("No block has been successfully read from");
reader.getCurrent();
}
}
@Test
public void testSplitAtFractionExhaustive() throws Exception {
// A small-sized input is sufficient, because the test verifies that splitting is non-vacuous.
List<FixedRecord> expected = createFixedRecords(20);
String filename =
generateTestFile(
"tmp.avro",
expected,
SyncBehavior.SYNC_REGULAR,
5,
AvroCoder.of(FixedRecord.class),
DataFileConstants.NULL_CODEC);
AvroSource<FixedRecord> source = AvroSource.from(filename).withSchema(FixedRecord.class);
SourceTestUtils.assertSplitAtFractionExhaustive(source, null);
}
@Test
public void testSplitsWithSmallBlocks() throws Exception {
PipelineOptions options = PipelineOptionsFactory.create();
// Test reading from an object file with many small random-sized blocks.
// The file itself doesn't have to be big; we can use a decreased record count.
List<Bird> expected = createRandomRecords(DEFAULT_RECORD_COUNT);
String filename =
generateTestFile(
"tmp.avro",
expected,
SyncBehavior.SYNC_RANDOM,
DEFAULT_RECORD_COUNT / 20 /* max records/block */,
AvroCoder.of(Bird.class),
DataFileConstants.NULL_CODEC);
File file = new File(filename);
// Small minimum bundle size
AvroSource<Bird> source =
AvroSource.from(filename).withSchema(Bird.class).withMinBundleSize(100L);
// Assert that the source produces the expected records
assertEquals(expected, SourceTestUtils.readFromSource(source, options));
List<? extends BoundedSource<Bird>> splits;
int nonEmptySplits;
// Split with the minimum bundle size
splits = source.split(100L, options);
assertTrue(splits.size() > 2);
SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
nonEmptySplits = 0;
for (BoundedSource<Bird> subSource : splits) {
if (SourceTestUtils.readFromSource(subSource, options).size() > 0) {
nonEmptySplits += 1;
}
}
assertTrue(nonEmptySplits > 2);
// Split with larger bundle size
splits = source.split(file.length() / 4, options);
assertTrue(splits.size() > 2);
SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
nonEmptySplits = 0;
for (BoundedSource<Bird> subSource : splits) {
if (SourceTestUtils.readFromSource(subSource, options).size() > 0) {
nonEmptySplits += 1;
}
}
assertTrue(nonEmptySplits > 2);
// Split with the file length
splits = source.split(file.length(), options);
assertEquals(1, splits.size());
SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
}
@Test
public void testMultipleFiles() throws Exception {
String baseName = "tmp-";
List<Bird> expected = new ArrayList<>();
for (int i = 0; i < 10; i++) {
List<Bird> contents = createRandomRecords(DEFAULT_RECORD_COUNT / 10);
expected.addAll(contents);
generateTestFile(
baseName + i,
contents,
SyncBehavior.SYNC_DEFAULT,
0,
AvroCoder.of(Bird.class),
DataFileConstants.NULL_CODEC);
}
AvroSource<Bird> source =
AvroSource.from(new File(tmpFolder.getRoot().toString(), baseName + "*").toString())
.withSchema(Bird.class);
List<Bird> actual = SourceTestUtils.readFromSource(source, null);
assertThat(actual, containsInAnyOrder(expected.toArray()));
}
@Test
public void testCreationWithSchema() throws Exception {
List<Bird> expected = createRandomRecords(100);
String filename =
generateTestFile(
"tmp.avro",
expected,
SyncBehavior.SYNC_DEFAULT,
0,
AvroCoder.of(Bird.class),
DataFileConstants.NULL_CODEC);
// Create a source with a schema object
Schema schema = ReflectData.get().getSchema(Bird.class);
AvroSource<GenericRecord> source = AvroSource.from(filename).withSchema(schema);
List<GenericRecord> records = SourceTestUtils.readFromSource(source, null);
assertEqualsWithGeneric(expected, records);
// Create a source with a JSON schema
String schemaString = ReflectData.get().getSchema(Bird.class).toString();
source = AvroSource.from(filename).withSchema(schemaString);
records = SourceTestUtils.readFromSource(source, null);
assertEqualsWithGeneric(expected, records);
}
@Test
public void testSchemaUpdate() throws Exception {
List<Bird> birds = createRandomRecords(100);
String filename =
generateTestFile(
"tmp.avro",
birds,
SyncBehavior.SYNC_DEFAULT,
0,
AvroCoder.of(Bird.class),
DataFileConstants.NULL_CODEC);
AvroSource<FancyBird> source = AvroSource.from(filename).withSchema(FancyBird.class);
List<FancyBird> actual = SourceTestUtils.readFromSource(source, null);
List<FancyBird> expected = new ArrayList<>();
for (Bird bird : birds) {
expected.add(
new FancyBird(
bird.number, bird.species, bird.quality, bird.quantity, null, "MAXIMUM OVERDRIVE"));
}
assertThat(actual, containsInAnyOrder(expected.toArray()));
}
@Test
public void testSchemaStringIsInterned() throws Exception {
List<Bird> birds = createRandomRecords(100);
String filename =
generateTestFile(
"tmp.avro",
birds,
SyncBehavior.SYNC_DEFAULT,
0,
AvroCoder.of(Bird.class),
DataFileConstants.NULL_CODEC);
Metadata fileMetadata = FileSystems.matchSingleFileSpec(filename);
String schema = AvroSource.readMetadataFromFile(fileMetadata.resourceId()).getSchemaString();
// Add "" to the schema to make sure it is not interned.
AvroSource<GenericRecord> sourceA = AvroSource.from(filename).withSchema("" + schema);
AvroSource<GenericRecord> sourceB = AvroSource.from(filename).withSchema("" + schema);
assertSame(sourceA.getReaderSchemaString(), sourceB.getReaderSchemaString());
// Ensure that deserialization still goes through interning
AvroSource<GenericRecord> sourceC = SerializableUtils.clone(sourceB);
assertSame(sourceA.getReaderSchemaString(), sourceC.getReaderSchemaString());
}
@Test
public void testParseFn() throws Exception {
List<Bird> expected = createRandomRecords(100);
String filename =
generateTestFile(
"tmp.avro",
expected,
SyncBehavior.SYNC_DEFAULT,
0,
AvroCoder.of(Bird.class),
DataFileConstants.NULL_CODEC);
AvroSource<Bird> source =
AvroSource.from(filename)
.withParseFn(
input ->
new Bird(
(long) input.get("number"),
input.get("species").toString(),
input.get("quality").toString(),
(long) input.get("quantity")),
AvroCoder.of(Bird.class));
List<Bird> actual = SourceTestUtils.readFromSource(source, null);
assertThat(actual, containsInAnyOrder(expected.toArray()));
}
private void assertEqualsWithGeneric(List<Bird> expected, List<GenericRecord> actual) {
assertEquals(expected.size(), actual.size());
for (int i = 0; i < expected.size(); i++) {
Bird fixed = expected.get(i);
GenericRecord generic = actual.get(i);
assertEquals(fixed.number, generic.get("number"));
assertEquals(fixed.quality, generic.get("quality").toString()); // From Avro util.Utf8
assertEquals(fixed.quantity, generic.get("quantity"));
assertEquals(fixed.species, generic.get("species").toString());
}
}
/**
 * Creates a haystack byte array of the given size with a needle that starts at the given
 * position.
 */
private byte[] createHaystack(byte[] needle, int position, int size) {
byte[] haystack = new byte[size];
for (int i = position, j = 0; i < size && j < needle.length; i++, j++) {
haystack[i] = needle[j];
}
return haystack;
}
/**
* Asserts that advancePastNextSyncMarker advances an input stream past a sync marker and
* correctly returns the number of bytes consumed from the stream. Creates a haystack of size
* bytes and places a 16-byte sync marker at the position specified.
*/
private void testAdvancePastNextSyncMarkerAt(int position, int size) throws IOException {
byte sentinel = (byte) 0xFF;
byte[] marker = new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6};
byte[] haystack = createHaystack(marker, position, size);
PushbackInputStream stream =
new PushbackInputStream(new ByteArrayInputStream(haystack), marker.length);
if (position + marker.length < size) {
haystack[position + marker.length] = sentinel;
assertEquals(position + marker.length, AvroReader.advancePastNextSyncMarker(stream, marker));
assertEquals(sentinel, (byte) stream.read());
} else {
assertEquals(size, AvroReader.advancePastNextSyncMarker(stream, marker));
assertEquals(-1, stream.read());
}
}
@Test
public void testAdvancePastNextSyncMarker() throws IOException {
// Test placing the sync marker at different locations at the start and in the middle of the
// buffer.
for (int i = 0; i <= 16; i++) {
testAdvancePastNextSyncMarkerAt(i, 1000);
testAdvancePastNextSyncMarkerAt(160 + i, 1000);
}
// Test placing the sync marker at the end of the buffer.
testAdvancePastNextSyncMarkerAt(983, 1000);
// Test placing the sync marker so that it begins at the end of the buffer.
testAdvancePastNextSyncMarkerAt(984, 1000);
testAdvancePastNextSyncMarkerAt(985, 1000);
testAdvancePastNextSyncMarkerAt(999, 1000);
// Test with no sync marker.
testAdvancePastNextSyncMarkerAt(1000, 1000);
}
// Tests for Seeker.
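// As exercised by the assertions below, find(buffer, n) scans the first n bytes of buffer for
// the sync marker and returns the index of the marker's final byte, or -1 if no complete marker
// ends within those bytes. Partial matches carry over between calls, so a marker split across
// consecutive buffers is still detected.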
@Test
public void testSeekerFind() {
byte[] marker = {0, 1, 2, 3};
byte[] buffer;
Seeker s;
s = new Seeker(marker);
buffer = new byte[] {0, 1, 2, 3, 4, 5, 6, 7};
assertEquals(3, s.find(buffer, buffer.length));
buffer = new byte[] {0, 0, 0, 0, 0, 1, 2, 3};
assertEquals(7, s.find(buffer, buffer.length));
buffer = new byte[] {0, 1, 2, 0, 0, 1, 2, 3};
assertEquals(7, s.find(buffer, buffer.length));
buffer = new byte[] {0, 1, 2, 3};
assertEquals(3, s.find(buffer, buffer.length));
}
@Test
public void testSeekerFindResume() {
byte[] marker = {0, 1, 2, 3};
byte[] buffer;
Seeker s;
s = new Seeker(marker);
buffer = new byte[] {0, 0, 0, 0, 0, 0, 0, 0};
assertEquals(-1, s.find(buffer, buffer.length));
buffer = new byte[] {1, 2, 3, 0, 0, 0, 0, 0};
assertEquals(2, s.find(buffer, buffer.length));
buffer = new byte[] {0, 0, 0, 0, 0, 0, 1, 2};
assertEquals(-1, s.find(buffer, buffer.length));
buffer = new byte[] {3, 0, 1, 2, 3, 0, 1, 2};
assertEquals(0, s.find(buffer, buffer.length));
buffer = new byte[] {0};
assertEquals(-1, s.find(buffer, buffer.length));
buffer = new byte[] {1};
assertEquals(-1, s.find(buffer, buffer.length));
buffer = new byte[] {2};
assertEquals(-1, s.find(buffer, buffer.length));
buffer = new byte[] {3};
assertEquals(0, s.find(buffer, buffer.length));
}
@Test
public void testSeekerUsesBufferLength() {
byte[] marker = {0, 0, 1};
byte[] buffer;
Seeker s;
s = new Seeker(marker);
buffer = new byte[] {0, 0, 0, 1};
assertEquals(-1, s.find(buffer, 3));
s = new Seeker(marker);
buffer = new byte[] {0, 0};
assertEquals(-1, s.find(buffer, 1));
buffer = new byte[] {1, 0};
assertEquals(-1, s.find(buffer, 1));
s = new Seeker(marker);
buffer = new byte[] {0, 2};
assertEquals(-1, s.find(buffer, 1));
buffer = new byte[] {0, 2};
assertEquals(-1, s.find(buffer, 1));
buffer = new byte[] {1, 2};
assertEquals(0, s.find(buffer, 1));
}
@Test
public void testSeekerFindPartial() {
byte[] marker = {0, 0, 1};
byte[] buffer;
Seeker s;
s = new Seeker(marker);
buffer = new byte[] {0, 0, 0, 1};
assertEquals(3, s.find(buffer, buffer.length));
marker = new byte[] {1, 1, 1, 2};
s = new Seeker(marker);
buffer = new byte[] {1, 1, 1, 1, 1};
assertEquals(-1, s.find(buffer, buffer.length));
buffer = new byte[] {1, 1, 2};
assertEquals(2, s.find(buffer, buffer.length));
buffer = new byte[] {1, 1, 1, 1, 1};
assertEquals(-1, s.find(buffer, buffer.length));
buffer = new byte[] {2, 1, 1, 1, 2};
assertEquals(0, s.find(buffer, buffer.length));
}
@Test
public void testSeekerFindAllLocations() {
byte[] marker = {1, 1, 2};
byte[] allOnes = new byte[] {1, 1, 1, 1};
byte[] findIn = new byte[] {1, 1, 1, 1};
Seeker s = new Seeker(marker);
for (int i = 0; i < findIn.length; i++) {
assertEquals(-1, s.find(allOnes, allOnes.length));
findIn[i] = 2;
assertEquals(i, s.find(findIn, findIn.length));
findIn[i] = 1;
}
}
@Test
public void testDisplayData() {
AvroSource<Bird> source =
AvroSource.from("foobar.txt").withSchema(Bird.class).withMinBundleSize(1234);
DisplayData displayData = DisplayData.from(source);
assertThat(displayData, hasDisplayItem("filePattern", "foobar.txt"));
assertThat(displayData, hasDisplayItem("minBundleSize", 1234));
}
@Test
public void testReadMetadataWithCodecs() throws Exception {
// Test reading files generated using all codecs.
String[] codecs = {
DataFileConstants.NULL_CODEC,
DataFileConstants.BZIP2_CODEC,
DataFileConstants.DEFLATE_CODEC,
DataFileConstants.SNAPPY_CODEC,
DataFileConstants.XZ_CODEC
};
List<Bird> expected = createRandomRecords(DEFAULT_RECORD_COUNT);
for (String codec : codecs) {
String filename =
generateTestFile(
codec, expected, SyncBehavior.SYNC_DEFAULT, 0, AvroCoder.of(Bird.class), codec);
Metadata fileMeta = FileSystems.matchSingleFileSpec(filename);
AvroMetadata metadata = AvroSource.readMetadataFromFile(fileMeta.resourceId());
assertEquals(codec, metadata.getCodec());
}
}
@Test
public void testReadSchemaString() throws Exception {
List<Bird> expected = createRandomRecords(DEFAULT_RECORD_COUNT);
String codec = DataFileConstants.NULL_CODEC;
String filename =
generateTestFile(
codec, expected, SyncBehavior.SYNC_DEFAULT, 0, AvroCoder.of(Bird.class), codec);
Metadata fileMeta = FileSystems.matchSingleFileSpec(filename);
AvroMetadata metadata = AvroSource.readMetadataFromFile(fileMeta.resourceId());
// By default, parse validates the schema, which is what we want.
Schema schema = new Schema.Parser().parse(metadata.getSchemaString());
assertEquals(4, schema.getFields().size());
}
@Test
public void testCreateFromMetadata() throws Exception {
List<Bird> expected = createRandomRecords(DEFAULT_RECORD_COUNT);
String codec = DataFileConstants.NULL_CODEC;
String filename =
generateTestFile(
codec, expected, SyncBehavior.SYNC_DEFAULT, 0, AvroCoder.of(Bird.class), codec);
Metadata fileMeta = FileSystems.matchSingleFileSpec(filename);
AvroSource<GenericRecord> source = AvroSource.from(fileMeta);
AvroSource<Bird> sourceWithSchema = source.withSchema(Bird.class);
AvroSource<Bird> sourceWithSchemaWithMinBundleSize = sourceWithSchema.withMinBundleSize(1234);
assertEquals(FileBasedSource.Mode.SINGLE_FILE_OR_SUBRANGE, source.getMode());
assertEquals(FileBasedSource.Mode.SINGLE_FILE_OR_SUBRANGE, sourceWithSchema.getMode());
assertEquals(
FileBasedSource.Mode.SINGLE_FILE_OR_SUBRANGE, sourceWithSchemaWithMinBundleSize.getMode());
}
/**
* Class that will encode to a fixed size: 16 bytes.
*
* <p>Each object has a 15-byte array. Avro encodes an object of this type as a byte array, so
* each encoded object will consist of 1 byte that encodes the length of the array, followed by 15
* bytes.
*/
@DefaultCoder(AvroCoder.class)
public static class FixedRecord {
private byte[] value = new byte[15];
public FixedRecord() {
this(0);
}
public FixedRecord(int i) {
value[0] = (byte) i;
value[1] = (byte) (i >> 8);
value[2] = (byte) (i >> 16);
value[3] = (byte) (i >> 24);
}
public int asInt() {
  // Mask each byte to avoid sign extension when reassembling the little-endian int; without the
  // masks, any byte >= 0x80 would corrupt the higher-order bits.
  return (value[0] & 0xff)
      | ((value[1] & 0xff) << 8)
      | ((value[2] & 0xff) << 16)
      | ((value[3] & 0xff) << 24);
}
@Override
public boolean equals(Object o) {
if (o instanceof FixedRecord) {
FixedRecord other = (FixedRecord) o;
return this.asInt() == other.asInt();
}
return false;
}
@Override
public int hashCode() {
return toString().hashCode();
}
@Override
public String toString() {
return Integer.toString(this.asInt());
}
}
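/**
 * Sanity check (not part of the original suite) for the 16-byte encoding claim in the
 * FixedRecord javadoc: a minimal sketch assuming AvroCoder emits the standard Avro binary form
 * for a bytes field, i.e. a zig-zag varint length (15 encodes as the single byte 0x1E) followed
 * by the 15 array bytes.
 */
@Test
public void testFixedRecordEncodedSize() throws IOException {
  java.io.ByteArrayOutputStream bos = new java.io.ByteArrayOutputStream();
  AvroCoder.of(FixedRecord.class).encode(new FixedRecord(42), bos);
  assertEquals(16, bos.toByteArray().length);
}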
/** Create a list of count 16-byte records. */
private static List<FixedRecord> createFixedRecords(int count) {
List<FixedRecord> records = new ArrayList<>();
for (int i = 0; i < count; i++) {
records.add(new FixedRecord(i));
}
return records;
}
/** Class used as the record type in tests. */
@DefaultCoder(AvroCoder.class)
static class Bird {
long number;
String species;
String quality;
long quantity;
public Bird() {}
public Bird(long number, String species, String quality, long quantity) {
this.number = number;
this.species = species;
this.quality = quality;
this.quantity = quantity;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(Bird.class)
.addValue(number)
.addValue(species)
.addValue(quantity)
.addValue(quality)
.toString();
}
@Override
public boolean equals(Object obj) {
if (obj instanceof Bird) {
Bird other = (Bird) obj;
return Objects.equals(species, other.species)
&& Objects.equals(quality, other.quality)
&& quantity == other.quantity
&& number == other.number;
}
return false;
}
@Override
public int hashCode() {
return Objects.hash(number, species, quality, quantity);
}
}
/**
* Class used as the record type in tests.
*
* <p>Contains nullable fields and fields with default values. Can be read using a file written
* with the Bird schema.
*/
@DefaultCoder(AvroCoder.class)
public static class FancyBird {
long number;
String species;
String quality;
long quantity;
@Nullable String habitat;
@AvroDefault("\"MAXIMUM OVERDRIVE\"")
String fancinessLevel;
public FancyBird() {}
public FancyBird(
long number,
String species,
String quality,
long quantity,
String habitat,
String fancinessLevel) {
this.number = number;
this.species = species;
this.quality = quality;
this.quantity = quantity;
this.habitat = habitat;
this.fancinessLevel = fancinessLevel;
}
@Override
public String toString() {
return MoreObjects.toStringHelper(FancyBird.class)
.addValue(number)
.addValue(species)
.addValue(quality)
.addValue(quantity)
.addValue(habitat)
.addValue(fancinessLevel)
.toString();
}
@Override
public boolean equals(Object obj) {
if (obj instanceof FancyBird) {
FancyBird other = (FancyBird) obj;
return Objects.equals(species, other.species)
&& Objects.equals(quality, other.quality)
&& quantity == other.quantity
&& number == other.number
&& Objects.equals(fancinessLevel, other.fancinessLevel)
&& Objects.equals(habitat, other.habitat);
}
return false;
}
@Override
public int hashCode() {
return Objects.hash(number, species, quality, quantity, habitat, fancinessLevel);
}
}
/** Create a list of n random records. */
private static List<Bird> createRandomRecords(long n) {
String[] qualities = {
"miserable", "forelorn", "fidgity", "squirrelly", "fanciful", "chipper", "lazy"
};
String[] species = {"pigeons", "owls", "gulls", "hawks", "robins", "jays"};
Random random = new Random(0);
List<Bird> records = new ArrayList<>();
for (long i = 0; i < n; i++) {
Bird bird = new Bird();
bird.quality = qualities[random.nextInt(qualities.length)];
bird.species = species[random.nextInt(species.length)];
bird.number = i;
bird.quantity = random.nextLong();
records.add(bird);
}
return records;
}
}