blob: 3e2dd6becf8091afad898987f3ee36ac8b29f721 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.nio.file.Files;
import java.nio.file.Path;
import com.sun.management.UnixOperatingSystemMXBean;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableFileInput;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.junit.Test;
/**
 * Tests for opening Avro container files through {@link DataFileReader} and
 * {@link DataFileStream}: file-descriptor handling on failure (AVRO-2286),
 * short and EOF reads during the magic-header check (AVRO-2944), reading
 * schemas that were written without validation, and rejection of files that do
 * not start with the Avro magic bytes.
 *
 * <p>
 * Every test closes the readers and inputs it opens and deletes the temporary
 * file it creates, so the suite itself does not leak descriptors or files.
 */
@SuppressWarnings("restriction")
public class TestDataFileReader {

  /** Record schema shared by the AVRO-2944 regression tests. */
  private static final String TEST_SCHEMA_JSON = "{\"type\": \"record\", \"name\": \"TestSchema\", \"fields\": "
      + "[ {\"name\": \"id\", \"type\": [\"long\", \"null\"], \"default\": null}]}";

  /**
   * Regression test for AVRO-2286: constructing a {@link DataFileReader} on an
   * empty file must fail without leaving a file descriptor open.
   *
   * <p>
   * Where the platform MXBean is not a {@link UnixOperatingSystemMXBean} both
   * counts are 0 and the descriptor comparison is trivially true.
   */
  @Test
  public void testForLeakingFileDescriptors() throws IOException {
    // createTempFile already creates the file empty; no delete/re-create needed.
    Path emptyFile = Files.createTempFile("empty", ".avro");
    try {
      long openFilesBeforeOperation = getNumberOfOpenFileDescriptors();
      try (DataFileReader<Object> reader = new DataFileReader<>(emptyFile.toFile(), new GenericDatumReader<>())) {
        fail("Reading on empty file is supposed to fail.");
      } catch (IOException expected) {
        // everything going as supposed to
      }
      long openFilesAfterOperation = getNumberOfOpenFileDescriptors();
      // Sometimes we have less open files because of a GC run during the test cycle.
      assertTrue("File descriptor leaked from new DataFileReader() (expected:" + openFilesBeforeOperation + " actual:"
          + openFilesAfterOperation + ")", openFilesBeforeOperation >= openFilesAfterOperation);
    } finally {
      // Runs even when fail() or the assertion throws, so the file never lingers.
      Files.deleteIfExists(emptyFile);
    }
  }

  /**
   * Returns the number of file descriptors currently open in this JVM, or 0
   * when the operating-system MXBean is not a {@link UnixOperatingSystemMXBean}.
   */
  private static long getNumberOfOpenFileDescriptors() {
    OperatingSystemMXBean osMxBean = ManagementFactory.getOperatingSystemMXBean();
    if (osMxBean instanceof UnixOperatingSystemMXBean) {
      return ((UnixOperatingSystemMXBean) osMxBean).getOpenFileDescriptorCount();
    }
    return 0;
  }

  /**
   * Regression test for AVRO-2944, which describes hanging/failure when reading
   * an Avro file while performing the magic header check. This happens with a
   * throttled input stream, where fewer bytes are read into the buffer than
   * requested.
   */
  @Test
  public void testThrottledInputStream() throws IOException {
    File f = createSchemaOnlyAvroFile("testThrottledInputStream", parseWithoutValidation(TEST_SCHEMA_JSON));
    try {
      // Without checking for magic header, throttled input has no effect.
      // The input is listed as its own resource so it is closed even if
      // constructing the reader throws.
      try (SeekableInput in = throttledInputStream(f);
          FileReader<Object> r = new DataFileReader<>(in, new GenericDatumReader<>())) {
        assertEquals("TestSchema", r.getSchema().getName());
      }
      // With checking for magic header, throttled input should pass too.
      try (SeekableInput in = throttledInputStream(f);
          FileReader<Object> r2 = DataFileReader.openReader(in, new GenericDatumReader<>())) {
        assertEquals("TestSchema", r2.getSchema().getName());
      }
    } finally {
      Files.deleteIfExists(f.toPath());
    }
  }

  /**
   * Opens {@code f} as a {@link SeekableInput} whose bulk reads request one
   * byte fewer than the caller asked for, simulating a throttled stream.
   * Requests for at most one byte are passed through unchanged, so a
   * zero-length read never turns into a negative-length request.
   */
  private static SeekableInput throttledInputStream(File f) throws IOException {
    return new DelegatingSeekableInput(new SeekableFileInput(f)) {
      @Override
      public int read(byte[] b, int off, int len) throws IOException {
        return delegate.read(b, off, len > 1 ? len - 1 : len);
      }
    };
  }

