// blob: a45ab43c841e0e9d60e69994e960f2b35502a14e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.client.rpc.read;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.ozone.client.io.BadDataLocationException;
import org.apache.hadoop.ozone.client.io.ECBlockInputStreamFactory;
import org.apache.hadoop.ozone.client.io.ECBlockInputStreamProxy;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SplittableRandom;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
/**
* Unit tests for the ECBlockInputStreamProxy class.
*/
public class TestECBlockInputStreamProxy {
// Logger for this test class.
private static final Logger LOG =
LoggerFactory.getLogger(TestECBlockInputStreamProxy.class);
// One mebibyte (1024 * 1024 bytes); block lengths in the tests are expressed
// as multiples of it.
private static final int ONEMB = 1024 * 1024;
// Replication config passed to the code under test; assigned in setup().
private ECReplicationConfig repConfig;
// Test stream factory handed to every proxy built by createBISProxy.
private TestECBlockInputStreamFactory streamFactory;
// Seed chosen in setup(); kept so dataGenerator can be re-created with the
// same seed when a test needs to replay the generated data.
private long randomSeed;
// Source of the per-test seed.
private ThreadLocalRandom random = ThreadLocalRandom.current();
// Seeded generator used both to fill the test data and to verify the bytes
// read back.
private SplittableRandom dataGenerator;
/**
 * Creates a fresh replication config, stream factory and seeded data
 * generator before each test.
 */
@Before
public void setup() {
  repConfig = new ECReplicationConfig(3, 2);
  streamFactory = new TestECBlockInputStreamFactory();
  randomSeed = random.nextLong();
  // The test data is derived from this seed, so report it: without the seed
  // a failure caused by a particular data pattern cannot be reproduced.
  LOG.info("Using random seed {} to generate test data", randomSeed);
  dataGenerator = new SplittableRandom(randomSeed);
}
/**
 * Checks the number of data locations expected for a range of block lengths,
 * first with the config created in setup() and then with a (6, 3) config.
 */
@Test
public void testExpectedDataLocations() {
  final int oneByte = 1;
  final int justOverOneMb = ONEMB + 1;
  final int threeMb = 3 * ONEMB;
  final int tenMb = 10 * ONEMB;

  // Config from setup(): the expected count never goes above 3.
  Assert.assertEquals(1,
      ECBlockInputStreamProxy.expectedDataLocations(repConfig, oneByte));
  Assert.assertEquals(2,
      ECBlockInputStreamProxy.expectedDataLocations(repConfig,
          justOverOneMb));
  Assert.assertEquals(3,
      ECBlockInputStreamProxy.expectedDataLocations(repConfig, threeMb));
  Assert.assertEquals(3,
      ECBlockInputStreamProxy.expectedDataLocations(repConfig, tenMb));

  // Same lengths against a (6, 3) config: only the largest block differs.
  repConfig = new ECReplicationConfig(6, 3);
  Assert.assertEquals(1,
      ECBlockInputStreamProxy.expectedDataLocations(repConfig, oneByte));
  Assert.assertEquals(2,
      ECBlockInputStreamProxy.expectedDataLocations(repConfig,
          justOverOneMb));
  Assert.assertEquals(3,
      ECBlockInputStreamProxy.expectedDataLocations(repConfig, threeMb));
  Assert.assertEquals(6,
      ECBlockInputStreamProxy.expectedDataLocations(repConfig, tenMb));
}
/**
 * Checks how many of the expected data locations are reported as available
 * for pipelines built from different replica index maps.
 */
@Test
public void testAvailableDataLocations() {
  // Indexes 1 to 5 present: every expected location is available.
  Pipeline pipeline = ECStreamTestUtil.createKeyInfo(repConfig, 1024,
      ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5)).getPipeline();
  Assert.assertEquals(1,
      ECBlockInputStreamProxy.availableDataLocations(pipeline, 1));
  Assert.assertEquals(2,
      ECBlockInputStreamProxy.availableDataLocations(pipeline, 2));
  Assert.assertEquals(3,
      ECBlockInputStreamProxy.availableDataLocations(pipeline, 3));

  // Indexes 2 and 3 absent: only one of three expected locations remains.
  pipeline = ECStreamTestUtil.createKeyInfo(repConfig, 1024,
      ECStreamTestUtil.createIndexMap(1, 4, 5)).getPipeline();
  Assert.assertEquals(1,
      ECBlockInputStreamProxy.availableDataLocations(pipeline, 3));

  // Index 1 absent: the single expected location is unavailable.
  pipeline = ECStreamTestUtil.createKeyInfo(repConfig, 1024,
      ECStreamTestUtil.createIndexMap(2, 3, 4, 5)).getPipeline();
  Assert.assertEquals(0,
      ECBlockInputStreamProxy.availableDataLocations(pipeline, 1));
}
/**
 * The proxy must report the same block ID as the block info it was built
 * from.
 */
