blob: b514febcbd63e4ee320a851e7c3fb7d0f9747b2c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop.util;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.parquet.hadoop.TestUtils;
import org.apache.parquet.io.SeekableInputStream;
import org.junit.Assert;
import org.junit.Test;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Callable;
import java.util.function.IntFunction;
import static org.apache.parquet.hadoop.util.HadoopStreams.wrap;
import static org.apache.parquet.hadoop.util.MockHadoopInputStream.TEST_ARRAY;
/**
 * Tests {@code H2SeekableInputStream.readFully(Reader, ByteBuffer)} using a reader that mimics
 * Hadoop 2 ByteBuffer reads, and tests that {@code HadoopStreams.wrap} picks the H1 or H2
 * wrapper depending on whether the wrapped stream implements {@link ByteBufferReadable}.
 *
 * <p>Every readFully scenario is run once with a heap buffer and once with a direct buffer. The
 * scenario bodies are shared and parameterized by the buffer allocator.
 */
public class TestHadoop2ByteBufferReads {

  /** Allocates heap-backed buffers. */
  private static final IntFunction<ByteBuffer> HEAP = ByteBuffer::allocate;

  /** Allocates direct buffers. */
  private static final IntFunction<ByteBuffer> DIRECT = ByteBuffer::allocateDirect;

  /**
   * This mimics ByteBuffer reads from streams in Hadoop 2.
   *
   * <p>Each call reads up to {@code buf.remaining()} bytes from the wrapped stream into a
   * temporary array and copies whatever was read into {@code buf}. It returns the count reported
   * by the stream; a non-positive count is returned unchanged and nothing is copied.
   */
  private static class MockBufferReader implements H2SeekableInputStream.Reader {
    private final FSDataInputStream stream;

    /** Creates a reader that pulls bytes from {@code stream}. */
    public MockBufferReader(FSDataInputStream stream) {
      this.stream = stream;
    }

    @Override
    public int read(ByteBuffer buf) throws IOException {
      // this is inefficient, but simple for correctness tests of
      // readFully(ByteBuffer)
      byte[] temp = new byte[buf.remaining()];
      int bytesRead = stream.read(temp, 0, temp.length);
      if (bytesRead > 0) {
        buf.put(temp, 0, bytesRead);
      }
      return bytesRead;
    }
  }

  // ---------------------------------------------------------------------------------------
  // Heap buffer tests
  // ---------------------------------------------------------------------------------------

  @Test
  public void testHeapReadFullySmallBuffer() throws Exception {
    checkSmallBuffer(HEAP);
  }

  @Test
  public void testHeapReadFullyLargeBuffer() throws Exception {
    checkLargeBuffer(HEAP);
  }

  @Test
  public void testHeapReadFullyJustRight() throws Exception {
    checkJustRight(HEAP);
  }

  @Test
  public void testHeapReadFullySmallReads() throws Exception {
    checkSmallReads(HEAP);
  }

  @Test
  public void testHeapReadFullyPosition() throws Exception {
    checkPosition(HEAP);
  }

  @Test
  public void testHeapReadFullyLimit() throws Exception {
    checkLimit(HEAP);
  }

  @Test
  public void testHeapReadFullyPositionAndLimit() throws Exception {
    checkPositionAndLimit(HEAP);
  }

  // ---------------------------------------------------------------------------------------
  // Direct buffer tests
  // ---------------------------------------------------------------------------------------

  @Test
  public void testDirectReadFullySmallBuffer() throws Exception {
    checkSmallBuffer(DIRECT);
  }

  @Test
  public void testDirectReadFullyLargeBuffer() throws Exception {
    checkLargeBuffer(DIRECT);
  }

  @Test
  public void testDirectReadFullyJustRight() throws Exception {
    checkJustRight(DIRECT);
  }

  @Test
  public void testDirectReadFullySmallReads() throws Exception {
    checkSmallReads(DIRECT);
  }

  @Test
  public void testDirectReadFullyPosition() throws Exception {
    checkPosition(DIRECT);
  }

  @Test
  public void testDirectReadFullyLimit() throws Exception {
    checkLimit(DIRECT);
  }

  @Test
  public void testDirectReadFullyPositionAndLimit() throws Exception {
    checkPositionAndLimit(DIRECT);
  }

  // ---------------------------------------------------------------------------------------
  // Wrapper selection tests
  // ---------------------------------------------------------------------------------------

