blob: 572be6029fba4a0b71202251559d6c02d96f11f8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro.io;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import org.apache.avro.RandomData;
import org.apache.avro.Schema;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.util.ByteBufferInputStream;
import org.apache.avro.util.ByteBufferOutputStream;
import org.apache.avro.util.Utf8;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
/**
 * Parameterized JUnit tests for the binary {@link Decoder} implementations
 * obtained from {@link DecoderFactory}.
 *
 * <p>Every test runs twice, once with {@code useDirect == true} and once with
 * {@code useDirect == false}. The flag only influences decoders created via
 * {@code newDecoder(InputStream)}; the byte-array based helpers always go
 * through {@code factory.binaryDecoder(...)}.
 */
@RunWith(Parameterized.class)
public class TestBinaryDecoder {
  // prime number buffer size so that looping tests hit the buffer edge
  // at different points in the loop.
  DecoderFactory factory = new DecoderFactory().configureDecoderBufferSize(521);
  private final boolean useDirect;
  static EncoderFactory e_factory = EncoderFactory.get();

  /**
   * @param useDirect when true, {@code newDecoder(InputStream)} asks the
   *          factory for a direct binary decoder instead of a buffered one
   */
  public TestBinaryDecoder(boolean useDirect) {
    this.useDirect = useDirect;
  }

  /** Supplies the two parameter sets: {@code useDirect} true and false. */
  @Parameters
  public static Collection<Object[]> data() {
    return Arrays.asList(new Object[][] {
        { true },
        { false },
    });
  }

  /** Creates a decoder over an empty byte array. */
  private Decoder newDecoderWithNoData() throws IOException {
    return newDecoder(new byte[0]);
  }

  /** Creates a decoder over {@code len} bytes of {@code bytes} starting at {@code start}. */
  private Decoder newDecoder(byte[] bytes, int start, int len)
      throws IOException {
    return factory.binaryDecoder(bytes, start, len, null);
  }

  /** Creates a direct or buffered decoder over {@code in}, depending on {@code useDirect}. */
  private Decoder newDecoder(InputStream in) {
    if (useDirect) {
      return factory.directBinaryDecoder(in, null);
    } else {
      return factory.binaryDecoder(in, null);
    }
  }

  /** Creates a decoder over the whole of {@code bytes}. */
  private Decoder newDecoder(byte[] bytes) throws IOException {
    return factory.binaryDecoder(bytes, null);
  }

  /**
   * Asserts that {@code bd.isEnd()} equals {@code expected}. An
   * {@link UnsupportedOperationException} from {@code isEnd()} is tolerated
   * only when {@code bd} is exactly a {@link DirectBinaryDecoder}; for any
   * other class it is rethrown.
   */
  private static void assertIsEnd(boolean expected, BinaryDecoder bd)
      throws IOException {
    boolean atEnd;
    try {
      atEnd = bd.isEnd();
    } catch (UnsupportedOperationException e) {
      // this is ok if its a DirectBinaryDecoder.
      if (bd.getClass() != DirectBinaryDecoder.class) {
        throw e;
      }
      return;
    }
    Assert.assertEquals(expected, atEnd);
  }

  /** Verify EOFException throw at EOF */
  @Test(expected=EOFException.class)
  public void testEOFBoolean() throws IOException {
    newDecoderWithNoData().readBoolean();
  }

  @Test(expected=EOFException.class)
  public void testEOFInt() throws IOException {
    newDecoderWithNoData().readInt();
  }

  @Test(expected=EOFException.class)
  public void testEOFLong() throws IOException {
    newDecoderWithNoData().readLong();
  }

  @Test(expected=EOFException.class)
  public void testEOFFloat() throws IOException {
    newDecoderWithNoData().readFloat();
  }

  @Test(expected=EOFException.class)
  public void testEOFDouble() throws IOException {
    newDecoderWithNoData().readDouble();
  }

  @Test(expected=EOFException.class)
  public void testEOFBytes() throws IOException {
    newDecoderWithNoData().readBytes(null);
  }

