blob: 5c2b6da02abb52dbd562ec38fe07e05ed52c5b70 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import com.google.common.primitives.Ints;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.UUID;
/**
 * Tests for reading HDFS files through replica accessors that are plugged in
 * via {@code HdfsClientConfigKeys.REPLICA_ACCESSOR_BUILDER_CLASSES_KEY}.
 *
 * One test configures a builder class name that does not exist and checks
 * that the file can still be read; the other configures the synthetic
 * builder declared in this class and checks what it was asked to do.
 */
public class TestExternalBlockReader {
private static final Log LOG = LogFactory.getLog(TestExternalBlockReader.class);
/**
 * Seed used both when creating the test file and when regenerating the
 * expected contents with DFSTestUtil.calculateFileContentsFromSeed.
 */
private static long SEED = 1234;
/**
 * Configures a replica accessor builder class name that does not exist and
 * verifies that a file can still be written and read back correctly.
 *
 * The cluster, file system and stream are all released in the finally block,
 * even when an earlier step or a later cleanup step throws.
 */
@Test
public void testMisconfiguredExternalBlockReader() throws Exception {
  Configuration conf = new Configuration();
  conf.set(HdfsClientConfigKeys.REPLICA_ACCESSOR_BUILDER_CLASSES_KEY,
      "org.apache.hadoop.hdfs.NonExistentReplicaAccessorBuilderClass");
  conf.setLong(HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
  conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
  final int TEST_LENGTH = 2048;
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
      .numDataNodes(1)
      .build();
  DistributedFileSystem dfs = null;
  FSDataInputStream stream = null;
  try {
    // Acquire the file system inside the try so that a failure here still
    // shuts the cluster down.
    dfs = cluster.getFileSystem();
    DFSTestUtil.createFile(dfs, new Path("/a"), TEST_LENGTH, (short)1, SEED);
    stream = dfs.open(new Path("/a"));
    byte buf[] = new byte[TEST_LENGTH];
    IOUtils.readFully(stream, buf, 0, TEST_LENGTH);
    byte expected[] = DFSTestUtil.
        calculateFileContentsFromSeed(SEED, TEST_LENGTH);
    Assert.assertArrayEquals(expected, buf);
    stream.close();
    stream = null;
  } finally {
    // Only non-null if the test failed before the explicit close above.
    IOUtils.closeStream(stream);
    try {
      if (dfs != null) {
        dfs.close();
      }
    } finally {
      // Must run even if dfs.close() throws.
      cluster.shutdown();
    }
  }
}
/**
 * Configuration key under which a test stores its own UUID; the synthetic
 * accessor reads it back from the Configuration it was built with to decide
 * which list in {@link #accessors} to register itself in.
 */
private static final String SYNTHETIC_BLOCK_READER_TEST_UUID_KEY =
"synthetic.block.reader.test.uuid.key";
/**
 * Registry of every SyntheticReplicaAccessor constructed, keyed by the UUID
 * found under SYNTHETIC_BLOCK_READER_TEST_UUID_KEY. Lists keep creation order.
 * NOTE(review): plain HashMap with no locking — assumes accessors are not
 * created concurrently; confirm against the client code.
 */
private static final HashMap<String, LinkedList<SyntheticReplicaAccessor>>
accessors = new HashMap<String, LinkedList<SyntheticReplicaAccessor>>(1);
/**
 * Builder that records every parameter it is handed so that tests can later
 * inspect them through the accessor it produces.
 *
 * {@link #build()} declines (returns null) for replicas whose visible length
 * is below 1024 bytes and otherwise creates a SyntheticReplicaAccessor.
 */
public static class SyntheticReplicaAccessorBuilder
    extends ReplicaAccessorBuilder {
  // Identity of the replica being opened.
  String fileName;
  long blockId;
  String blockPoolId;
  long genstamp;
  long visibleLength;

  // Options and context supplied by the client.
  boolean verifyChecksum;
  String clientName;
  boolean allowShortCircuit;
  Configuration conf;

  /** Records the file name and returns this builder. */
  @Override
  public ReplicaAccessorBuilder setFileName(String fileName) {
    this.fileName = fileName;
    return this;
  }

  /** Records the block id and block pool id and returns this builder. */
  @Override
  public ReplicaAccessorBuilder setBlock(long blockId, String blockPoolId) {
    this.blockPoolId = blockPoolId;
    this.blockId = blockId;
    return this;
  }

  /** Records the generation stamp and returns this builder. */
  @Override
  public ReplicaAccessorBuilder setGenerationStamp(long genstamp) {
    this.genstamp = genstamp;
    return this;
  }

  /** Records the visible length of the replica and returns this builder. */
  @Override
  public ReplicaAccessorBuilder setVisibleLength(long visibleLength) {
    this.visibleLength = visibleLength;
    return this;
  }

  /** Records whether checksums should be verified and returns this builder. */
  @Override
  public ReplicaAccessorBuilder setVerifyChecksum(boolean verifyChecksum) {
    this.verifyChecksum = verifyChecksum;
    return this;
  }

  /** Records the client name and returns this builder. */
  @Override
  public ReplicaAccessorBuilder setClientName(String clientName) {
    this.clientName = clientName;
    return this;
  }

  /** Records the short-circuit flag and returns this builder. */
  @Override
  public ReplicaAccessorBuilder setAllowShortCircuitReads(boolean allowShortCircuit) {
    this.allowShortCircuit = allowShortCircuit;
    return this;
  }

  /** Records the configuration and returns this builder. */
  @Override
  public ReplicaAccessorBuilder setConfiguration(Configuration conf) {
    this.conf = conf;
    return this;
  }

  /** The access token is not needed by the synthetic accessor; it is ignored. */
  @Override
  public ReplicaAccessorBuilder setBlockAccessToken(byte[] token) {
    return this;
  }

  /**
   * @return a new SyntheticReplicaAccessor, or null when the visible length
   *         is smaller than 1024 bytes
   */
  @Override
  public ReplicaAccessor build() {
    final boolean largeEnough = visibleLength >= 1024;
    if (largeEnough) {
      return new SyntheticReplicaAccessor(this);
    }
    LOG.info("SyntheticReplicaAccessorFactory returning null for a " +
        "smaller replica with size " + visibleLength);
    return null;
  }
}
/**
 * In-memory replica accessor whose contents are regenerated from SEED via
 * DFSTestUtil.calculateFileContentsFromSeed for the builder's visible length.
 *
 * Each instance registers itself in the static {@code accessors} map under
 * the UUID found in the builder's configuration, and keeps counters
 * (bytes read, number of closes, accumulated error text) for tests to check.
 */
public static class SyntheticReplicaAccessor extends ReplicaAccessor {
  final long length;
  final byte contents[];
  final SyntheticReplicaAccessorBuilder builder;
  /** Total number of bytes handed out by both read methods. */
  long totalRead = 0;
  /** Number of times close() has been called. */
  int numCloses = 0;
  /** Accumulated error descriptions, separated by "; ". */
  String error = "";
  /** Separator put before the next error; empty until the first error. */
  String prefix = "";
  final long genstamp;

  /**
   * Builds the synthetic contents and registers this accessor under the
   * test UUID taken from the builder's configuration.
   *
   * @param builder the builder holding the recorded parameters
   */
  SyntheticReplicaAccessor(SyntheticReplicaAccessorBuilder builder) {
    this.length = builder.visibleLength;
    this.contents = DFSTestUtil.
        calculateFileContentsFromSeed(SEED, Ints.checkedCast(length));
    this.builder = builder;
    this.genstamp = builder.genstamp;
    String uuid = this.builder.conf.
        get(SYNTHETIC_BLOCK_READER_TEST_UUID_KEY);
    LinkedList<SyntheticReplicaAccessor> accessorsList =
        accessors.get(uuid);
    if (accessorsList == null) {
      accessorsList = new LinkedList<SyntheticReplicaAccessor>();
      accessors.put(uuid, accessorsList);
    }
    accessorsList.add(this);
  }

  /**
   * Copies up to {@code len} bytes starting at {@code pos} into {@code buf}.
   *
   * @param pos offset into the synthetic contents
   * @param buf destination array
   * @param off first index in {@code buf} to write to
   * @param len maximum number of bytes to copy
   * @return the number of bytes copied; -1 if {@code pos} is at or past the
   *         end of the contents; 0 if {@code pos} exceeds Integer.MAX_VALUE
   *         or is negative (a negative position is also recorded as an error)
   */
  @Override
  public synchronized int read(long pos, byte[] buf, int off, int len)
      throws IOException {
    if (pos > Integer.MAX_VALUE) {
      return 0;
    } else if (pos < 0) {
      addError("Attempted to read from a location that was less " +
          "than 0 at " + pos);
      return 0;
    }
    int start = (int) pos;
    if (start >= contents.length) {
      // End of contents takes precedence, even for a zero-length request.
      return -1;
    }
    int nread = Math.max(0, Math.min(len, contents.length - start));
    if (nread > 0) {
      System.arraycopy(contents, start, buf, off, nread);
      totalRead += nread;
    }
    // One message per call instead of one per byte.
    if (LOG.isDebugEnabled()) {
      LOG.debug("pos = " + pos + ", contents.length = " + contents.length +
          ", nread = " + nread + ", len = " + len);
    }
    return nread;
  }

  /**
   * Copies bytes starting at {@code pos} into {@code buf} until either the
   * buffer is full or the contents are exhausted.
   *
   * @param pos offset into the synthetic contents
   * @param buf destination buffer; its position advances by the bytes copied
   * @return the number of bytes copied; -1 if {@code pos} is at or past the
   *         end of the contents; 0 if {@code pos} exceeds Integer.MAX_VALUE
   *         or is negative (a negative position is also recorded as an error)
   */
  @Override
  public synchronized int read(long pos, ByteBuffer buf) throws IOException {
    if (pos > Integer.MAX_VALUE) {
      return 0;
    } else if (pos < 0) {
      addError("Attempted to read from a location that was less " +
          "than 0 at " + pos);
      return 0;
    }
    int start = (int) pos;
    if (start >= contents.length) {
      return -1;
    }
    // Bound the copy by the buffer's free space rather than relying on
    // BufferOverflowException to stop the loop.
    int nread = Math.min(buf.remaining(), contents.length - start);
    if (nread > 0) {
      buf.put(contents, start, nread);
      totalRead += nread;
    }
    return nread;
  }

  /** Counts the close; nothing else needs releasing. */
  @Override
  public synchronized void close() throws IOException {
    numCloses++;
  }

  /** @return always true */
  @Override
  public boolean isLocal() {
    return true;
  }

  /** @return always true */
  @Override
  public boolean isShortCircuit() {
    return true;
  }

  /** @return always 0 */
  @Override
  public int getNetworkDistance() {
    return 0;
  }

  /** @return all recorded errors joined by "; ", or "" if there were none */
  synchronized String getError() {
    return error;
  }

  /** @return the generation stamp captured from the builder */
  long getGenerationStamp() {
    return genstamp;
  }

  /**
   * Logs the error and appends it to the accumulated error text.
   *
   * @param text description of the error
   */
  synchronized void addError(String text) {
    LOG.error("SyntheticReplicaAccessor error: " + text);
    error = error + prefix + text;
    prefix = "; ";
  }
}
/**
 * Configures SyntheticReplicaAccessorBuilder as the replica accessor
 * builder, reads a 2047-byte file back in two seeks, and checks the read
 * statistics, the parameters the builder received, and the first
 * accessor's counters.
 *
 * The stream, file system, cluster and this test's entry in the static
 * accessor registry are all cleaned up in the finally block, so a failing
 * run does not leak them.
 */
@Test
public void testExternalBlockReader() throws Exception {
  Configuration conf = new Configuration();
  conf.set(HdfsClientConfigKeys.REPLICA_ACCESSOR_BUILDER_CLASSES_KEY,
      SyntheticReplicaAccessorBuilder.class.getName());
  conf.setLong(HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
  conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
  String uuid = UUID.randomUUID().toString();
  conf.set(SYNTHETIC_BLOCK_READER_TEST_UUID_KEY, uuid);
  final int TEST_LENGTH = 2047;
  MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
      .hosts(new String[] {NetUtils.getLocalHostname()})
      .build();
  DistributedFileSystem dfs = null;
  HdfsDataInputStream stream = null;
  try {
    // Acquire the file system inside the try so that a failure here still
    // shuts the cluster down.
    dfs = cluster.getFileSystem();
    DFSTestUtil.createFile(dfs, new Path("/a"), TEST_LENGTH, (short)1, SEED);
    stream = (HdfsDataInputStream)dfs.open(new Path("/a"));
    byte buf[] = new byte[TEST_LENGTH];
    // Read the tail first, then the head, so the first accessor created
    // starts reading at offset 1000.
    stream.seek(1000);
    IOUtils.readFully(stream, buf, 1000, TEST_LENGTH - 1000);
    stream.seek(0);
    IOUtils.readFully(stream, buf, 0, 1000);
    byte expected[] = DFSTestUtil.
        calculateFileContentsFromSeed(SEED, TEST_LENGTH);
    ReadStatistics stats = stream.getReadStatistics();
    // The builder returns null for replicas shorter than 1024 bytes, so
    // presumably only the first (1024-byte) block is served by the
    // synthetic short-circuit accessor.
    Assert.assertEquals(1024, stats.getTotalShortCircuitBytesRead());
    Assert.assertEquals(2047, stats.getTotalLocalBytesRead());
    Assert.assertEquals(2047, stats.getTotalBytesRead());
    Assert.assertArrayEquals(expected, buf);
    // Close before checking numCloses below.
    stream.close();
    stream = null;
    ExtendedBlock block = DFSTestUtil.getFirstBlock(dfs, new Path("/a"));
    Assert.assertNotNull(block);
    LinkedList<SyntheticReplicaAccessor> accessorList = accessors.get(uuid);
    Assert.assertNotNull(accessorList);
    Assert.assertEquals(3, accessorList.size());
    SyntheticReplicaAccessor accessor = accessorList.get(0);
    Assert.assertTrue(accessor.builder.allowShortCircuit);
    Assert.assertEquals(block.getBlockPoolId(),
        accessor.builder.blockPoolId);
    Assert.assertEquals(block.getBlockId(),
        accessor.builder.blockId);
    Assert.assertEquals(dfs.getClient().clientName,
        accessor.builder.clientName);
    Assert.assertEquals("/a", accessor.builder.fileName);
    Assert.assertEquals(block.getGenerationStamp(),
        accessor.getGenerationStamp());
    Assert.assertTrue(accessor.builder.verifyChecksum);
    Assert.assertEquals(1024L, accessor.builder.visibleLength);
    // Offsets 1000..1023 of the first block.
    Assert.assertEquals(24L, accessor.totalRead);
    Assert.assertEquals("", accessor.getError());
    Assert.assertEquals(1, accessor.numCloses);
    // Reading at or past the end must report -1, even for a zero-length
    // request.
    byte[] tempBuf = new byte[5];
    Assert.assertEquals(-1, accessor.read(TEST_LENGTH,
        tempBuf, 0, 0));
    Assert.assertEquals(-1, accessor.read(TEST_LENGTH,
        tempBuf, 0, tempBuf.length));
  } finally {
    // Drop this test's accessors from the static registry on every path.
    accessors.remove(uuid);
    // Only non-null if the test failed before the explicit close above.
    IOUtils.closeStream(stream);
    try {
      if (dfs != null) {
        dfs.close();
      }
    } finally {
      // Must run even if dfs.close() throws.
      cluster.shutdown();
    }
  }
}
}