@Test
public void testBlockIDCanBeRetrieved() throws IOException {
  final int length = 1234;
  generateData(length);
  BlockLocationInfo location = ECStreamTestUtil.createKeyInfo(
      repConfig, length, ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5));
  try (ECBlockInputStreamProxy proxy = createBISProxy(repConfig, location)) {
    Assert.assertEquals(location.getBlockID(), proxy.getBlockID());
  }
}
/**
 * The proxy must report the length the block info was created with.
 */
@Test
public void testBlockLengthCanBeRetrieved() throws IOException {
  final int length = 1234;
  generateData(length);
  BlockLocationInfo location = ECStreamTestUtil.createKeyInfo(
      repConfig, length, ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5));
  try (ECBlockInputStreamProxy proxy = createBISProxy(repConfig, location)) {
    Assert.assertEquals(length, proxy.getLength());
  }
}
/**
 * Verifies that getRemaining() and getPos() start at the block length and
 * zero respectively, and that both move by the number of bytes returned from
 * a read.
 */
@Test
public void testBlockRemainingCanBeRetrieved() throws IOException {
  int blockLength = 12345;
  int bufferSize = 100;
  generateData(blockLength);
  Map<DatanodeDetails, Integer> dnMap =
      ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
  BlockLocationInfo blockInfo =
      ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
  ByteBuffer readBuffer = ByteBuffer.allocate(bufferSize);
  try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
    Assert.assertEquals(blockLength, bis.getRemaining());
    Assert.assertEquals(0, bis.getPos());
    // The block is far larger than the buffer, so the read is expected to
    // fill it; check the count rather than silently discarding it.
    int read = bis.read(readBuffer);
    Assert.assertEquals(bufferSize, read);
    Assert.assertEquals(blockLength - bufferSize, bis.getRemaining());
    Assert.assertEquals(bufferSize, bis.getPos());
  }
}
/**
 * Verifies which kind of stream the proxy requests from the factory: the
 * "missing=false" stream when replica indexes 1 to 5 are all present, and
 * the "missing=true" stream when index 1 is absent.
 */
@Test
public void testCorrectStreamCreatedDependingOnDataLocations()
    throws IOException {
  int blockLength = 5 * ONEMB;
  ByteBuffer data = generateData(blockLength);
  Map<DatanodeDetails, Integer> dnMap =
      ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5);
  BlockLocationInfo blockInfo =
      ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
  try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
    // All locations are present, so we expect only the "missing=false"
    // stream to be present.
    Assert.assertTrue("Expected a stream created with missing=false",
        streamFactory.getStreams().containsKey(false));
    Assert.assertFalse("Did not expect a stream created with missing=true",
        streamFactory.getStreams().containsKey(true));
  }
  // Start from an empty factory so streams recorded above cannot leak into
  // the second half of the test.
  streamFactory = new TestECBlockInputStreamFactory();
  streamFactory.setData(data);
  dnMap = ECStreamTestUtil.createIndexMap(2, 3, 4, 5);
  blockInfo = ECStreamTestUtil.createKeyInfo(repConfig, blockLength, dnMap);
  try (ECBlockInputStreamProxy bis = createBISProxy(repConfig, blockInfo)) {
    // Not all locations are present, so we expect only the "missing=true"
    // stream to be present.
    Assert.assertFalse("Did not expect a stream created with missing=false",
        streamFactory.getStreams().containsKey(false));
    Assert.assertTrue("Expected a stream created with missing=true",
        streamFactory.getStreams().containsKey(true));
  }
}
/**
 * Reads a block whose index map holds indexes 1 to 5 in 100 byte chunks
 * until a short read, checking every chunk against the regenerated data,
 * then expects the next read to return -1.
 */
@Test
public void testCanReadNonReconstructionToEOF()
    throws IOException {
  final int chunkSize = 100;
  final int length = 5 * ONEMB;
  generateData(length);
  BlockLocationInfo location = ECStreamTestUtil.createKeyInfo(
      repConfig, length, ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5));
  ByteBuffer chunk = ByteBuffer.allocate(chunkSize);
  // Restart the generator from the seed so it replays the values that
  // generateData consumed.
  dataGenerator = new SplittableRandom(randomSeed);
  try (ECBlockInputStreamProxy proxy = createBISProxy(repConfig, location)) {
    int bytesRead;
    do {
      bytesRead = proxy.read(chunk);
      ECStreamTestUtil.assertBufferMatches(chunk, dataGenerator);
      chunk.clear();
    } while (bytesRead >= chunkSize);
    // The short read above consumed the tail of the block.
    chunk.clear();
    Assert.assertEquals(-1, proxy.read(chunk));
  }
}
/**
 * Reads a block whose index map lacks index 1 in 100 byte chunks until a
 * short read, checking every chunk against the regenerated data, then
 * expects the next read to return -1.
 */
