/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.client.rpc.read;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.ozone.client.io.ECBlockInputStream;
import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
import org.apache.hadoop.ozone.client.io.InsufficientLocationsException;
import org.apache.hadoop.ozone.client.rpc.read.ECStreamTestUtil.TestBlockInputStreamFactory;
import org.apache.hadoop.ozone.client.rpc.read.ECStreamTestUtil.TestBlockInputStream;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.SplittableRandom;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import static org.apache.hadoop.ozone.client.rpc.read.ECStreamTestUtil.generateParity;
/**
* Test for the ECBlockReconstructedStripeInputStream.
*/
public class TestECBlockReconstructedStripeInputStream {
private static final int ONEMB = 1024 * 1024;
private ECReplicationConfig repConfig;
private ECStreamTestUtil.TestBlockInputStreamFactory streamFactory;
private long randomSeed;
private ThreadLocalRandom random = ThreadLocalRandom.current();
private SplittableRandom dataGen;
private ByteBufferPool bufferPool = new ElasticByteBufferPool();
private ExecutorService ecReconstructExecutor =
Executors.newFixedThreadPool(3);
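// The fixture uses an RS 3+2 EC layout with 1MB chunks: replica indexes
// 1-3 hold data and indexes 4-5 hold parity, so a full stripe is 3MB of
// data protected by 2MB of parity.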
@Before
public void setup() {
repConfig = new ECReplicationConfig(3, 2,
ECReplicationConfig.EcCodec.RS, ONEMB);
streamFactory = new ECStreamTestUtil.TestBlockInputStreamFactory();
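// Keep the seed so the data generator can be re-created later; tests
// re-seed dataGen before verification to regenerate exactly the bytes
// that were written.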
randomSeed = random.nextLong();
dataGen = new SplittableRandom(randomSeed);
}
@After
public void teardown() {
ecReconstructExecutor.shutdownNow();
}
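// hasSufficientLocations() should only report true while enough replica
// indexes remain readable (allowing for padding and reported failures) to
// serve or reconstruct the data.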
@Test
public void testSufficientLocations() throws IOException {
// One chunk, only 1 location.
BlockLocationInfo keyInfo = ECStreamTestUtil
.createKeyInfo(repConfig, 1, ONEMB);
try (ECBlockInputStream ecb = createInputStream(keyInfo)) {
Assert.assertTrue(ecb.hasSufficientLocations());
}
// Two Chunks, but missing data block 2.
Map<DatanodeDetails, Integer> dnMap
= ECStreamTestUtil.createIndexMap(1, 4, 5);
keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB * 2, dnMap);
try (ECBlockInputStream ecb = createInputStream(keyInfo)) {
Assert.assertTrue(ecb.hasSufficientLocations());
}
// Three Chunks, but missing data block 2 and 3.
dnMap = ECStreamTestUtil.createIndexMap(1, 4, 5);
keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB * 3, dnMap);
try (ECBlockInputStream ecb = createInputStream(keyInfo)) {
Assert.assertTrue(ecb.hasSufficientLocations());
// Set a failed location
List<DatanodeDetails> failed = new ArrayList<>();
failed.add(keyInfo.getPipeline().getFirstNode());
((ECBlockReconstructedStripeInputStream)ecb).addFailedDatanodes(failed);
Assert.assertFalse(ecb.hasSufficientLocations());
}
// Three Chunks, but missing data blocks 2 and 3 and one parity.
dnMap = ECStreamTestUtil.createIndexMap(1, 4);
keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB * 3, dnMap);
try (ECBlockInputStream ecb = createInputStream(keyInfo)) {
Assert.assertFalse(ecb.hasSufficientLocations());
}
// Three Chunks, all locations available, but then fail 3 of them.
dnMap = ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB * 3, dnMap);
try (ECBlockInputStream ecb = createInputStream(keyInfo)) {
Assert.assertTrue(ecb.hasSufficientLocations());
// Set 3 failed locations
List<DatanodeDetails> failed = new ArrayList<>();
for (DatanodeDetails dn : dnMap.keySet()) {
failed.add(dn);
if (failed.size() == 3) {
break;
}
}
((ECBlockReconstructedStripeInputStream)ecb).addFailedDatanodes(failed);
Assert.assertFalse(ecb.hasSufficientLocations());
}
// One chunk, indexes 2 and 3 are padding, but still reported in the
// container list. The other locations are missing so we should have
// insufficient locations.
dnMap = ECStreamTestUtil.createIndexMap(2, 3);
keyInfo = ECStreamTestUtil.createKeyInfo(repConfig, ONEMB, dnMap);
try (ECBlockInputStream ecb = createInputStream(keyInfo)) {
Assert.assertFalse(ecb.hasSufficientLocations());
}
}
@Test
public void testReadFullStripesWithPartial() throws IOException {
// Generate the input data for 3 full stripes plus a partial stripe, and
// generate the parity.
int chunkSize = repConfig.getEcChunkSize();
int partialStripeSize = chunkSize * 2 - 1;
int blockLength = chunkSize * repConfig.getData() * 3 + partialStripeSize;
ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 4 * chunkSize);
ECStreamTestUtil.randomFill(dataBufs, chunkSize, dataGen, blockLength);
ByteBuffer[] parity = generateParity(dataBufs, repConfig);
List<Map<DatanodeDetails, Integer>> locations = new ArrayList<>();
// Two data missing
locations.add(ECStreamTestUtil.createIndexMap(1, 4, 5));
// One data missing
locations.add(ECStreamTestUtil.createIndexMap(1, 2, 4, 5));
// Two data missing including first
locations.add(ECStreamTestUtil.createIndexMap(2, 4, 5));
// One data and one parity missing
locations.add(ECStreamTestUtil.createIndexMap(2, 3, 4));
// No missing indexes
locations.add(ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5));
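// Every combination above leaves at least three readable indexes, which is
// within the two-failure tolerance of RS(3,2), so each should read back the
// full data.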
for (Map<DatanodeDetails, Integer> dnMap : locations) {
streamFactory = new TestBlockInputStreamFactory();
addDataStreamsToFactory(dataBufs, parity);
BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
stripeSize() * 3 + partialStripeSize, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
ByteBuffer[] bufs = allocateByteBuffers(repConfig);
dataGen = new SplittableRandom(randomSeed);
try (ECBlockReconstructedStripeInputStream ecb =
createInputStream(keyInfo)) {
// Read 3 full stripes
for (int i = 0; i < 3; i++) {
int read = ecb.readStripe(bufs);
for (int j = 0; j < bufs.length; j++) {
ECStreamTestUtil.assertBufferMatches(bufs[j], dataGen);
}
Assert.assertEquals(stripeSize(), read);
// Check the underlying streams have read 1 chunk per read:
for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
Assert.assertEquals(chunkSize * (i + 1),
bis.getPos());
}
Assert.assertEquals(stripeSize() * (i + 1), ecb.getPos());
clearBuffers(bufs);
}
// The next read is a partial stripe
int read = ecb.readStripe(bufs);
Assert.assertEquals(partialStripeSize, read);
ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
ECStreamTestUtil.assertBufferMatches(bufs[1], dataGen);
Assert.assertEquals(0, bufs[2].remaining());
Assert.assertEquals(0, bufs[2].position());
// A further read should give EOF
clearBuffers(bufs);
read = ecb.readStripe(bufs);
Assert.assertEquals(-1, read);
}
}
}
@Test
public void testReadPartialStripe() throws IOException {
int blockLength = repConfig.getEcChunkSize() - 1;
ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 3 * ONEMB);
ECStreamTestUtil
.randomFill(dataBufs, repConfig.getEcChunkSize(), dataGen, blockLength);
ByteBuffer[] parity = generateParity(dataBufs, repConfig);
addDataStreamsToFactory(dataBufs, parity);
ByteBuffer[] bufs = allocateByteBuffers(repConfig);
// We have a length that is less than a single chunk, so blocks 2 and 3
// are padding and will not be present. Block 1 is lost and needs to be
// recovered from the parity and the padded blocks 2 and 3.
Map<DatanodeDetails, Integer> dnMap =
ECStreamTestUtil.createIndexMap(4, 5);
BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
dataGen = new SplittableRandom(randomSeed);
try (ECBlockReconstructedStripeInputStream ecb =
createInputStream(keyInfo)) {
int read = ecb.readStripe(bufs);
Assert.assertEquals(blockLength, read);
ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
Assert.assertEquals(0, bufs[1].remaining());
Assert.assertEquals(0, bufs[1].position());
Assert.assertEquals(0, bufs[2].remaining());
Assert.assertEquals(0, bufs[2].position());
// Check the underlying streams have been advanced by 1 blockLength:
for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
Assert.assertEquals(blockLength, bis.getPos());
}
Assert.assertEquals(blockLength, ecb.getPos());
clearBuffers(bufs);
// A further read should give EOF
read = ecb.readStripe(bufs);
Assert.assertEquals(-1, read);
}
}
@Test
public void testReadPartialStripeTwoChunks() throws IOException {
int chunkSize = repConfig.getEcChunkSize();
int blockLength = chunkSize * 2 - 1;
ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 3 * ONEMB);
ECStreamTestUtil
.randomFill(dataBufs, repConfig.getEcChunkSize(), dataGen, blockLength);
ByteBuffer[] parity = generateParity(dataBufs, repConfig);
addDataStreamsToFactory(dataBufs, parity);
ByteBuffer[] bufs = allocateByteBuffers(repConfig);
// The length spans two chunks, so block 3 is all padding and will not be
// present. Data blocks 1 and 2 are lost and need to be recovered from the
// two parity blocks together with the padded block 3.
Map<DatanodeDetails, Integer> dnMap =
ECStreamTestUtil.createIndexMap(4, 5);
BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
dataGen = new SplittableRandom(randomSeed);
try (ECBlockReconstructedStripeInputStream ecb =
createInputStream(keyInfo)) {
int read = ecb.readStripe(bufs);
Assert.assertEquals(blockLength, read);
ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
ECStreamTestUtil.assertBufferMatches(bufs[1], dataGen);
Assert.assertEquals(0, bufs[2].remaining());
Assert.assertEquals(0, bufs[2].position());
// Check the underlying streams have been advanced by 1 chunk:
for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
Assert.assertEquals(chunkSize, bis.getPos());
}
Assert.assertEquals(blockLength, ecb.getPos());
clearBuffers(bufs);
// A further read should give EOF
read = ecb.readStripe(bufs);
Assert.assertEquals(-1, read);
}
}
@Test
public void testReadPartialStripeThreeChunks() throws IOException {
int chunkSize = repConfig.getEcChunkSize();
int blockLength = chunkSize * 3 - 1;
ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 3 * ONEMB);
ECStreamTestUtil
.randomFill(dataBufs, repConfig.getEcChunkSize(), dataGen, blockLength);
ByteBuffer[] parity = generateParity(dataBufs, repConfig);
// The length is one byte short of a full stripe, so chunks 1 and 2 are full
// and chunk 3 is a partial chunk. Various combinations of missing data and
// parity locations are exercised below.
List<Map<DatanodeDetails, Integer>> locations = new ArrayList<>();
// Two data missing
locations.add(ECStreamTestUtil.createIndexMap(3, 4, 5));
// Two data missing
locations.add(ECStreamTestUtil.createIndexMap(1, 4, 5));
// The last data block and one parity missing
locations.add(ECStreamTestUtil.createIndexMap(1, 2, 5));
// One data and one parity missing
locations.add(ECStreamTestUtil.createIndexMap(2, 3, 4));
// One data and one parity missing
locations.add(ECStreamTestUtil.createIndexMap(1, 2, 4));
// No indexes missing
locations.add(ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5));
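// Each combination above still leaves three readable indexes, so the
// almost-full stripe can be served or reconstructed in every case.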
for (Map<DatanodeDetails, Integer> dnMap : locations) {
streamFactory = new TestBlockInputStreamFactory();
addDataStreamsToFactory(dataBufs, parity);
ByteBuffer[] bufs = allocateByteBuffers(repConfig);
BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
dataGen = new SplittableRandom(randomSeed);
try (ECBlockReconstructedStripeInputStream ecb =
createInputStream(keyInfo)) {
int read = ecb.readStripe(bufs);
Assert.assertEquals(blockLength, read);
ECStreamTestUtil.assertBufferMatches(bufs[0], dataGen);
ECStreamTestUtil.assertBufferMatches(bufs[1], dataGen);
ECStreamTestUtil.assertBufferMatches(bufs[2], dataGen);
// Check the underlying streams have been fully consumed:
for (TestBlockInputStream bis : streamFactory.getBlockStreams()) {
Assert.assertEquals(0, bis.getRemaining());
}
Assert.assertEquals(blockLength, ecb.getPos());
clearBuffers(bufs);
// A further read should give EOF
read = ecb.readStripe(bufs);
Assert.assertEquals(-1, read);
}
}
}
@Test
public void testErrorThrownIfBlockNotLongEnough() throws IOException {
int blockLength = repConfig.getEcChunkSize() - 1;
ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 3 * ONEMB);
ECStreamTestUtil
.randomFill(dataBufs, repConfig.getEcChunkSize(), dataGen, blockLength);
ByteBuffer[] parity = generateParity(dataBufs, repConfig);
addDataStreamsToFactory(dataBufs, parity);
// Set the parity buffer limit to be less than the block length
parity[0].limit(blockLength - 1);
parity[1].limit(blockLength - 1);
ByteBuffer[] bufs = allocateByteBuffers(repConfig);
// We have a length that is less than a single chunk, so blocks 2 and 3
// are padding and will not be present. Block 1 is lost and needs to be
// recovered from the parity and the padded blocks 2 and 3.
Map<DatanodeDetails, Integer> dnMap =
ECStreamTestUtil.createIndexMap(4, 5);
BlockLocationInfo keyInfo =
ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
try (ECBlockReconstructedStripeInputStream ecb =
createInputStream(keyInfo)) {
try {
ecb.readStripe(bufs);
Assert.fail("Read should have thrown an exception");
} catch (InsufficientLocationsException e) {
// expected
}
}
}
@Test
public void testSeek() throws IOException {
// Generate the input data for 3 full stripes plus a partial stripe, and
// generate the parity.
int chunkSize = repConfig.getEcChunkSize();
int partialStripeSize = chunkSize * 2 - 1;
int dataLength = stripeSize() * 3 + partialStripeSize;
ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 4 * chunkSize);
ECStreamTestUtil
.randomFill(dataBufs, repConfig.getEcChunkSize(), dataGen, dataLength);
ByteBuffer[] parity = generateParity(dataBufs, repConfig);
List<Map<DatanodeDetails, Integer>> locations = new ArrayList<>();
// Two data missing
locations.add(ECStreamTestUtil.createIndexMap(1, 4, 5));
// One data missing
locations.add(ECStreamTestUtil.createIndexMap(1, 2, 4, 5));
// Two data missing including first
locations.add(ECStreamTestUtil.createIndexMap(2, 4, 5));
// One data and one parity missing
locations.add(ECStreamTestUtil.createIndexMap(2, 3, 4));
// No locations missing
locations.add(ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5));
for (Map<DatanodeDetails, Integer> dnMap : locations) {
streamFactory = new TestBlockInputStreamFactory();
addDataStreamsToFactory(dataBufs, parity);
BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
stripeSize() * 3 + partialStripeSize, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
ByteBuffer[] bufs = allocateByteBuffers(repConfig);
try (ECBlockReconstructedStripeInputStream ecb =
createInputStream(keyInfo)) {
// Read Stripe 1
int read = ecb.readStripe(bufs);
for (int j = 0; j < bufs.length; j++) {
validateContents(dataBufs[j], bufs[j], 0, chunkSize);
}
Assert.assertEquals(stripeSize(), read);
Assert.assertEquals(dataLength - stripeSize(), ecb.getRemaining());
// Seek to 0 and read again
clearBuffers(bufs);
ecb.seek(0);
read = ecb.readStripe(bufs);
for (int j = 0; j < bufs.length; j++) {
validateContents(dataBufs[j], bufs[j], 0, chunkSize);
}
Assert.assertEquals(stripeSize(), read);
Assert.assertEquals(dataLength - stripeSize(), ecb.getRemaining());
// Seek to the last stripe
clearBuffers(bufs);
ecb.seek(stripeSize() * 3);
read = ecb.readStripe(bufs);
validateContents(dataBufs[0], bufs[0], 3 * chunkSize, chunkSize);
validateContents(dataBufs[1], bufs[1], 3 * chunkSize, chunkSize - 1);
Assert.assertEquals(0, bufs[2].remaining());
Assert.assertEquals(partialStripeSize, read);
Assert.assertEquals(0, ecb.getRemaining());
// Seek to the start of stripe 3
clearBuffers(bufs);
ecb.seek(stripeSize() * (long)2);
read = ecb.readStripe(bufs);
for (int j = 0; j < bufs.length; j++) {
validateContents(dataBufs[j], bufs[j], 2 * chunkSize, chunkSize);
}
Assert.assertEquals(stripeSize(), read);
Assert.assertEquals(partialStripeSize, ecb.getRemaining());
}
}
}
@Test
public void testSeekToPartialOffsetFails() {
Map<DatanodeDetails, Integer> dnMap =
ECStreamTestUtil.createIndexMap(1, 4, 5);
BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
stripeSize() * 3, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
try (ECBlockReconstructedStripeInputStream ecb =
createInputStream(keyInfo)) {
try {
ecb.seek(10);
Assert.fail("Seek should have thrown an exception");
} catch (IOException e) {
Assert.assertEquals("Requested position 10 does not align " +
"with a stripe offset", e.getMessage());
}
}
}
private Integer getRandomStreamIndex(Set<Integer> set) {
return set.stream().skip(new Random().nextInt(set.size()))
.findFirst().orElse(null);
}
@Test
public void testErrorReadingBlockContinuesReading() throws IOException {
// Generate the input data for 3 full stripes and generate the parity.
int chunkSize = repConfig.getEcChunkSize();
int partialStripeSize = chunkSize * 2 - 1;
int blockLength = repConfig.getEcChunkSize() * repConfig.getData() * 3
+ partialStripeSize;
ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(),
4 * chunkSize);
ECStreamTestUtil
.randomFill(dataBufs, repConfig.getEcChunkSize(), dataGen, blockLength);
ByteBuffer[] parity = generateParity(dataBufs, repConfig);
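// Run the scenario several times so that a different randomly chosen
// stream is failed on each iteration.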
for (int k = 0; k < 5; k++) {
Set<Integer> failed = new HashSet<>();
streamFactory = new TestBlockInputStreamFactory();
addDataStreamsToFactory(dataBufs, parity);
// Data block index 3 is missing and needs to be recovered initially.
Map<DatanodeDetails, Integer> dnMap =
ECStreamTestUtil.createIndexMap(1, 2, 4, 5);
BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
stripeSize() * 3 + partialStripeSize, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
ByteBuffer[] bufs = allocateByteBuffers(repConfig);
try (ECBlockReconstructedStripeInputStream ecb =
createInputStream(keyInfo)) {
// After reading the first stripe, make one of the streams error
for (int i = 0; i < 3; i++) {
int read = ecb.readStripe(bufs);
for (int j = 0; j < bufs.length; j++) {
validateContents(dataBufs[j], bufs[j], i * chunkSize, chunkSize);
}
Assert.assertEquals(stripeSize() * (i + 1), ecb.getPos());
Assert.assertEquals(stripeSize(), read);
clearBuffers(bufs);
if (i == 0) {
Integer failStream =
getRandomStreamIndex(streamFactory.getStreamIndexes());
streamFactory.getBlockStream(failStream)
.setShouldError(true);
failed.add(failStream);
}
}
// The next read is a partial stripe
int read = ecb.readStripe(bufs);
Assert.assertEquals(partialStripeSize, read);
validateContents(dataBufs[0], bufs[0], 3 * chunkSize, chunkSize);
validateContents(dataBufs[1], bufs[1], 3 * chunkSize, chunkSize - 1);
Assert.assertEquals(0, bufs[2].remaining());
Assert.assertEquals(0, bufs[2].position());
// seek back to zero and read a stripe to re-open the streams
ecb.seek(0);
clearBuffers(bufs);
ecb.readStripe(bufs);
// Now fail another random stream and the read should fail with
// insufficient locations
Set<Integer> currentStreams =
new HashSet<>(streamFactory.getStreamIndexes());
currentStreams.removeAll(failed);
Integer failStream = getRandomStreamIndex(currentStreams);
streamFactory.getBlockStream(failStream)
.setShouldError(true);
try {
clearBuffers(bufs);
ecb.readStripe(bufs);
Assert.fail("InsufficientLocationsException expected");
} catch (InsufficientLocationsException e) {
// expected
}
}
}
}
@Test(expected = InsufficientLocationsException.class)
public void testAllLocationsFailOnFirstRead() throws IOException {
// This test simulates stale nodes. When the nodes are stale, but not yet
// dead, the locations will still be given to the client and it will try to
// read them, but the read will always fail.
// Additionally, if the key is small (less than 2 EC chunks), the locations
// for the indexes which are all padding will be returned to the client and
// this can confuse the "sufficient locations" check, resulting in a strange
// error when selecting parity indexes (HDDS-6258)
int chunkSize = repConfig.getEcChunkSize();
int partialStripeSize = chunkSize;
int blockLength = partialStripeSize;
ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), chunkSize);
ECStreamTestUtil
.randomFill(dataBufs, repConfig.getEcChunkSize(), dataGen, blockLength);
ByteBuffer[] parity = generateParity(dataBufs, repConfig);
streamFactory = new TestBlockInputStreamFactory();
addDataStreamsToFactory(dataBufs, parity);
// Fail all the indexes containing data on their first read.
streamFactory.setFailIndexes(indexesToList(1, 4, 5));
// The locations contain the padded indexes, as will often be the case
// when containers are reported by SCM.
Map<DatanodeDetails, Integer> dnMap =
ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
blockLength, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
ByteBuffer[] bufs = allocateByteBuffers(repConfig);
try (ECBlockReconstructedStripeInputStream ecb =
createInputStream(keyInfo)) {
ecb.readStripe(bufs);
}
}
@Test
public void testFailedLocationsAreNotRead() throws IOException {
// Generate the input data for 3 full stripes and generate the parity.
int chunkSize = repConfig.getEcChunkSize();
int partialStripeSize = chunkSize * 2 - 1;
int blockLength = chunkSize * repConfig.getData() * 3 + partialStripeSize;
ByteBuffer[] dataBufs = allocateBuffers(repConfig.getData(), 4 * chunkSize);
ECStreamTestUtil.randomFill(dataBufs, chunkSize, dataGen, blockLength);
ByteBuffer[] parity = generateParity(dataBufs, repConfig);
streamFactory = new TestBlockInputStreamFactory();
addDataStreamsToFactory(dataBufs, parity);
Map<DatanodeDetails, Integer> dnMap =
ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
BlockLocationInfo keyInfo = ECStreamTestUtil.createKeyInfo(repConfig,
stripeSize() * 3 + partialStripeSize, dnMap);
streamFactory.setCurrentPipeline(keyInfo.getPipeline());
ByteBuffer[] bufs = allocateByteBuffers(repConfig);
dataGen = new SplittableRandom(randomSeed);
try (ECBlockReconstructedStripeInputStream ecb =
createInputStream(keyInfo)) {
List<DatanodeDetails> failed = new ArrayList<>();
// Fail the DNs holding replica indexes 1 and 2
for (Map.Entry<DatanodeDetails, Integer> e : dnMap.entrySet()) {
if (e.getValue() <= 2) {
failed.add(e.getKey());
}
}
ecb.addFailedDatanodes(failed);
// Read full stripe
int read = ecb.readStripe(bufs);
for (int j = 0; j < bufs.length; j++) {
ECStreamTestUtil.assertBufferMatches(bufs[j], dataGen);
}
Assert.assertEquals(stripeSize(), read);
// Now ensure that streams with repIndexes 1 and 2 have not been
// created in the stream factory, indicating we did not read them.
List<TestBlockInputStream> streams = streamFactory.getBlockStreams();
for (TestBlockInputStream stream : streams) {
Assert.assertTrue(stream.getEcReplicaIndex() > 2);
}
}
}
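// Builds the stream under test using the shared test stream factory,
// buffer pool and reconstruction executor.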
private ECBlockReconstructedStripeInputStream createInputStream(
BlockLocationInfo keyInfo) {
return new ECBlockReconstructedStripeInputStream(repConfig, keyInfo, true,
null, null, streamFactory, bufferPool, ecReconstructExecutor);
}
private List<Integer> indexesToList(int... indexes) {
List<Integer> list = new ArrayList<>();
for (int i : indexes) {
list.add(i);
}
return list;
}
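// Registers the data and parity buffers with the test factory in replica
// index order (data blocks first, then parity).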
private void addDataStreamsToFactory(ByteBuffer[] data, ByteBuffer[] parity) {
List<ByteBuffer> dataStreams = new ArrayList<>();
for (ByteBuffer b : data) {
dataStreams.add(b);
}
for (ByteBuffer b : parity) {
dataStreams.add(b);
}
streamFactory.setBlockStreamData(dataStreams);
}
/**
* Validates that the data buffer has the same contents as the source buffer,
* starting at the given offset in the source buffer and checking count bytes.
* @param src The source of the data
* @param data The data which should be checked against the source
* @param offset The starting point in the src buffer
* @param count How many bytes to check.
*/
private void validateContents(ByteBuffer src, ByteBuffer data, int offset,
int count) {
byte[] srcArray = src.array();
Assert.assertEquals(count, data.remaining());
for (int i = offset; i < offset + count; i++) {
Assert.assertEquals("Element " + i, srcArray[i], data.get());
}
data.flip();
}
/**
* Return an array of num ByteBuffers of the given size.
* @param num Number of buffers to create
* @param size The size of each buffer
* @return An array of num ByteBuffers, each of the given size.
*/
private ByteBuffer[] allocateBuffers(int num, int size) {
ByteBuffer[] bufs = new ByteBuffer[num];
for (int i = 0; i < num; i++) {
bufs[i] = ByteBuffer.allocate(size);
}
return bufs;
}
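// Number of bytes in a full data stripe: one chunk from each data block.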
private int stripeSize() {
return stripeSize(repConfig);
}
private int stripeSize(ECReplicationConfig rconfig) {
return rconfig.getEcChunkSize() * rconfig.getData();
}
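// Resets each buffer so it can be refilled by the next stripe read.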
private void clearBuffers(ByteBuffer[] bufs) {
for (ByteBuffer b : bufs) {
b.clear();
}
}
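// Allocates one chunk-sized buffer per data block, which is what
// readStripe() is given throughout these tests.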
private ByteBuffer[] allocateByteBuffers(ECReplicationConfig rConfig) {
ByteBuffer[] bufs = new ByteBuffer[rConfig.getData()];
for (int i = 0; i < bufs.length; i++) {
bufs[i] = ByteBuffer.allocate(rConfig.getEcChunkSize());
}
return bufs;
}
}