  @Test
  public void testCreateStreamNoByteBufferReadable() {
    final SeekableInputStream s = wrap(new FSDataInputStream(
        new MockHadoopInputStream()));
    Assert.assertTrue("Wrong wrapper: " + s,
        s instanceof H1SeekableInputStream);
  }

  @Test
  public void testDoubleWrapNoByteBufferReadable() {
    final SeekableInputStream s = wrap(new FSDataInputStream(
        new FSDataInputStream(new MockHadoopInputStream())));
    Assert.assertTrue("Wrong wrapper: " + s,
        s instanceof H1SeekableInputStream);
  }

  @Test
  public void testCreateStreamWithByteBufferReadable() {
    final SeekableInputStream s = wrap(new FSDataInputStream(
        new MockByteBufferInputStream()));
    Assert.assertTrue("Wrong wrapper: " + s,
        s instanceof H2SeekableInputStream);
  }

  @Test
  public void testDoubleWrapByteBufferReadable() {
    final SeekableInputStream s = wrap(new FSDataInputStream(
        new FSDataInputStream(new MockByteBufferInputStream())));
    Assert.assertTrue("Wrong wrapper: " + s,
        s instanceof H2SeekableInputStream);
  }

  // ---------------------------------------------------------------------------------------
  // Shared readFully scenarios (parameterized by buffer allocator)
  // ---------------------------------------------------------------------------------------

  /**
   * An 8-byte buffer is filled from the start of the stream. A second readFully with nothing
   * remaining in the buffer is a no-op.
   */
  private static void checkSmallBuffer(IntFunction<ByteBuffer> allocator) throws Exception {
    ByteBuffer readBuffer = allocator.apply(8);
    MockBufferReader reader = newReader(new MockHadoopInputStream());

    readFullyAndAssert(reader, readBuffer, 8, 8);
    readFullyAndAssert(reader, readBuffer, 8, 8);

    readBuffer.flip();
    assertContents(ByteBuffer.wrap(TEST_ARRAY, 0, 8), readBuffer);
  }

  /**
   * A 20-byte buffer cannot be filled from the 10-byte test stream, so readFully must throw
   * EOFException.
   */
  private static void checkLargeBuffer(IntFunction<ByteBuffer> allocator) throws Exception {
    final ByteBuffer readBuffer = allocator.apply(20);
    final MockBufferReader reader = newReader(new MockHadoopInputStream());

    TestUtils.assertThrows("Should throw EOFException",
        EOFException.class, () -> {
          H2SeekableInputStream.readFully(reader, readBuffer);
          return null;
        });

    // Heap and direct buffers behave the same way here because both go through the same
    // Reader. Every byte the stream could supply (all 10) has already been copied into the
    // buffer when EOFException is thrown. A position of 10 is correct because those bytes are
    // valid; the limit is left untouched.
    assertPositionAndLimit(readBuffer, 10, 20);
  }

  /**
   * A 10-byte buffer consumes the whole 10-byte stream without EOFException. A further
   * zero-byte read does not raise EOFException either.
   */
  private static void checkJustRight(IntFunction<ByteBuffer> allocator) throws Exception {
    ByteBuffer readBuffer = allocator.apply(10);
    MockBufferReader reader = newReader(new MockHadoopInputStream());

    // reads all of the bytes available without EOFException
    readFullyAndAssert(reader, readBuffer, 10, 10);
    // trying to read 0 more bytes doesn't result in EOFException
    readFullyAndAssert(reader, readBuffer, 10, 10);

    readBuffer.flip();
    assertContents(ByteBuffer.wrap(TEST_ARRAY), readBuffer);
  }

  /**
   * The stream is configured as {@code MockHadoopInputStream(2, 3, 3)}, which presumably caps
   * successive reads at those lengths (TODO confirm against MockHadoopInputStream).
   * readFully must keep reading until the 10-byte buffer is full.
   */
  private static void checkSmallReads(IntFunction<ByteBuffer> allocator) throws Exception {
    ByteBuffer readBuffer = allocator.apply(10);
    MockBufferReader reader = newReader(new MockHadoopInputStream(2, 3, 3));

    readFullyAndAssert(reader, readBuffer, 10, 10);
    readFullyAndAssert(reader, readBuffer, 10, 10);

    readBuffer.flip();
    assertContents(ByteBuffer.wrap(TEST_ARRAY), readBuffer);
  }