@Test
public void testCanReadReconstructionToEOF()
    throws IOException {
  final int chunkSize = 100;
  final int length = 5 * ONEMB;
  generateData(length);
  BlockLocationInfo location = ECStreamTestUtil.createKeyInfo(
      repConfig, length, ECStreamTestUtil.createIndexMap(2, 3, 4, 5));
  ByteBuffer chunk = ByteBuffer.allocate(chunkSize);
  // Restart the generator from the seed so it replays the values that
  // generateData consumed.
  dataGenerator = new SplittableRandom(randomSeed);
  try (ECBlockInputStreamProxy proxy = createBISProxy(repConfig, location)) {
    int bytesRead;
    do {
      bytesRead = proxy.read(chunk);
      ECStreamTestUtil.assertBufferMatches(chunk, dataGenerator);
      chunk.clear();
    } while (bytesRead >= chunkSize);
    // The short read above consumed the tail of the block.
    chunk.clear();
    Assert.assertEquals(-1, proxy.read(chunk));
  }
}
/**
 * Makes the "missing=false" stream throw a BadDataLocationException after
 * the first read and checks that all remaining data is still returned
 * correctly, and that the failing datanode is passed to the factory.
 */
@Test
public void testCanHandleErrorAndFailOverToReconstruction()
    throws IOException {
  final int chunkSize = 100;
  final int length = 5 * ONEMB;
  generateData(length);
  BlockLocationInfo location = ECStreamTestUtil.createKeyInfo(
      repConfig, length, ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5));
  ByteBuffer chunk = ByteBuffer.allocate(chunkSize);
  DatanodeDetails failingNode = location.getPipeline().getFirstNode();
  dataGenerator = new SplittableRandom(randomSeed);
  try (ECBlockInputStreamProxy proxy = createBISProxy(repConfig, location)) {
    // One successful read first, so the stream exists before it is told to
    // fail.
    int bytesRead = proxy.read(chunk);
    Assert.assertEquals(100, bytesRead);
    ECStreamTestUtil.assertBufferMatches(chunk, dataGenerator);

    // Arrange for the error to surface part way through a later read
    // (151 is presumably a stream position - confirm against
    // TestBlockInputStream), after the buffer has already been partly
    // filled. The proxy has to rewind the buffer and load the same data
    // again from the other stream.
    ECStreamTestUtil.TestBlockInputStream directStream =
        streamFactory.getStreams().get(false);
    directStream.setShouldError(true, 151,
        new BadDataLocationException(failingNode, "Simulated Error"));

    do {
      chunk.clear();
      bytesRead = proxy.read(chunk);
      ECStreamTestUtil.assertBufferMatches(chunk, dataGenerator);
    } while (bytesRead >= chunkSize);

    chunk.clear();
    bytesRead = proxy.read(chunk);
    Assert.assertEquals(-1, bytesRead);

    // The bad location must have been handed to the factory when the
    // reconstruction reader was created.
    Assert.assertEquals(failingNode,
        streamFactory.getFailedLocations().get(0));
  }
}
/**
 * Seeks within the block and verifies the data read afterwards, first with
 * a working "missing=false" stream, then with that stream failing on seek,
 * and finally with the "missing=true" stream failing on seek too, in which
 * case the seek must raise an IOException.
 */
@Test
public void testCanSeekToNewPosition() throws IOException {
  final int length = 5 * ONEMB;
  generateData(length);
  BlockLocationInfo location = ECStreamTestUtil.createKeyInfo(
      repConfig, length, ECStreamTestUtil.createIndexMap(1, 2, 3, 4, 5));
  ByteBuffer chunk = ByteBuffer.allocate(100);
  dataGenerator = new SplittableRandom(randomSeed);
  try (ECBlockInputStreamProxy proxy = createBISProxy(repConfig, location)) {
    // An initial read, so the stream exists before any seek.
    int bytesRead = proxy.read(chunk);
    Assert.assertEquals(100, bytesRead);

    proxy.seek(1024);
    chunk.clear();
    resetAndAdvanceDataGenerator(1024);
    proxy.read(chunk);
    ECStreamTestUtil.assertBufferMatches(chunk, dataGenerator);
    Assert.assertEquals(1124, proxy.getPos());

    // Make the non-reconstruction reader throw on seek; the data read
    // after the seek must still be correct.
    Map<Boolean, ECStreamTestUtil.TestBlockInputStream> created =
        streamFactory.getStreams();
    created.get(false).setShouldErrorOnSeek(true);
    proxy.seek(2048);
    chunk.clear();
    resetAndAdvanceDataGenerator(2048);
    proxy.read(chunk);
    ECStreamTestUtil.assertBufferMatches(chunk, dataGenerator);

    // With the reconstruction reader failing on seek as well, the seek
    // itself has to fail.
    created.get(true).setShouldErrorOnSeek(true);
    boolean seekFailed = false;
    try {
      proxy.seek(1024);
    } catch (IOException expected) {
      seekFailed = true;
    }
    if (!seekFailed) {
      Assert.fail("Seek should have raised an exception");
    }
  }
}
/**
 * Allocates a buffer of the given size, fills it using the current data
 * generator and installs it as the data served by the stream factory.
 *
 * @param length size of the buffer to allocate, in bytes
 * @return the filled buffer that was handed to the stream factory
 */