  @Test(expected=EOFException.class)
  public void testEOFString() throws IOException {
    newDecoderWithNoData().readString(new Utf8("a"));
  }

  @Test(expected=EOFException.class)
  public void testEOFFixed() throws IOException {
    newDecoderWithNoData().readFixed(new byte[1]);
  }

  @Test(expected=EOFException.class)
  public void testEOFEnum() throws IOException {
    newDecoderWithNoData().readEnum();
  }

  /**
   * Writes the same two-byte payload to two separate buffers, then reads the
   * first with a fresh {@link DirectBinaryDecoder} and the second after
   * re-pointing the same decoder with {@code configure}. Both reads must
   * yield a buffer holding exactly the payload length.
   */
  @Test
  public void testReuse() throws IOException {
    ByteBufferOutputStream bbo1 = new ByteBufferOutputStream();
    ByteBufferOutputStream bbo2 = new ByteBufferOutputStream();
    byte[] b1 = new byte[] { 1, 2 };

    BinaryEncoder e1 = e_factory.binaryEncoder(bbo1, null);
    e1.writeBytes(b1);
    e1.flush();

    BinaryEncoder e2 = e_factory.binaryEncoder(bbo2, null);
    e2.writeBytes(b1);
    e2.flush();

    DirectBinaryDecoder d = new DirectBinaryDecoder(
        new ByteBufferInputStream(bbo1.getBufferList()));
    ByteBuffer bb1 = d.readBytes(null);
    Assert.assertEquals(b1.length, bb1.limit() - bb1.position());

    d.configure(new ByteBufferInputStream(bbo2.getBufferList()));
    ByteBuffer bb2 = d.readBytes(null);
    Assert.assertEquals(b1.length, bb2.limit() - bb2.position());
  }

  // Shared fixture, populated once by generateData().
  private static byte[] data = null;
  private static int seed = -1;
  private static Schema schema = null;
  private static int count = 200;
  private static ArrayList<Object> records = new ArrayList<Object>(count);

  /**
   * Builds the shared fixture: parses the record schema, generates
   * {@code count} random records from a time-based seed, remembers them in
   * {@code records} and stores their binary encoding in {@code data}.
   */
  @BeforeClass
  public static void generateData() throws IOException {
    seed = (int)System.currentTimeMillis();
    // note some tests (testSkipping) rely on this explicitly
    String jsonSchema =
        "{\"type\": \"record\", \"name\": \"Test\", \"fields\": ["
        +"{\"name\":\"intField\", \"type\":\"int\"},"
        +"{\"name\":\"bytesField\", \"type\":\"bytes\"},"
        +"{\"name\":\"booleanField\", \"type\":\"boolean\"},"
        +"{\"name\":\"stringField\", \"type\":\"string\"},"
        +"{\"name\":\"floatField\", \"type\":\"float\"},"
        +"{\"name\":\"doubleField\", \"type\":\"double\"},"
        +"{\"name\":\"arrayField\", \"type\": " +
        "{\"type\":\"array\", \"items\":\"boolean\"}},"
        +"{\"name\":\"longField\", \"type\":\"long\"}]}";
    schema = Schema.parse(jsonSchema);
    GenericDatumWriter<Object> writer = new GenericDatumWriter<Object>();
    writer.setSchema(schema);
    ByteArrayOutputStream baos = new ByteArrayOutputStream(8192);
    BinaryEncoder encoder = e_factory.binaryEncoder(baos, null);

    // keep 'records' in step with 'data' even if this method is ever re-run
    records.clear();
    for (Object datum : new RandomData(schema, count, seed)) {
      writer.write(datum, encoder);
      records.add(datum);
    }
    encoder.flush();
    data = baos.toByteArray();
  }

