/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;

import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.function.Supplier;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
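
/**
 * Tests for {@link LocatedBlocksRefresher}: verifies that the refresher is only
 * created when a non-zero refresh interval is configured, and that a tracked
 * {@link DFSInputStream} gets its located blocks refreshed (and its local
 * dead-node list cleared) after reads run into stopped datanodes.
 */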
public class TestLocatedBlocksRefresher {
private static final Logger LOG = LoggerFactory.getLogger(TestLocatedBlocksRefresher.class);
private static final int BLOCK_SIZE = 1024 * 1024;
private static final short REPLICATION_FACTOR = (short) 4;
private static final String[] RACKS = new String[] {
"/d1/r1", "/d1/r1", "/d1/r2", "/d1/r2", "/d1/r2", "/d2/r3", "/d2/r3" };
private static final int NUM_DATA_NODES = RACKS.length;
private final int numOfBlocks = 24;
private final int fileLength = numOfBlocks * BLOCK_SIZE;
private final int dfsClientPrefetchSize = fileLength / 2;
private MiniDFSCluster cluster;
private Configuration conf;
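
  /**
   * Common configuration for every test: short-circuit reads disabled, 1 MB
   * blocks, a replication factor of 4, and a client prefetch size covering
   * half of the test file.
   */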
@Before
public void setUp() throws Exception {
cluster = null;
conf = new HdfsConfiguration();
// disable shortcircuit reading
conf.setBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, false);
// set replication factor
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, REPLICATION_FACTOR);
// set block size and other sizes
conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
conf.setLong(HdfsClientConfigKeys.Read.PREFETCH_SIZE_KEY,
dfsClientPrefetchSize);
}
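
  /** Shuts down the mini cluster, if one was started, after each test. */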
@After
public void tearDown() throws Exception {
if (cluster != null) {
cluster.shutdown(true, true);
}
}
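
  /**
   * Starts a mini cluster with the given located-blocks refresh interval, using
   * a unique client context so no DFSClient state is cached between tests.
   */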
private void setupTest(long refreshInterval) throws IOException {
conf.setLong(DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_KEY, refreshInterval);
// this is necessary to ensure no caching between runs
conf.set("dfs.client.context", UUID.randomUUID().toString());
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).racks(RACKS).build();
cluster.waitActive();
}
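
  /** A zero refresh interval should disable the refresher entirely. */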
@Test
public void testDisabledOnZeroInterval() throws IOException {
setupTest(0);
assertNull(cluster.getFileSystem().getClient().getLocatedBlockRefresher());
}
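
  /**
   * A non-zero refresh interval should start the refresher, but it should not
   * refresh anything while no input streams are registered with it.
   */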
@Test
public void testEnabledOnNonZeroInterval() throws Exception {
setupTest(1000);
LocatedBlocksRefresher refresher =
cluster.getFileSystem().getClient().getLocatedBlockRefresher();
assertNotNull(refresher);
assertNoMoreRefreshes(refresher);
}
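
  /**
   * Stopping a datanode hosting a replica should produce a local dead node on
   * read, which should trigger a refresh of the stream's located blocks and
   * clear the dead-node list; once the stream is deregistered, no further
   * refreshes should occur.
   */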
@Test
public void testRefreshOnDeadNodes() throws Exception {
setupTest(1000);
DistributedFileSystem fs = cluster.getFileSystem();
DFSClient client = fs.getClient();
LocatedBlocksRefresher refresher = client.getLocatedBlockRefresher();
String fileName = createTestFile(fs);
try (DFSInputStream fin = client.open(fileName)) {
LocatedBlocks locatedBlocks = fin.locatedBlocks;
assertEquals(dfsClientPrefetchSize / BLOCK_SIZE,
locatedBlocks.locatedBlockCount());
// should not be tracked yet
assertFalse(refresher.isInputStreamTracked(fin));
// track and verify
refresher.addInputStream(fin);
assertTrue(refresher.isInputStreamTracked(fin));
// no refreshes yet, as nothing has happened
assertNoMoreRefreshes(refresher);
synchronized (fin.infoLock) {
assertSame(locatedBlocks, fin.locatedBlocks);
}
stopNodeHostingBlocks(fin, NUM_DATA_NODES - 1);
      // read blocks, which should mark the node we stopped as dead
int chunkReadSize = BLOCK_SIZE / 4;
byte[] readBuffer = new byte[chunkReadSize];
fin.read(0, readBuffer, 0, readBuffer.length);
assertEquals(1, fin.getLocalDeadNodes().size());
// we should get a refresh now
assertRefreshes(refresher, 1);
// verify that it actually changed things
synchronized (fin.infoLock) {
assertNotSame(locatedBlocks, fin.locatedBlocks);
assertTrue(fin.getLocalDeadNodes().isEmpty());
}
// no more refreshes because everything is happy again
assertNoMoreRefreshes(refresher);
// stop another node, and try to trigger a new deadNode
stopNodeHostingBlocks(fin, NUM_DATA_NODES - 2);
readBuffer = new byte[chunkReadSize];
fin.read(0, readBuffer, 0, readBuffer.length);
// we should refresh again now, and verify
// may actually be more than 1, since the first dead node
      // may still be listed in the replicas for the block
assertTrue(fin.getLocalDeadNodes().size() > 0);
assertRefreshes(refresher, 1);
synchronized (fin.infoLock) {
assertNotSame(locatedBlocks, fin.locatedBlocks);
assertTrue(fin.getLocalDeadNodes().isEmpty());
}
// de-register, and expect no more refreshes below
refresher.removeInputStream(fin);
}
assertNoMoreRefreshes(refresher);
}
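
  /**
   * Stops the first still-running datanode hosting a replica of the file's
   * first block, then asserts that {@code expectedNodes} datanodes remain.
   */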
private void stopNodeHostingBlocks(DFSInputStream fin, int expectedNodes) {
synchronized (fin.infoLock) {
int idx = fin.locatedBlocks.findBlock(0);
for (int i = 0; i < REPLICATION_FACTOR; i++) {
String deadNodeAddr = fin.locatedBlocks.get(idx).getLocations()[i].getXferAddr();
DataNodeProperties dataNodeProperties = cluster.stopDataNode(deadNodeAddr);
if (dataNodeProperties != null) {
List<DataNode> datanodesPostStoppage = cluster.getDataNodes();
assertEquals(expectedNodes, datanodesPostStoppage.size());
return;
}
}
throw new RuntimeException("Could not find a datanode to stop");
}
}
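
  /**
   * Waits for the refresher to run several more times and asserts that its
   * refresh count did not change, i.e. no tracked stream needed refreshing.
   */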
  private void assertNoMoreRefreshes(LocatedBlocksRefresher refresher)
      throws InterruptedException {
long interval = refresher.getInterval();
int runCount = refresher.getRunCount();
int refreshCount = refresher.getRefreshCount();
LOG.info("Waiting for at least {} runs, from current {}, expecting no refreshes",
runCount + 3, runCount);
    // wait for it to run at least 3 more times, with some buffer in the timeout
awaitWithTimeout(() -> refresher.getRunCount() > runCount + 3, 5 * interval);
    // it should not have refreshed anything, because no tracked DFSInputStream needed a refresh
assertEquals(refreshCount, refresher.getRefreshCount());
}
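
  /**
   * Waits for the refresher to run several more times and asserts that exactly
   * {@code expectedRefreshes} refreshes occurred in that window; a negative
   * value means one refresh per run is expected.
   */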
private void assertRefreshes(LocatedBlocksRefresher refresher, int expectedRefreshes)
throws InterruptedException {
int runCount = refresher.getRunCount();
int refreshCount = refresher.getRefreshCount();
int expectedRuns = 3;
if (expectedRefreshes < 0) {
expectedRefreshes = expectedRuns;
}
LOG.info(
"Waiting for at least {} runs, from current {}. Expecting {} refreshes, from current {}",
runCount + expectedRuns, runCount, refreshCount + expectedRefreshes, refreshCount
);
// wait for it to run 3 times
awaitWithTimeout(() -> refresher.getRunCount() >= runCount + expectedRuns, 10_000);
    // the absolute counts may differ due to refreshes that occurred before we
    // opened the DFSInputStream, but the deltas should match, since we expect
    // a refresh on every run here
assertEquals(expectedRefreshes, refresher.getRefreshCount() - refreshCount);
}
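
  /** Polls the supplied condition every 50ms, failing the test on timeout. */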
private void awaitWithTimeout(Supplier<Boolean> test, long timeoutMillis)
throws InterruptedException {
long now = Time.monotonicNow();
    while (!test.get()) {
if (Time.monotonicNow() - now > timeoutMillis) {
fail("Timed out waiting for true condition");
return;
}
Thread.sleep(50);
}
}
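
  /**
   * Creates a file of {@code fileLength} bytes with the configured replication
   * factor, marked for deletion when the filesystem closes.
   */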
private String createTestFile(FileSystem fs) throws IOException {
    String fileName = "/located_blocks_" + UUID.randomUUID();
    Path filePath = new Path(fileName);
    try (FSDataOutputStream fout = fs.create(filePath, REPLICATION_FACTOR)) {
      fout.write(new byte[fileLength]);
}
fs.deleteOnExit(filePath);
return fileName;
}
}