  /**
   * With the buffer's position starting at 3, readFully fills only [3, 10). Those 7 bytes must
   * be the first 7 bytes of the stream.
   */
  private static void checkPosition(IntFunction<ByteBuffer> allocator) throws Exception {
    ByteBuffer readBuffer = allocator.apply(10);
    readBuffer.position(3);
    readBuffer.mark();
    MockBufferReader reader = newReader(new MockHadoopInputStream(2, 3, 3));

    readFullyAndAssert(reader, readBuffer, 10, 10);
    readFullyAndAssert(reader, readBuffer, 10, 10);

    readBuffer.reset();
    assertContents(ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
  }

  /**
   * With the limit at 7, readFully stops there. After the limit is raised to 10, a second call
   * continues from where the stream left off.
   */
  private static void checkLimit(IntFunction<ByteBuffer> allocator) throws Exception {
    ByteBuffer readBuffer = allocator.apply(10);
    readBuffer.limit(7);
    MockBufferReader reader = newReader(new MockHadoopInputStream(2, 3, 3));

    readFullyAndAssert(reader, readBuffer, 7, 7);
    readFullyAndAssert(reader, readBuffer, 7, 7);

    readBuffer.flip();
    assertContents(ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);

    readBuffer.position(7);
    readBuffer.limit(10);
    readFullyAndAssert(reader, readBuffer, 10, 10);

    readBuffer.flip();
    assertContents(ByteBuffer.wrap(TEST_ARRAY), readBuffer);
  }

  /**
   * Combines a starting position of 3 with a limit of 7, then extends the limit to 10. The
   * region starting at the mark must always hold a contiguous prefix of the stream.
   */
  private static void checkPositionAndLimit(IntFunction<ByteBuffer> allocator)
      throws Exception {
    ByteBuffer readBuffer = allocator.apply(10);
    readBuffer.position(3);
    readBuffer.limit(7);
    readBuffer.mark();
    MockBufferReader reader = newReader(new MockHadoopInputStream(2, 3, 3));

    readFullyAndAssert(reader, readBuffer, 7, 7);
    readFullyAndAssert(reader, readBuffer, 7, 7);

    readBuffer.reset();
    assertContents(ByteBuffer.wrap(TEST_ARRAY, 0, 4), readBuffer);

    readBuffer.position(7);
    readBuffer.limit(10);
    readFullyAndAssert(reader, readBuffer, 10, 10);

    readBuffer.reset();
    assertContents(ByteBuffer.wrap(TEST_ARRAY, 0, 7), readBuffer);
  }

  // ---------------------------------------------------------------------------------------
  // Assertion / fixture helpers
  // ---------------------------------------------------------------------------------------

  /** Wraps {@code source} in an FSDataInputStream and then in a {@link MockBufferReader}. */
  private static MockBufferReader newReader(MockHadoopInputStream source) {
    return new MockBufferReader(new FSDataInputStream(source));
  }

  /**
   * Calls {@code H2SeekableInputStream.readFully(reader, buf)} and asserts the buffer's
   * position and limit afterwards.
   */
  private static void readFullyAndAssert(H2SeekableInputStream.Reader reader, ByteBuffer buf,
      int expectedPosition, int expectedLimit) throws Exception {
    H2SeekableInputStream.readFully(reader, buf);
    assertPositionAndLimit(buf, expectedPosition, expectedLimit);
  }

  /** Asserts the buffer's current position and limit. */
  private static void assertPositionAndLimit(ByteBuffer buf, int expectedPosition,
      int expectedLimit) {
    Assert.assertEquals("Buffer position", expectedPosition, buf.position());
    Assert.assertEquals("Buffer limit", expectedLimit, buf.limit());
  }

  /** Asserts that the remaining bytes of {@code actual} equal those of {@code expected}. */
  private static void assertContents(ByteBuffer expected, ByteBuffer actual) {
    Assert.assertEquals("Buffer contents should match", expected, actual);
  }

  /**
   * Input stream which claims to implement ByteBufferReadable.
   *
   * <p>{@link #read(ByteBuffer)} is a stub that always returns 0. In this class the stream is
   * only used by the wrap() tests, which check the chosen wrapper type and never read from it.
   */
  private static final class MockByteBufferInputStream
      extends MockHadoopInputStream implements ByteBufferReadable {
    @Override
    public int read(final ByteBuffer buf) {
      return 0;
    }
  }
}