  /**
   * Decodes the generated records through five differently constructed
   * decoders (stream, array, offset array, and two decoders re-initialised
   * from the other kind of source) and checks each against the originals.
   */
  @Test
  public void testDecodeFromSources() throws IOException {
    GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
    reader.setSchema(schema);

    ByteArrayInputStream is = new ByteArrayInputStream(data);
    ByteArrayInputStream is2 = new ByteArrayInputStream(data);
    ByteArrayInputStream is3 = new ByteArrayInputStream(data);

    Decoder fromInputStream = newDecoder(is);
    Decoder fromArray = newDecoder(data);

    // surround the payload with 0xff padding so reads outside the window show up
    byte[] data2 = new byte[data.length + 30];
    Arrays.fill(data2, (byte)0xff);
    System.arraycopy(data, 0, data2, 15, data.length);
    Decoder fromOffsetArray = newDecoder(data2, 15, data.length);

    // start on an array, then reuse the instance for a stream
    BinaryDecoder initOnInputStream = factory.binaryDecoder(
        new byte[50], 0, 30, null);
    initOnInputStream = factory.binaryDecoder(is2, initOnInputStream);

    // start on a stream, then reuse the instance for an array
    BinaryDecoder initOnArray = factory.binaryDecoder(is3, null);
    initOnArray = factory.binaryDecoder(
        data, 0, data.length, initOnArray);

    for (Object datum : records) {
      Assert.assertEquals(
          "InputStream based BinaryDecoder result does not match",
          datum, reader.read(null, fromInputStream));
      Assert.assertEquals(
          "Array based BinaryDecoder result does not match",
          datum, reader.read(null, fromArray));
      Assert.assertEquals(
          "offset Array based BinaryDecoder result does not match",
          datum, reader.read(null, fromOffsetArray));
      Assert.assertEquals(
          "InputStream initialized BinaryDecoder result does not match",
          datum, reader.read(null, initOnInputStream));
      Assert.assertEquals(
          "Array initialized BinaryDecoder result does not match",
          datum, reader.read(null, initOnArray));
    }
  }

  /**
   * Checks that the stream returned by {@code BinaryDecoder.inputStream()}
   * reads and skips like a plain {@link ByteArrayInputStream} over the same
   * bytes, for both array and stream sources.
   */
  @Test
  public void testInputStreamProxy() throws IOException {
    Decoder d = newDecoder(data);
    if (d instanceof BinaryDecoder) {
      BinaryDecoder bd = (BinaryDecoder) d;
      InputStream test = bd.inputStream();
      InputStream check = new ByteArrayInputStream(data);
      validateInputStreamReads(test, check);

      bd = factory.binaryDecoder(data, bd);
      test = bd.inputStream();
      check = new ByteArrayInputStream(data);
      validateInputStreamSkips(test, check);

      // with input stream sources
      bd = factory.binaryDecoder(new ByteArrayInputStream(data), bd);
      test = bd.inputStream();
      check = new ByteArrayInputStream(data);
      validateInputStreamReads(test, check);

      bd = factory.binaryDecoder(new ByteArrayInputStream(data), bd);
      test = bd.inputStream();
      check = new ByteArrayInputStream(data);
      validateInputStreamSkips(test, check);
    }
  }

  /**
   * Obtains two stream views of the same decoder, drains the first one, and
   * asserts that the first read of the second view no longer matches the
   * first byte of the data.
   */
  @Test
  public void testInputStreamProxyDetached() throws IOException {
    Decoder d = newDecoder(data);
    if (d instanceof BinaryDecoder) {
      BinaryDecoder bd = (BinaryDecoder) d;
      InputStream test = bd.inputStream();
      InputStream check = new ByteArrayInputStream(data);
      // detach input stream and decoder from old source
      // NOTE(review): the reuse argument is null, so this call creates an
      // unrelated decoder; confirm whether 'bd' was meant to be passed here.
      factory.binaryDecoder(new byte[56], null);
      InputStream bad = bd.inputStream();
      InputStream check2 = new ByteArrayInputStream(data);
      validateInputStreamReads(test, check);
      Assert.assertFalse(bad.read() == check2.read());
    }
  }

