| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.Arrays; |
| import java.util.Random; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| |
| import org.junit.AfterClass; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| |
| import static org.junit.Assert.assertArrayEquals; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| |
| /** |
| * This class tests the DFS positional read functionality on a single node |
| * mini-cluster. These tests are inspired from {@link TestPread}. The tests |
| * are much less comprehensive than other pread tests because pread already |
| * internally uses {@link ByteBuffer}s. |
| */ |
| public class TestByteBufferPread { |
| |
| private static MiniDFSCluster cluster; |
| private static FileSystem fs; |
| private static byte[] fileContents; |
| private static Path testFile; |
| private static Random rand; |
| |
| private static final long SEED = 0xDEADBEEFL; |
| private static final int BLOCK_SIZE = 4096; |
| private static final int FILE_SIZE = 12 * BLOCK_SIZE; |
| |
| @BeforeClass |
| public static void setup() throws IOException { |
| // Setup the cluster with a small block size so we can create small files |
| // that span multiple blocks |
| Configuration conf = new Configuration(); |
| conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); |
| cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); |
| fs = cluster.getFileSystem(); |
| |
| // Create a test file that spans 12 blocks, and contains a bunch of random |
| // bytes |
| fileContents = new byte[FILE_SIZE]; |
| rand = new Random(SEED); |
| rand.nextBytes(fileContents); |
| testFile = new Path("/byte-buffer-pread-test.dat"); |
| try (FSDataOutputStream out = fs.create(testFile, (short) 3)) { |
| out.write(fileContents); |
| } |
| } |
| |
| /** |
| * Test preads with {@link java.nio.HeapByteBuffer}s. |
| */ |
| @Test |
| public void testPreadWithHeapByteBuffer() throws IOException { |
| testPreadWithByteBuffer(ByteBuffer.allocate(FILE_SIZE)); |
| testPreadWithFullByteBuffer(ByteBuffer.allocate(FILE_SIZE)); |
| testPreadWithPositionedByteBuffer(ByteBuffer.allocate(FILE_SIZE)); |
| testPreadWithLimitedByteBuffer(ByteBuffer.allocate(FILE_SIZE)); |
| testPositionedPreadWithByteBuffer(ByteBuffer.allocate(FILE_SIZE)); |
| } |
| |
| /** |
| * Test preads with {@link java.nio.DirectByteBuffer}s. |
| */ |
| @Test |
| public void testPreadWithDirectByteBuffer() throws IOException { |
| testPreadWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE)); |
| testPreadWithFullByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE)); |
| testPreadWithPositionedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE)); |
| testPreadWithLimitedByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE)); |
| testPositionedPreadWithByteBuffer(ByteBuffer.allocateDirect(FILE_SIZE)); |
| } |
| |
| /** |
| * Reads the entire testFile using the pread API and validates that its |
| * contents are properly loaded into the supplied {@link ByteBuffer}. |
| */ |
| private void testPreadWithByteBuffer(ByteBuffer buffer) throws IOException { |
| int bytesRead; |
| int totalBytesRead = 0; |
| try (FSDataInputStream in = fs.open(testFile)) { |
| while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) { |
| totalBytesRead += bytesRead; |
| // Check that each call to read changes the position of the ByteBuffer |
| // correctly |
| assertEquals(totalBytesRead, buffer.position()); |
| } |
| |
| // Make sure the buffer is full |
| assertFalse(buffer.hasRemaining()); |
| // Make sure the contents of the read buffer equal the contents of the |
| // file |
| buffer.position(0); |
| byte[] bufferContents = new byte[FILE_SIZE]; |
| buffer.get(bufferContents); |
| assertArrayEquals(bufferContents, fileContents); |
| buffer.position(buffer.limit()); |
| } |
| } |
| |
| /** |
| * Attempts to read the testFile into a {@link ByteBuffer} that is already |
| * full, and validates that doing so does not change the contents of the |
| * supplied {@link ByteBuffer}. |
| */ |
| private void testPreadWithFullByteBuffer(ByteBuffer buffer) |
| throws IOException { |
| // Load some dummy data into the buffer |
| byte[] existingBufferBytes = new byte[FILE_SIZE]; |
| rand.nextBytes(existingBufferBytes); |
| buffer.put(existingBufferBytes); |
| // Make sure the buffer is full |
| assertFalse(buffer.hasRemaining()); |
| |
| try (FSDataInputStream in = fs.open(testFile)) { |
| // Attempt to read into the buffer, 0 bytes should be read since the |
| // buffer is full |
| assertEquals(0, in.read(buffer)); |
| |
| // Double check the buffer is still full and its contents have not |
| // changed |
| assertFalse(buffer.hasRemaining()); |
| buffer.position(0); |
| byte[] bufferContents = new byte[FILE_SIZE]; |
| buffer.get(bufferContents); |
| assertArrayEquals(bufferContents, existingBufferBytes); |
| } |
| } |
| |
| /** |
| * Reads half of the testFile into the {@link ByteBuffer} by setting a |
| * {@link ByteBuffer#limit} on the buffer. Validates that only half of the |
| * testFile is loaded into the buffer. |
| */ |
| private void testPreadWithLimitedByteBuffer( |
| ByteBuffer buffer) throws IOException { |
| int bytesRead; |
| int totalBytesRead = 0; |
| // Set the buffer limit to half the size of the file |
| buffer.limit(FILE_SIZE / 2); |
| |
| try (FSDataInputStream in = fs.open(testFile)) { |
| while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) { |
| totalBytesRead += bytesRead; |
| // Check that each call to read changes the position of the ByteBuffer |
| // correctly |
| assertEquals(totalBytesRead, buffer.position()); |
| } |
| |
| // Since we set the buffer limit to half the size of the file, we should |
| // have only read half of the file into the buffer |
| assertEquals(totalBytesRead, FILE_SIZE / 2); |
| // Check that the buffer is full and the contents equal the first half of |
| // the file |
| assertFalse(buffer.hasRemaining()); |
| buffer.position(0); |
| byte[] bufferContents = new byte[FILE_SIZE / 2]; |
| buffer.get(bufferContents); |
| assertArrayEquals(bufferContents, |
| Arrays.copyOfRange(fileContents, 0, FILE_SIZE / 2)); |
| } |
| } |
| |
| /** |
| * Reads half of the testFile into the {@link ByteBuffer} by setting the |
| * {@link ByteBuffer#position} the half the size of the file. Validates that |
| * only half of the testFile is loaded into the buffer. |
| */ |
| private void testPreadWithPositionedByteBuffer( |
| ByteBuffer buffer) throws IOException { |
| int bytesRead; |
| int totalBytesRead = 0; |
| // Set the buffer position to half the size of the file |
| buffer.position(FILE_SIZE / 2); |
| |
| try (FSDataInputStream in = fs.open(testFile)) { |
| while ((bytesRead = in.read(totalBytesRead, buffer)) > 0) { |
| totalBytesRead += bytesRead; |
| // Check that each call to read changes the position of the ByteBuffer |
| // correctly |
| assertEquals(totalBytesRead + FILE_SIZE / 2, buffer.position()); |
| } |
| |
| // Since we set the buffer position to half the size of the file, we |
| // should have only read half of the file into the buffer |
| assertEquals(totalBytesRead, FILE_SIZE / 2); |
| // Check that the buffer is full and the contents equal the first half of |
| // the file |
| assertFalse(buffer.hasRemaining()); |
| buffer.position(FILE_SIZE / 2); |
| byte[] bufferContents = new byte[FILE_SIZE / 2]; |
| buffer.get(bufferContents); |
| assertArrayEquals(bufferContents, |
| Arrays.copyOfRange(fileContents, 0, FILE_SIZE / 2)); |
| } |
| } |
| |
| /** |
| * Reads half of the testFile into the {@link ByteBuffer} by specifying a |
| * position for the pread API that is half of the file size. Validates that |
| * only half of the testFile is loaded into the buffer. |
| */ |
| private void testPositionedPreadWithByteBuffer( |
| ByteBuffer buffer) throws IOException { |
| int bytesRead; |
| int totalBytesRead = 0; |
| |
| try (FSDataInputStream in = fs.open(testFile)) { |
| // Start reading from halfway through the file |
| while ((bytesRead = in.read(totalBytesRead + FILE_SIZE / 2, |
| buffer)) > 0) { |
| totalBytesRead += bytesRead; |
| // Check that each call to read changes the position of the ByteBuffer |
| // correctly |
| assertEquals(totalBytesRead, buffer.position()); |
| } |
| |
| // Since we starting reading halfway through the file, the buffer should |
| // only be half full |
| assertEquals(totalBytesRead, FILE_SIZE / 2); |
| assertEquals(buffer.position(), FILE_SIZE / 2); |
| assertTrue(buffer.hasRemaining()); |
| // Check that the buffer contents equal the second half of the file |
| buffer.position(0); |
| byte[] bufferContents = new byte[FILE_SIZE / 2]; |
| buffer.get(bufferContents); |
| assertArrayEquals(bufferContents, |
| Arrays.copyOfRange(fileContents, FILE_SIZE / 2, FILE_SIZE)); |
| } |
| } |
| |
| @AfterClass |
| public static void shutdown() throws IOException { |
| try { |
| fs.delete(testFile, false); |
| fs.close(); |
| } finally { |
| cluster.shutdown(true); |
| } |
| } |
| } |