  /**
   * Another regression test for AVRO-2944, covering the EOF case: a defective
   * input stream that unexpectedly returns -1 from a read during the magic
   * header check must result in an {@link EOFException}.
   */
  @Test(expected = EOFException.class)
  public void testInputStreamEOF() throws IOException {
    File f = createSchemaOnlyAvroFile("testInputStreamEOF", parseWithoutValidation(TEST_SCHEMA_JSON));
    try (SeekableInput in = eofInputStream(f)) {
      // Should throw an EOFException; close() only matters if it unexpectedly
      // succeeds.
      DataFileReader.openReader(in, new GenericDatumReader<>()).close();
    } finally {
      Files.deleteIfExists(f.toPath());
    }
  }

  /**
   * Opens {@code f} as a {@link SeekableInput} whose every read reports end of
   * stream (-1); seek, tell, length and close are forwarded to the real file
   * input.
   */
  private static SeekableInput eofInputStream(File f) throws IOException {
    return new DelegatingSeekableInput(new SeekableFileInput(f)) {
      @Override
      public int read(byte[] b, int off, int len) throws IOException {
        return -1;
      }
    };
  }

  /**
   * A file whose schema was written with validation disabled must still be
   * readable through {@link DataFileStream}.
   */
  @Test
  public void testIgnoreSchemaValidationOnRead() throws IOException {
    // This schema has an accent in the name and the default for the field doesn't
    // match the first type in the union. A Java SDK in the past could create a file
    // containing this schema.
    Schema legacySchema = parseWithoutValidation(
        "{\"type\": \"record\", \"name\": \"InvalidAccëntWithInvalidNull\", \"fields\": "
            + "[ {\"name\": \"id\", \"type\": [\"long\", \"null\"], \"default\": null}]}");
    // Create a file with the legacy schema.
    File f = createSchemaOnlyAvroFile("testIgnoreSchemaValidationOnRead", legacySchema);
    // This should not throw an exception. The raw stream is its own resource so
    // it is closed even if the DataFileStream constructor fails.
    try (FileInputStream in = new FileInputStream(f);
        DataFileStream<Void> r = new DataFileStream<>(in, new GenericDatumReader<>())) {
      assertEquals(legacySchema, r.getSchema());
    } finally {
      Files.deleteIfExists(f.toPath());
    }
  }

  /** A file shorter than the magic header must be rejected. */
  @Test(expected = InvalidAvroMagicException.class)
  public void testInvalidMagicLength() throws IOException {
    openReaderOnTextFile("testInvalidMagicLength", "-");
  }

  /** A file that starts with bytes other than the magic header must be rejected. */
  @Test(expected = InvalidAvroMagicException.class)
  public void testInvalidMagicBytes() throws IOException {
    openReaderOnTextFile("testInvalidMagicBytes", "invalid");
  }

  /** Parses {@code json} into a schema with name and default validation disabled. */
  private static Schema parseWithoutValidation(String json) {
    return new Schema.Parser().setValidate(false).setValidateDefaults(false).parse(json);
  }

  /**
   * Creates a temporary {@code .avro} file for {@code schema} by calling
   * {@code create} and {@code flush} on a writer without appending any datum.
   * The caller is responsible for deleting the returned file.
   */
  private static File createSchemaOnlyAvroFile(String prefix, Schema schema) throws IOException {
    File f = Files.createTempFile(prefix, ".avro").toFile();
    try (DataFileWriter<?> w = new DataFileWriter<>(new GenericDatumWriter<>())) {
      w.create(schema, f);
      w.flush();
    }
    return f;
  }

  /**
   * Writes {@code content} to a fresh temporary file and tries to open it with
   * {@link DataFileReader#openReader}. Whatever {@code openReader} throws
   * propagates to the caller; the input and the temporary file are released
   * either way.
   */
  private static void openReaderOnTextFile(String prefix, String content) throws IOException {
    File f = Files.createTempFile(prefix, ".avro").toFile();
    try {
      try (FileWriter w = new FileWriter(f)) {
        w.write(content);
      }
      try (SeekableInput in = new SeekableFileInput(f)) {
        DataFileReader.openReader(in, new GenericDatumReader<>()).close();
      }
    } finally {
      Files.deleteIfExists(f.toPath());
    }
  }

  /**
   * {@link SeekableInput} that forwards close, seek, tell and length to a
   * wrapped input and leaves {@code read} to subclasses, so each test double
   * only has to describe how its reads misbehave.
   */
  private abstract static class DelegatingSeekableInput implements SeekableInput {
    final SeekableInput delegate;

    DelegatingSeekableInput(SeekableInput delegate) {
      this.delegate = delegate;
    }

    @Override
    public void close() throws IOException {
      delegate.close();
    }

    @Override
    public void seek(long p) throws IOException {
      delegate.seek(p);
    }

    @Override
    public long tell() throws IOException {
      return delegate.tell();
    }

    @Override
    public long length() throws IOException {
      return delegate.length();
    }
  }
}