  /**
   * Reads one float through the decoder first, then verifies that the
   * decoder's stream view continues from the byte after that float.
   */
  @Test
  public void testInputStreamPartiallyUsed() throws IOException {
    BinaryDecoder bd = factory.binaryDecoder(
        new ByteArrayInputStream(data), null);
    InputStream test = bd.inputStream();
    InputStream check = new ByteArrayInputStream(data);
    // triggers buffer fill if unused and tests isEnd()
    assertIsEnd(false, bd);
    bd.readFloat(); // use data, and otherwise trigger buffer fill
    check.skip(4); // skip the same # of bytes here
    validateInputStreamReads(test, check);
  }

  /**
   * Reads {@code test} and {@code check} in lock step using all three
   * {@code read} overloads until end of stream, asserting identical return
   * values and buffer contents, then checks the exhausted-stream behaviour
   * of {@code skip}, {@code available} and {@code markSupported} on
   * {@code test} and closes it.
   *
   * @param test the stream under test
   * @param check the reference stream supplying the expected values
   */
  private void validateInputStreamReads(InputStream test, InputStream check)
      throws IOException {
    byte[] bt = new byte[7];
    byte[] bc = new byte[7];
    while (true) {
      int t = test.read();
      int c = check.read();
      Assert.assertEquals(c, t);
      if (-1 == t) break;

      t = test.read(bt);
      c = check.read(bc);
      Assert.assertEquals(c, t);
      // reference buffer first: JUnit's signature is (expected, actual)
      Assert.assertArrayEquals(bc, bt);
      if (-1 == t) break;

      t = test.read(bt, 1, 4);
      c = check.read(bc, 1, 4);
      Assert.assertEquals(c, t);
      Assert.assertArrayEquals(bc, bt);
      if (-1 == t) break;
    }
    Assert.assertEquals(0, test.skip(5));
    Assert.assertEquals(0, test.available());
    Assert.assertFalse(test.getClass() != ByteArrayInputStream.class && test.markSupported());
    test.close();
  }

  /**
   * Skips through both streams 19 bytes at a time, asserting that each call
   * skips the same amount, until nothing more can be skipped; {@code test}
   * must then report end of stream.
   *
   * @param test the stream under test
   * @param check the reference stream supplying the expected values
   */
  private void validateInputStreamSkips(InputStream test, InputStream check) throws IOException {
    while (true) {
      long t2 = test.skip(19);
      long c2 = check.skip(19);
      Assert.assertEquals(c2, t2);
      if (0 == t2) break;
    }
    Assert.assertEquals(-1, test.read());
  }

  /** Five 0xff bytes must be rejected by {@code readInt} with "Invalid int encoding". */
  @Test
  public void testBadIntEncoding() throws IOException {
    byte[] badint = new byte[5];
    Arrays.fill(badint, (byte)0xff);
    Decoder bd = factory.binaryDecoder(badint, null);
    String message = "";
    try {
      bd.readInt();
      Assert.fail("Expected an IOException for an invalid int encoding");
    } catch (IOException ioe) {
      message = ioe.getMessage();
    }
    Assert.assertEquals("Invalid int encoding", message);
  }

  /** Ten 0xff bytes must be rejected by {@code readLong} with "Invalid long encoding". */
  @Test
  public void testBadLongEncoding() throws IOException {
    byte[] badint = new byte[10];
    Arrays.fill(badint, (byte)0xff);
    Decoder bd = factory.binaryDecoder(badint, null);
    String message = "";
    try {
      bd.readLong();
      Assert.fail("Expected an IOException for an invalid long encoding");
    } catch (IOException ioe) {
      message = ioe.getMessage();
    }
    Assert.assertEquals("Invalid long encoding", message);
  }

