/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
import org.apache.hadoop.hdfs.server.datanode.FinalizedProvidedReplica;
import org.apache.hadoop.hdfs.server.datanode.ProvidedReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.TestProvidedReplicaImpl;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.StringUtils;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Basic test cases for the provided volume implementation.
*/
public class TestProvidedImpl {
private static final Logger LOG =
LoggerFactory.getLogger(TestProvidedImpl.class);
private static final String BASE_DIR =
new FileSystemTestHelper().getTestRootDir();
private static final int NUM_LOCAL_INIT_VOLUMES = 1;
// only one provided volume is currently supported.
private static final int NUM_PROVIDED_INIT_VOLUMES = 1;
private static final String[] BLOCK_POOL_IDS = {"bpid-0", "bpid-1"};
private static final int NUM_PROVIDED_BLKS = 10;
private static final long BLK_LEN = 128 * 1024;
private static final int MIN_BLK_ID = 0;
private static final int CHOSEN_BP_ID = 0;
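// absolute path of the directory backing the provided volume;
// reassigned in createStorageDirs().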
private static String providedBasePath = BASE_DIR;
private Configuration conf;
private DataNode datanode;
private DataStorage storage;
private FsDatasetImpl dataset;
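// maps each provided block id to the local file that backs it.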
private static Map<Long, String> blkToPathMap;
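// the volumes of StorageType.PROVIDED discovered during setUp().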
private static List<FsVolumeImpl> providedVolumes;
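// nominal space consumed by the provided blocks created so far.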
private static long spaceUsed = 0;
/**
* A simple FileRegion iterator for tests.
*/
public static class TestFileRegionIterator implements Iterator<FileRegion> {
private int numBlocks;
private int currentCount;
private String basePath;
public TestFileRegionIterator(String basePath, int minID, int numBlocks) {
this.currentCount = minID;
this.numBlocks = numBlocks;
this.basePath = basePath;
}
@Override
public boolean hasNext() {
return currentCount < numBlocks;
}
@Override
public FileRegion next() {
FileRegion region = null;
if (hasNext()) {
File newFile = new File(basePath, "file" + currentCount);
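// lazily create and fill the backing file the first time this
// block id is requested.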
if (!newFile.exists()) {
try {
LOG.info("Creating file for blkid " + currentCount);
blkToPathMap.put((long) currentCount, newFile.getAbsolutePath());
LOG.info("Block id " + currentCount + " corresponds to file " +
newFile.getAbsolutePath());
newFile.createNewFile();
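// fill the file with the block id so its contents identify the block.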
try (Writer writer = new OutputStreamWriter(
new FileOutputStream(newFile.getAbsolutePath()), "utf-8")) {
for (int i = 0; i < BLK_LEN / (Integer.SIZE / 8); i++) {
writer.write(currentCount);
}
}
spaceUsed += BLK_LEN;
} catch (IOException e) {
LOG.error("Failed to create file for block id " + currentCount, e);
}
}
region = new FileRegion(currentCount, new Path(newFile.toString()),
0, BLK_LEN);
currentCount++;
}
return region;
}
@Override
public void remove() {
// do nothing.
}
public void resetMinBlockId(int minId) {
currentCount = minId;
}
public void resetBlockCount(int numBlocks) {
this.numBlocks = numBlocks;
}
}
/**
* A simple FileRegion BlockAliasMap for tests.
*/
public static class TestFileRegionBlockAliasMap
extends BlockAliasMap<FileRegion> {
private Configuration conf;
private int minId;
private int numBlocks;
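// when non-null, the reader returns this iterator instead of
// creating a new TestFileRegionIterator.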
private Iterator<FileRegion> suppliedIterator;
TestFileRegionBlockAliasMap() {
this(null, MIN_BLK_ID, NUM_PROVIDED_BLKS);
}
TestFileRegionBlockAliasMap(Iterator<FileRegion> iterator, int minId,
int numBlocks) {
this.suppliedIterator = iterator;
this.minId = minId;
this.numBlocks = numBlocks;
}
@Override
public Reader<FileRegion> getReader(Reader.Options opts, String blockPoolId)
throws IOException {
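// only the chosen block pool is served by this alias map.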
if (!blockPoolId.equals(BLOCK_POOL_IDS[CHOSEN_BP_ID])) {
return null;
}
BlockAliasMap.Reader<FileRegion> reader =
new BlockAliasMap.Reader<FileRegion>() {
@Override
public Iterator<FileRegion> iterator() {
if (suppliedIterator == null) {
return new TestFileRegionIterator(providedBasePath, minId,
numBlocks);
} else {
return suppliedIterator;
}
}
@Override
public void close() throws IOException {
}
@Override
public Optional<FileRegion> resolve(Block ident)
throws IOException {
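// resolve() is not exercised by these tests.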
return null;
}
};
return reader;
}
@Override
public Writer<FileRegion> getWriter(Writer.Options opts, String blockPoolId)
throws IOException {
// not implemented
return null;
}
@Override
public void refresh() throws IOException {
// do nothing!
}
@Override
public void close() throws IOException {
// do nothing
}
}
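/**
* Creates a local storage directory rooted at the given directory and
* assigns it a storage ID.
*/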
private static Storage.StorageDirectory createLocalStorageDirectory(
File root, Configuration conf)
throws SecurityException, IOException {
Storage.StorageDirectory sd =
new Storage.StorageDirectory(
StorageLocation.parse(root.toURI().toString()));
DataStorage.createStorageID(sd, false, conf);
return sd;
}
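/**
* Creates a provided storage directory from its configuration string
* (e.g., "[PROVIDED]file:/base/path/").
*/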
private static Storage.StorageDirectory createProvidedStorageDirectory(
String confString, Configuration conf)
throws SecurityException, IOException {
Storage.StorageDirectory sd =
new Storage.StorageDirectory(StorageLocation.parse(confString));
DataStorage.createStorageID(sd, false, conf);
return sd;
}
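/**
* Creates the given numbers of local and provided storage directories
* under BASE_DIR, wires them into the mocked DataStorage, and sets
* dfs.datanode.data.dir accordingly.
*/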
private static void createStorageDirs(DataStorage storage,
Configuration conf, int numDirs, int numProvidedDirs)
throws IOException {
List<Storage.StorageDirectory> dirs =
new ArrayList<Storage.StorageDirectory>();
List<String> dirStrings = new ArrayList<String>();
FileUtils.deleteDirectory(new File(BASE_DIR));
for (int i = 0; i < numDirs; i++) {
File loc = new File(BASE_DIR, "data" + i);
dirStrings.add(new Path(loc.toString()).toUri().toString());
loc.mkdirs();
dirs.add(createLocalStorageDirectory(loc, conf));
when(storage.getStorageDir(i)).thenReturn(dirs.get(i));
}
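// the [PROVIDED] prefix makes StorageLocation.parse() assign
// StorageType.PROVIDED to these directories.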
for (int i = numDirs; i < numDirs + numProvidedDirs; i++) {
File loc = new File(BASE_DIR, "data" + i);
providedBasePath = loc.getAbsolutePath();
loc.mkdirs();
String dirString = "[PROVIDED]" +
new Path(loc.toString()).toUri().toString();
dirStrings.add(dirString);
dirs.add(createProvidedStorageDirectory(dirString, conf));
when(storage.getStorageDir(i)).thenReturn(dirs.get(i));
}
String dataDir = StringUtils.join(",", dirStrings);
conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDir);
when(storage.dirIterator()).thenReturn(dirs.iterator());
when(storage.getNumStorageDirs()).thenReturn(numDirs + numProvidedDirs);
}
private int getNumVolumes() {
try (FsDatasetSpi.FsVolumeReferences volumes =
dataset.getFsVolumeReferences()) {
return volumes.size();
} catch (IOException e) {
return 0;
}
}
@Before
public void setUp() throws IOException {
datanode = mock(DataNode.class);
storage = mock(DataStorage.class);
conf = new Configuration();
conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0);
when(datanode.getConf()).thenReturn(conf);
final DNConf dnConf = new DNConf(datanode);
when(datanode.getDnConf()).thenReturn(dnConf);
// reset the space used
spaceUsed = 0;
final BlockScanner disabledBlockScanner = new BlockScanner(datanode, conf);
when(datanode.getBlockScanner()).thenReturn(disabledBlockScanner);
final ShortCircuitRegistry shortCircuitRegistry =
new ShortCircuitRegistry(conf);
when(datanode.getShortCircuitRegistry()).thenReturn(shortCircuitRegistry);
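// have the provided volumes load their block maps through the test
// alias map.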
conf.setClass(DFSConfigKeys.DFS_PROVIDED_ALIASMAP_CLASS,
TestFileRegionBlockAliasMap.class, BlockAliasMap.class);
blkToPathMap = new HashMap<Long, String>();
providedVolumes = new LinkedList<FsVolumeImpl>();
createStorageDirs(
storage, conf, NUM_LOCAL_INIT_VOLUMES, NUM_PROVIDED_INIT_VOLUMES);
dataset = new FsDatasetImpl(datanode, storage, conf);
try (FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
for (int i = 0; i < volumes.size(); i++) {
FsVolumeSpi vol = volumes.get(i);
if (vol.getStorageType() == StorageType.PROVIDED) {
providedVolumes.add((FsVolumeImpl) vol);
}
}
}
for (String bpid : BLOCK_POOL_IDS) {
dataset.addBlockPool(bpid, conf);
}
}
@Test
public void testReserved() throws Exception {
for (FsVolumeSpi vol : providedVolumes) {
// the reserved space for provided volumes should be 0.
assertEquals(0, ((FsVolumeImpl) vol).getReserved());
}
}
@Test
public void testProvidedVolumeImpl() throws IOException {
assertEquals(NUM_LOCAL_INIT_VOLUMES + NUM_PROVIDED_INIT_VOLUMES,
getNumVolumes());
assertEquals(NUM_PROVIDED_INIT_VOLUMES, providedVolumes.size());
assertEquals(0, dataset.getNumFailedVolumes());
for (int i = 0; i < providedVolumes.size(); i++) {
// check basic information about provided volume
assertEquals(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT,
providedVolumes.get(i).getStorageID());
assertEquals(StorageType.PROVIDED,
providedVolumes.get(i).getStorageType());
long space = providedVolumes.get(i).getBlockPoolUsed(
BLOCK_POOL_IDS[CHOSEN_BP_ID]);
// check the df stats of the volume
assertEquals(spaceUsed, space);
assertEquals(NUM_PROVIDED_BLKS, providedVolumes.get(i).getNumBlocks());
providedVolumes.get(i).shutdownBlockPool(
BLOCK_POOL_IDS[1 - CHOSEN_BP_ID], null);
try {
// the block pool was shut down above, so getBlockPoolUsed must throw.
providedVolumes.get(i)
.getBlockPoolUsed(BLOCK_POOL_IDS[1 - CHOSEN_BP_ID]);
fail("Expected getBlockPoolUsed to fail for a shutdown block pool");
} catch (IOException e) {
LOG.info("Expected exception: " + e);
}
}
}
@Test
public void testBlockLoad() throws IOException {
for (int i = 0; i < providedVolumes.size(); i++) {
FsVolumeImpl vol = providedVolumes.get(i);
ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
vol.getVolumeMap(volumeMap, null);
assertEquals(BLOCK_POOL_IDS.length, vol.getBlockPoolList().length);
for (int j = 0; j < BLOCK_POOL_IDS.length; j++) {
if (j != CHOSEN_BP_ID) {
// this block pool should not have any blocks
assertNull(volumeMap.replicas(BLOCK_POOL_IDS[j]));
}
}
assertEquals(NUM_PROVIDED_BLKS,
volumeMap.replicas(BLOCK_POOL_IDS[CHOSEN_BP_ID]).size());
}
}
@Test
public void testProvidedBlockRead() throws IOException {
for (int id = 0; id < NUM_PROVIDED_BLKS; id++) {
ExtendedBlock eb = new ExtendedBlock(
BLOCK_POOL_IDS[CHOSEN_BP_ID], id, BLK_LEN,
HdfsConstants.GRANDFATHER_GENERATION_STAMP);
try (InputStream ins = dataset.getBlockInputStream(eb, 0)) {
String filepath = blkToPathMap.get((long) id);
TestProvidedReplicaImpl.verifyReplicaContents(new File(filepath), ins, 0,
BLK_LEN);
}
}
}
@Test
public void testProvidedBlockIterator() throws IOException {
for (int i = 0; i < providedVolumes.size(); i++) {
FsVolumeImpl vol = providedVolumes.get(i);
BlockIterator iter =
vol.newBlockIterator(BLOCK_POOL_IDS[CHOSEN_BP_ID], "temp");
Set<Long> blockIdsUsed = new HashSet<Long>();
assertEquals(BLOCK_POOL_IDS[CHOSEN_BP_ID], iter.getBlockPoolId());
while (!iter.atEnd()) {
ExtendedBlock eb = iter.nextBlock();
long blkId = eb.getBlockId();
assertTrue(blkId >= MIN_BLK_ID && blkId < NUM_PROVIDED_BLKS);
// all block ids must be unique!
assertFalse(blockIdsUsed.contains(blkId));
blockIdsUsed.add(blkId);
}
assertEquals(NUM_PROVIDED_BLKS, blockIdsUsed.size());
// rewind the block iterator
iter.rewind();
while (!iter.atEnd()) {
ExtendedBlock eb = iter.nextBlock();
long blkId = eb.getBlockId();
// the block should have already appeared in the first scan.
assertTrue(blockIdsUsed.contains(blkId));
blockIdsUsed.remove(blkId);
}
// none of the blocks should remain in blockIdsUsed
assertEquals(0, blockIdsUsed.size());
// the other block pool should not contain any blocks!
BlockIterator nonProvidedBpIter =
vol.newBlockIterator(BLOCK_POOL_IDS[1 - CHOSEN_BP_ID], "temp");
assertNull(nonProvidedBpIter.nextBlock());
}
}
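/**
* Loads the volume map of each provided volume from a
* TestFileRegionIterator rooted at {@code basePath} and returns the
* total number of blocks added across all provided volumes.
*/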
private int getBlocksInProvidedVolumes(String basePath, int numBlocks,
int minBlockId) throws IOException {
TestFileRegionIterator fileRegionIterator =
new TestFileRegionIterator(basePath, minBlockId, numBlocks);
int totalBlocks = 0;
for (int i = 0; i < providedVolumes.size(); i++) {
ProvidedVolumeImpl vol = (ProvidedVolumeImpl) providedVolumes.get(i);
vol.setFileRegionProvider(BLOCK_POOL_IDS[CHOSEN_BP_ID],
new TestFileRegionBlockAliasMap(fileRegionIterator, minBlockId,
numBlocks));
ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
vol.getVolumeMap(BLOCK_POOL_IDS[CHOSEN_BP_ID], volumeMap, null);
totalBlocks += volumeMap.size(BLOCK_POOL_IDS[CHOSEN_BP_ID]);
}
return totalBlocks;
}
/**
* Tests whether the FileRegions supplied by the FileRegionProvider
* can belong to the ProvidedVolume.
* @throws IOException
*/
@Test
public void testProvidedVolumeContents() throws IOException {
int expectedBlocks = 5;
int minId = 0;
// use a path which has the same prefix as providedBasePath
// all these blocks can belong to the provided volume
int blocksFound = getBlocksInProvidedVolumes(providedBasePath + "/test1/",
expectedBlocks, minId);
assertEquals(
"Number of blocks in provided volumes should be " + expectedBlocks,
expectedBlocks, blocksFound);
blocksFound = getBlocksInProvidedVolumes(
"file:/" + providedBasePath + "/test1/", expectedBlocks, minId);
assertEquals(
"Number of blocks in provided volumes should be " + expectedBlocks,
expectedBlocks, blocksFound);
// use a path that is entirely different from the providedBasePath
// none of these blocks can belong to the volume
blocksFound =
getBlocksInProvidedVolumes("randomtest1/", expectedBlocks, minId);
assertEquals("Number of blocks in provided volumes should be 0", 0,
blocksFound);
}
@Test
public void testProvidedVolumeContainsBlock() throws URISyntaxException {
assertTrue(ProvidedVolumeImpl.containsBlock(null, null));
assertFalse(
ProvidedVolumeImpl.containsBlock(new URI("file:/a"), null));
assertTrue(
ProvidedVolumeImpl.containsBlock(new URI("file:/a/b/c/"),
new URI("file:/a/b/c/d/e.file")));
// scheme-less base URIs should match file URIs.
assertTrue(
ProvidedVolumeImpl.containsBlock(new URI("/a/b/c/"),
new URI("file:/a/b/c/d/e.file")));
assertTrue(
ProvidedVolumeImpl.containsBlock(new URI("/a/b/c"),
new URI("file:/a/b/c/d/e.file")));
assertTrue(
ProvidedVolumeImpl.containsBlock(new URI("/a/b/c/"),
new URI("/a/b/c/d/e.file")));
assertTrue(
ProvidedVolumeImpl.containsBlock(new URI("file:/a/b/c/"),
new URI("/a/b/c/d/e.file")));
assertFalse(
ProvidedVolumeImpl.containsBlock(new URI("/a/b/e"),
new URI("file:/a/b/c/d/e.file")));
assertFalse(
ProvidedVolumeImpl.containsBlock(new URI("file:/a/b/e"),
new URI("file:/a/b/c/d/e.file")));
assertTrue(
ProvidedVolumeImpl.containsBlock(new URI("s3a:/bucket1/dir1/"),
new URI("s3a:/bucket1/dir1/temp.txt")));
assertFalse(
ProvidedVolumeImpl.containsBlock(new URI("s3a:/bucket2/dir1/"),
new URI("s3a:/bucket1/dir1/temp.txt")));
assertFalse(
ProvidedVolumeImpl.containsBlock(new URI("s3a:/bucket1/dir1/"),
new URI("s3a:/bucket1/temp.txt")));
// a scheme-less base should not match a block URI with a
// non-file scheme.
assertFalse(
ProvidedVolumeImpl.containsBlock(new URI("/bucket1/dir1/"),
new URI("s3a:/bucket1/dir1/temp.txt")));
}
@Test
public void testProvidedReplicaSuffixExtraction() {
assertEquals("B.txt", ProvidedVolumeImpl.getSuffix(
new Path("file:///A/"), new Path("file:///A/B.txt")));
assertEquals("B/C.txt", ProvidedVolumeImpl.getSuffix(
new Path("file:///A/"), new Path("file:///A/B/C.txt")));
assertEquals("B/C/D.txt", ProvidedVolumeImpl.getSuffix(
new Path("file:///A/"), new Path("file:///A/B/C/D.txt")));
assertEquals("D.txt", ProvidedVolumeImpl.getSuffix(
new Path("file:///A/B/C/"), new Path("file:///A/B/C/D.txt")));
assertEquals("file:/A/B/C/D.txt", ProvidedVolumeImpl.getSuffix(
new Path("file:///X/B/C/"), new Path("file:///A/B/C/D.txt")));
assertEquals("D.txt", ProvidedVolumeImpl.getSuffix(
new Path("/A/B/C"), new Path("/A/B/C/D.txt")));
assertEquals("D.txt", ProvidedVolumeImpl.getSuffix(
new Path("/A/B/C/"), new Path("/A/B/C/D.txt")));
assertEquals("data/current.csv", ProvidedVolumeImpl.getSuffix(
new Path("wasb:///users/alice/"),
new Path("wasb:///users/alice/data/current.csv")));
assertEquals("current.csv", ProvidedVolumeImpl.getSuffix(
new Path("wasb:///users/alice/data"),
new Path("wasb:///users/alice/data/current.csv")));
assertEquals("wasb:/users/alice/data/current.csv",
ProvidedVolumeImpl.getSuffix(
new Path("wasb:///users/bob/"),
new Path("wasb:///users/alice/data/current.csv")));
}
@Test
public void testProvidedReplicaPrefix() throws Exception {
for (int i = 0; i < providedVolumes.size(); i++) {
FsVolumeImpl vol = providedVolumes.get(i);
ReplicaMap volumeMap = new ReplicaMap(new ReentrantReadWriteLock());
vol.getVolumeMap(volumeMap, null);
Path expectedPrefix = new Path(
StorageLocation.normalizeFileURI(new File(providedBasePath).toURI()));
for (ReplicaInfo info : volumeMap
.replicas(BLOCK_POOL_IDS[CHOSEN_BP_ID])) {
ProvidedReplica pInfo = (ProvidedReplica) info;
assertEquals(expectedPrefix, pInfo.getPathPrefix());
}
}
}
@Test
public void testScannerWithProvidedVolumes() throws Exception {
DirectoryScanner scanner = new DirectoryScanner(datanode, dataset, conf);
Map<String, FsVolumeSpi.ScanInfo[]> report = scanner.getDiskReport();
// the directory scanner should not report any blocks for the PROVIDED
// volume, as it has no local block files to scan.
assertEquals(0, report.get(BLOCK_POOL_IDS[CHOSEN_BP_ID]).length);
}
/**
* Tests that a ProvidedReplica supports path handles.
*
* @throws Exception
*/
@Test
public void testProvidedReplicaWithPathHandle() throws Exception {
Configuration conf = new Configuration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
// generate random data
int chunkSize = 512;
Random r = new Random(12345L);
byte[] data = new byte[chunkSize];
r.nextBytes(data);
Path file = new Path("/testfile");
try (FSDataOutputStream fout = fs.create(file)) {
fout.write(data);
}
PathHandle pathHandle = fs.getPathHandle(fs.getFileStatus(file),
Options.HandleOpt.changed(true), Options.HandleOpt.moved(true));
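// create a finalized provided replica backed by the file, with the
// path handle attached.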
FinalizedProvidedReplica replica = new FinalizedProvidedReplica(0,
file.toUri(), 0, chunkSize, 0, pathHandle, null, conf, fs);
byte[] content = new byte[chunkSize];
IOUtils.readFully(replica.getDataInputStream(0), content, 0, chunkSize);
assertArrayEquals(data, content);
fs.rename(file, new Path("/testfile.1"));
// the read should still succeed after the rename, since the path
// handle tracks the moved file.
IOUtils.readFully(replica.getDataInputStream(0), content, 0, chunkSize);
assertArrayEquals(data, content);
replica.setPathHandle(null);
try {
// with the path handle cleared, the replica falls back to its URI,
// which is no longer valid after the rename.
replica.getDataInputStream(0);
fail("Expected an exception");
} catch (IOException e) {
LOG.info("Expected exception " + e);
}
} finally {
cluster.shutdown();
}
}
}