private ByteBuffer generateData(int length) {
  ByteBuffer generated = ByteBuffer.allocate(length);
  ECStreamTestUtil.randomFill(generated, dataGenerator);
  streamFactory.setData(generated);
  return generated;
}
/**
 * Replaces the data generator with one re-created from the test seed and
 * then discards {@code position} values from it.
 *
 * @param position number of values to discard from the fresh generator
 */
private void resetAndAdvanceDataGenerator(long position) {
  SplittableRandom replay = new SplittableRandom(randomSeed);
  // NOTE(review): nextInt(255) presumably mirrors how
  // ECStreamTestUtil.randomFill draws one value per byte - confirm before
  // changing the bound.
  long remaining = position;
  while (remaining-- > 0) {
    replay.nextInt(255);
  }
  dataGenerator = replay;
}
/**
 * Builds a proxy for the given block that obtains its streams from the
 * test stream factory.
 *
 * @param rConfig replication config passed to the proxy
 * @param blockInfo block the proxy should read
 * @return the newly constructed proxy
 */
private ECBlockInputStreamProxy createBISProxy(ECReplicationConfig rConfig,
    BlockLocationInfo blockInfo) {
  // The two null arguments are not needed by these tests because every
  // stream is supplied by streamFactory.
  ECBlockInputStreamProxy proxy = new ECBlockInputStreamProxy(
      rConfig, blockInfo, true, null, null, streamFactory);
  return proxy;
}
/**
 * Stream factory used by the tests in place of a real one. Every stream it
 * creates serves the bytes supplied through {@link #setData}, and it records
 * both the created streams and the failed datanode list it was last given so
 * the tests can inspect them.
 */
private static class TestECBlockInputStreamFactory
    implements ECBlockInputStreamFactory {

  // Bytes served by every stream this factory creates.
  private ByteBuffer data;
  // Streams created so far, keyed by the missingLocations flag passed to
  // create. A later stream with the same flag replaces the earlier one.
  private final Map<Boolean, ECStreamTestUtil.TestBlockInputStream> streams
      = new HashMap<>();
  // Failed datanodes passed to the most recent create call; null until
  // create has been called.
  private List<DatanodeDetails> failedLocations;

  /**
   * Sets the data served by streams created after this call.
   *
   * @param data array-backed buffer holding the block contents
   */
  public void setData(ByteBuffer data) {
    this.data = data;
  }

  /**
   * @return the live map of created streams, keyed by the missingLocations
   *     flag they were created with
   */
  public Map<Boolean, ECStreamTestUtil.TestBlockInputStream> getStreams() {
    return streams;
  }

  /**
   * @return the failed datanode list passed to the most recent create call,
   *     or null if create has not been called
   */
  public List<DatanodeDetails> getFailedLocations() {
    return failedLocations;
  }

  /**
   * Creates a test stream over the configured data and records it under the
   * given missingLocations flag.
   *
   * @throws IllegalStateException if no data has been set
   */
  @Override
  public BlockExtendedInputStream create(boolean missingLocations,
      List<DatanodeDetails> failedDatanodes,
      ReplicationConfig repConfig, BlockLocationInfo blockInfo,
      boolean verifyChecksum, XceiverClientFactory xceiverFactory,
      Function<BlockID, Pipeline> refreshFunction) {
    // Fail with a clear message rather than a bare NullPointerException
    // when a test forgets to supply data.
    if (data == null) {
      throw new IllegalStateException(
          "setData must be called before a stream can be created");
    }
    this.failedLocations = failedDatanodes;
    // Wrap the backing array rather than reusing the buffer itself, so each
    // stream gets its own position and limit spanning the full capacity.
    ByteBuffer wrappedBuffer =
        ByteBuffer.wrap(data.array(), 0, data.capacity());
    ECStreamTestUtil.TestBlockInputStream is =
        new ECStreamTestUtil.TestBlockInputStream(blockInfo.getBlockID(),
            blockInfo.getLength(), wrappedBuffer);
    streams.put(missingLocations, is);
    return is;
  }
}
}