  /** A length prefix that decodes to -1 must make {@code readString} fail with a malformed-data error. */
  @Test
  public void testBadLengthEncoding() throws IOException {
    byte[] bad = new byte[] { (byte)1 };
    Decoder bd = factory.binaryDecoder(bad, null);
    String message = "";
    try {
      bd.readString();
      Assert.fail("Expected an AvroRuntimeException for a negative length");
    } catch (AvroRuntimeException e) {
      message = e.getMessage();
    }
    Assert.assertEquals("Malformed data. Length is negative: -1", message);
  }

  /** Four 0xff bytes end before the int encoding terminates. */
  @Test(expected=EOFException.class)
  public void testIntTooShort() throws IOException {
    byte[] badint = new byte[4];
    Arrays.fill(badint, (byte)0xff);
    newDecoder(badint).readInt();
  }

  /** Nine 0xff bytes end before the long encoding terminates. */
  @Test(expected=EOFException.class)
  public void testLongTooShort() throws IOException {
    byte[] badint = new byte[9];
    Arrays.fill(badint, (byte)0xff);
    newDecoder(badint).readLong();
  }

  /** Three bytes are one short of the four this test's sibling code skips for a float field. */
  @Test(expected=EOFException.class)
  public void testFloatTooShort() throws IOException {
    byte[] badfloat = new byte[3];
    Arrays.fill(badfloat, (byte)0xff);
    // previously called readInt(), which left readFloat() untested here
    newDecoder(badfloat).readFloat();
  }

  /** Seven bytes are one short of the eight this test's sibling code skips for a double field. */
  @Test(expected=EOFException.class)
  public void testDoubleTooShort() throws IOException {
    byte[] baddouble = new byte[7];
    Arrays.fill(baddouble, (byte)0xff);
    // previously called readLong(), which left readDouble() untested here
    newDecoder(baddouble).readDouble();
  }

  /**
   * Skips over all generated records with an array-backed decoder and then
   * with the same decoder re-initialised on a stream, expecting
   * {@code isEnd()} to be true after each pass.
   */
  @Test
  public void testSkipping() throws IOException {
    Decoder d = newDecoder(data);
    skipGenerated(d);
    if (d instanceof BinaryDecoder) {
      BinaryDecoder bd = (BinaryDecoder) d;
      assertIsEnd(true, bd);

      bd = factory.binaryDecoder(new ByteArrayInputStream(data), bd);
      skipGenerated(bd);
      assertIsEnd(true, bd);
    }
  }

  /**
   * Walks {@code bd} across every generated record, field by field in the
   * order declared by the schema in {@code generateData()}, mostly using the
   * skip methods, and finally asserts that one more skip raises
   * {@link EOFException}.
   */
  private void skipGenerated(Decoder bd) throws IOException {
    for (int i = 0; i < records.size(); i++) {
      bd.readInt();      // intField
      bd.skipBytes();    // bytesField
      bd.skipFixed(1);   // booleanField
      bd.skipString();   // stringField
      bd.skipFixed(4);   // floatField
      bd.skipFixed(8);   // doubleField
      long leftover = bd.skipArray(); // arrayField
      // booleans are one byte, array trailer is one byte
      bd.skipFixed((int)leftover + 1);
      bd.skipFixed(0);
      bd.readLong();     // longField
    }
    EOFException eof = null;
    try {
      bd.skipFixed(4);
    } catch (EOFException e) {
      eof = e;
    }
    Assert.assertNotNull(eof);
  }

  /**
   * Encodes a single long, reads it back, and expects the following
   * {@code readInt()} to hit end of input. The {@link EOFException} is only
   * accepted from that second read, so a premature EOF while reading the
   * long fails the test.
   */
  @Test
  public void testEOF() throws IOException {
    final long value = 0x10000000000000L;
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    Encoder e = EncoderFactory.get().binaryEncoder(baos, null);
    e.writeLong(value);
    e.flush();

    Decoder d = newDecoder(new ByteArrayInputStream(baos.toByteArray()));
    Assert.assertEquals(value, d.readLong());
    try {
      d.readInt();
      Assert.fail("Expected an EOFException after the last value was read");
    } catch (EOFException expected) {
      // expected: the input holds nothing beyond the single long
    }
  }
}