/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.container.ec.reconstruction;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.StreamBufferArgs;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
import org.apache.hadoop.hdds.security.SecurityConfig;
import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
import org.apache.hadoop.hdds.security.token.ContainerTokenIdentifier;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl;
import org.apache.hadoop.ozone.client.io.ECBlockInputStreamProxy;
import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.security.token.Token;
import org.apache.ratis.util.MemoizedSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.hadoop.ozone.container.ec.reconstruction.TokenHelper.encode;

/**
* The Coordinator implements the main flow of reconstructing
* missing container replicas.
* <p>
* For a container reconstruction task, the main flow is:
* - ListBlock from all healthy replicas
* - calculate the effective block group length for each block
* - create RECOVERING containers on the target datanodes
* - for each block
*   - build an ECBlockReconstructedStripeInputStream to read healthy chunks
*   - build an ECBlockOutputStream per target to write out decoded chunks
*   - for each stripe
*     - use ECBlockReconstructedStripeInputStream.recoverChunks to decode
*       the missing chunks
*     - use ECBlockOutputStream.write to write decoded chunks to the targets
*   - PutBlock on each target
* - close the RECOVERING containers on the target datanodes
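* <p>
* A minimal usage sketch; exception handling is omitted, and {@code conf},
* {@code certClient}, {@code secretKeyClient}, {@code context},
* {@code metrics}, the container/replication details and the
* replica-index-to-datanode maps are assumed to be supplied by the caller:
* <pre>{@code
* try (ECReconstructionCoordinator coordinator =
*     new ECReconstructionCoordinator(conf, certClient, secretKeyClient,
*         context, metrics, "ContainerRecovery-")) {
*   // sourceNodeMap: replica index -> healthy source datanode
*   // targetNodeMap: replica index -> datanode receiving the rebuilt replica
*   coordinator.reconstructECContainerGroup(containerID, repConfig,
*       sourceNodeMap, targetNodeMap);
* }
* }</pre>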
*/
public class ECReconstructionCoordinator implements Closeable {
static final Logger LOG =
LoggerFactory.getLogger(ECReconstructionCoordinator.class);
private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;
private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 5;
private final ECContainerOperationClient containerOperationClient;
private final ByteBufferPool byteBufferPool;
private final ExecutorService ecReconstructReadExecutor;
private final MemoizedSupplier<ExecutorService> ecReconstructWriteExecutor;
private final BlockInputStreamFactory blockInputStreamFactory;
private final TokenHelper tokenHelper;
private final ContainerClientMetrics clientMetrics;
private final ECReconstructionMetrics metrics;
private final StateContext context;
private final OzoneClientConfig ozoneClientConfig;
public ECReconstructionCoordinator(
ConfigurationSource conf, CertificateClient certificateClient,
SecretKeySignerClient secretKeyClient, StateContext context,
ECReconstructionMetrics metrics,
String threadNamePrefix) throws IOException {
this.context = context;
this.containerOperationClient = new ECContainerOperationClient(conf,
certificateClient);
this.byteBufferPool = new ElasticByteBufferPool();
ozoneClientConfig = conf.getObject(OzoneClientConfig.class);
this.ecReconstructReadExecutor = createThreadPoolExecutor(
EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE,
ozoneClientConfig.getEcReconstructStripeReadPoolLimit(),
threadNamePrefix + "ec-reconstruct-reader-TID-%d");
this.ecReconstructWriteExecutor = MemoizedSupplier.valueOf(
() -> createThreadPoolExecutor(
EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE,
ozoneClientConfig.getEcReconstructStripeWritePoolLimit(),
threadNamePrefix + "ec-reconstruct-writer-TID-%d"));
this.blockInputStreamFactory = BlockInputStreamFactoryImpl
.getInstance(byteBufferPool, () -> ecReconstructReadExecutor);
tokenHelper = new TokenHelper(new SecurityConfig(conf), secretKeyClient);
this.clientMetrics = ContainerClientMetrics.acquire();
this.metrics = metrics;
}
public void reconstructECContainerGroup(long containerID,
ECReplicationConfig repConfig,
SortedMap<Integer, DatanodeDetails> sourceNodeMap,
SortedMap<Integer, DatanodeDetails> targetNodeMap) throws IOException {
Pipeline pipeline = rebuildInputPipeline(repConfig, sourceNodeMap);
SortedMap<Long, BlockData[]> blockDataMap =
getBlockDataMap(containerID, repConfig, sourceNodeMap);
SortedMap<Long, BlockLocationInfo> blockLocationInfoMap =
calcBlockLocationInfoMap(containerID, blockDataMap, pipeline);
ContainerID cid = ContainerID.valueOf(containerID);
// 1. Create target RECOVERING containers.
String containerToken = encode(tokenHelper.getContainerToken(cid));
List<DatanodeDetails> recoveringContainersCreatedDNs = new ArrayList<>();
try {
for (Map.Entry<Integer, DatanodeDetails> indexDnPair : targetNodeMap
.entrySet()) {
DatanodeDetails dn = indexDnPair.getValue();
int index = indexDnPair.getKey();
LOG.debug("Creating container {} on datanode {} for index {}",
containerID, dn, index);
containerOperationClient
.createRecoveringContainer(containerID, dn, repConfig,
containerToken, index);
recoveringContainersCreatedDNs.add(dn);
}
// 2. Reconstruct and transfer to targets
for (Map.Entry<Long, BlockLocationInfo> blockLocationInfoEntry
: blockLocationInfoMap.entrySet()) {
Long key = blockLocationInfoEntry.getKey();
BlockLocationInfo blockLocationInfo = blockLocationInfoEntry.getValue();
reconstructECBlockGroup(blockLocationInfo, repConfig,
targetNodeMap, blockDataMap.get(key));
}
// 3. Close containers
for (DatanodeDetails dn: recoveringContainersCreatedDNs) {
LOG.debug("Closing container {} on datanode {}", containerID, dn);
containerOperationClient
.closeContainer(containerID, dn, repConfig, containerToken);
}
metrics.incReconstructionTotal();
metrics.incBlockGroupReconstructionTotal(blockLocationInfoMap.size());
} catch (Exception e) {
// On any exception, delete the recovering containers.
metrics.incReconstructionFailsTotal();
metrics.incBlockGroupReconstructionFailsTotal(
blockLocationInfoMap.size());
LOG.warn(
"Exception while reconstructing the container {}. Cleaning up"
+ " all the recovering containers in the reconstruction process.",
containerID, e);
// Delete only the recovering containers that the current thread
// successfully created.
for (DatanodeDetails dn : recoveringContainersCreatedDNs) {
try {
containerOperationClient
.deleteContainerInState(containerID, dn, repConfig,
containerToken, ImmutableSet.of(
ContainerProtos.ContainerDataProto.State.UNHEALTHY,
ContainerProtos.ContainerDataProto.State.RECOVERING));
if (LOG.isDebugEnabled()) {
LOG.debug("Deleted the container {}, at the target: {}",
containerID, dn);
}
} catch (IOException ioe) {
LOG.error("Exception while deleting the container {} at target: {}",
containerID, dn, ioe);
}
}
throw e;
}
}
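/**
* Opens an ECBlockOutputStream that writes one replica of the given block
* to a single target datanode over a single-node pipeline.
*/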
private ECBlockOutputStream getECBlockOutputStream(
BlockLocationInfo blockLocationInfo, DatanodeDetails datanodeDetails,
ECReplicationConfig repConfig, int replicaIndex) throws IOException {
StreamBufferArgs streamBufferArgs =
StreamBufferArgs.getDefaultStreamBufferArgs(repConfig, ozoneClientConfig);
return new ECBlockOutputStream(
blockLocationInfo.getBlockID(),
containerOperationClient.getXceiverClientManager(),
containerOperationClient.singleNodePipeline(datanodeDetails,
repConfig, replicaIndex),
BufferPool.empty(), ozoneClientConfig,
blockLocationInfo.getToken(), clientMetrics, streamBufferArgs,
ecReconstructWriteExecutor);
}
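/**
* Reconstructs a single EC block group: decodes the missing chunks from
* the healthy replicas stripe by stripe, streams them to the target
* datanodes, and finally sends PutBlock to every target (including the
* targets that only need an empty block).
*/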
@VisibleForTesting
public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
ECReplicationConfig repConfig,
SortedMap<Integer, DatanodeDetails> targetMap, BlockData[] blockDataGroup)
throws IOException {
long safeBlockGroupLength = blockLocationInfo.getLength();
List<Integer> missingContainerIndexes = new ArrayList<>(targetMap.keySet());
// calculate the real missing block indexes
int dataLocs = ECBlockInputStreamProxy
.expectedDataLocations(repConfig, safeBlockGroupLength);
List<Integer> toReconstructIndexes = new ArrayList<>();
List<Integer> notReconstructIndexes = new ArrayList<>();
for (Integer index : missingContainerIndexes) {
if (index <= dataLocs || index > repConfig.getData()) {
// Data replicas within the effective length, and all parity
// replicas, hold data and must be reconstructed.
toReconstructIndexes.add(index);
} else {
// Data replicas beyond the effective length hold no data, so they
// don't need to be reconstructed, but we do need a stream to write
// the empty block data to.
notReconstructIndexes.add(index);
}
}
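// Always verify checksums when reading the healthy replicas used as
// reconstruction input.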
OzoneClientConfig clientConfig = this.ozoneClientConfig;
clientConfig.setChecksumVerify(true);
try (ECBlockReconstructedStripeInputStream sis
= new ECBlockReconstructedStripeInputStream(
repConfig, blockLocationInfo,
this.containerOperationClient.getXceiverClientManager(), null,
this.blockInputStreamFactory, byteBufferPool,
this.ecReconstructReadExecutor,
clientConfig)) {
ECBlockOutputStream[] targetBlockStreams =
new ECBlockOutputStream[toReconstructIndexes.size()];
ECBlockOutputStream[] emptyBlockStreams =
new ECBlockOutputStream[notReconstructIndexes.size()];
ByteBuffer[] bufs = new ByteBuffer[toReconstructIndexes.size()];
try {
// Create streams and buffers for all indexes that need to be
// reconstructed.
for (int i = 0; i < toReconstructIndexes.size(); i++) {
int replicaIndex = toReconstructIndexes.get(i);
DatanodeDetails datanodeDetails = targetMap.get(replicaIndex);
targetBlockStreams[i] = getECBlockOutputStream(blockLocationInfo,
datanodeDetails, repConfig, replicaIndex);
bufs[i] = byteBufferPool.getBuffer(false, repConfig.getEcChunkSize());
bufs[i].clear();
}
// Then create a stream for each index that doesn't need to be
// reconstructed, but still needs the empty block data written to it.
for (int i = 0; i < notReconstructIndexes.size(); i++) {
int replicaIndex = notReconstructIndexes.get(i);
DatanodeDetails datanodeDetails = targetMap.get(replicaIndex);
emptyBlockStreams[i] = getECBlockOutputStream(blockLocationInfo,
datanodeDetails, repConfig, replicaIndex);
}
if (!toReconstructIndexes.isEmpty()) {
sis.setRecoveryIndexes(toReconstructIndexes.stream().map(i -> (i - 1))
.collect(Collectors.toSet()));
long length = safeBlockGroupLength;
while (length > 0) {
int readLen;
try {
readLen = sis.recoverChunks(bufs);
Set<Integer> failedIndexes = sis.getFailedIndexes();
if (!failedIndexes.isEmpty()) {
// There was a problem reading some of the block indexes, but we
// did not get an exception, as there must have been spare indexes
// to recover from. Log the block group details in the same way as
// for the exception case below.
logBlockGroupDetails(blockLocationInfo, repConfig,
blockDataGroup);
}
} catch (IOException e) {
// When we see exceptions here, it could be due to some transient
// issue that causes the block read to fail when reconstructing it,
// but we have seen issues where the containers don't have the
// blocks they appear they should have, or the block chunks are the
// wrong length, etc. To debug these sorts of cases, if we get an
// error we log the details about the block group length on each
// source, along with their chunk lists and chunk lengths.
logBlockGroupDetails(blockLocationInfo, repConfig,
blockDataGroup);
throw e;
}
// TODO: can be submitted in parallel
for (int i = 0; i < bufs.length; i++) {
CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
future = targetBlockStreams[i].write(bufs[i]);
checkFailures(targetBlockStreams[i], future);
bufs[i].clear();
}
length -= readLen;
}
}
List<ECBlockOutputStream> allStreams =
new ArrayList<>(Arrays.asList(targetBlockStreams));
allStreams.addAll(Arrays.asList(emptyBlockStreams));
for (ECBlockOutputStream targetStream : allStreams) {
targetStream.executePutBlock(true, true,
blockLocationInfo.getLength(), blockDataGroup);
checkFailures(targetStream,
targetStream.getCurrentPutBlkResponseFuture());
}
} finally {
for (ByteBuffer buf : bufs) {
byteBufferPool.putBuffer(buf);
}
IOUtils.cleanupWithLogger(LOG, targetBlockStreams);
IOUtils.cleanupWithLogger(LOG, emptyBlockStreams);
}
}
}
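/**
* Logs the calculated safe length of the block group together with the
* per-replica block lengths and chunk lists, to help debug corrupt or
* inconsistent block groups.
*/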
private void logBlockGroupDetails(BlockLocationInfo blockLocationInfo,
ECReplicationConfig repConfig, BlockData[] blockDataGroup) {
LOG.info("Block group details for {}. " +
"Replication Config {}. Calculated safe length: {}. ",
blockLocationInfo.getBlockID(), repConfig,
blockLocationInfo.getLength());
for (int i = 0; i < blockDataGroup.length; i++) {
BlockData data = blockDataGroup[i];
if (data == null) {
continue;
}
StringBuilder sb = new StringBuilder();
sb.append("Block Data for: ")
.append(data.getBlockID())
.append(" replica Index: ")
.append(i + 1)
.append(" block length: ")
.append(data.getSize())
.append(" block group length: ")
.append(getBlockDataLength(data))
.append(" chunk list: \n");
int cnt = 0;
for (ContainerProtos.ChunkInfo chunkInfo : data.getChunks()) {
if (cnt > 0) {
sb.append("\n");
}
sb.append(" chunkNum: ")
.append(++cnt)
.append(" length: ")
.append(chunkInfo.getLen())
.append(" offset: ")
.append(chunkInfo.getOffset());
}
LOG.info(sb.toString());
}
}
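/**
* Waits for the given write/PutBlock response future and throws an
* IOException, aborting the reconstruction, if the write to the target
* datanode failed.
*/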
private void checkFailures(ECBlockOutputStream targetBlockStream,
CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
currentPutBlkResponseFuture)
throws IOException {
if (isFailed(targetBlockStream, currentPutBlkResponseFuture)) {
// If one chunk response failed, we should retry.
// Even after retries if it failed, we should declare the
// reconstruction as failed.
// For now, let's throw the exception.
throw new IOException("Chunk write failed at the new target node: " +
targetBlockStream.getDatanodeDetails() +
". Aborting the reconstruction process.",
targetBlockStream.getIoException());
}
}
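/**
* Returns true if the response future is missing, completed
* exceptionally, or the stream has already recorded an IOException.
*/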
private boolean isFailed(ECBlockOutputStream outputStream,
CompletableFuture<ContainerProtos.
ContainerCommandResponseProto> chunkWriteResponseFuture) {
if (chunkWriteResponseFuture == null) {
return true;
}
ContainerProtos.ContainerCommandResponseProto
containerCommandResponseProto = null;
try {
containerCommandResponseProto = chunkWriteResponseFuture.get();
} catch (InterruptedException e) {
outputStream.setIoException(e);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
outputStream.setIoException(e);
}
if (outputStream.getIoException() != null) {
return true;
}
return containerCommandResponseProto == null;
}
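/**
* Builds a BlockLocationInfo (with the input pipeline and a fresh block
* token) for every block whose effective group length is positive.
*/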
SortedMap<Long, BlockLocationInfo> calcBlockLocationInfoMap(long containerID,
SortedMap<Long, BlockData[]> blockDataMap, Pipeline pipeline) {
SortedMap<Long, BlockLocationInfo> blockInfoMap = new TreeMap<>();
for (Map.Entry<Long, BlockData[]> entry : blockDataMap.entrySet()) {
Long localID = entry.getKey();
BlockData[] blockGroup = entry.getValue();
long blockGroupLen = calcEffectiveBlockGroupLen(blockGroup,
pipeline.getReplicationConfig().getRequiredNodes());
if (blockGroupLen > 0) {
BlockID blockID = new BlockID(containerID, localID);
BlockLocationInfo blockLocationInfo = new BlockLocationInfo.Builder()
.setBlockID(blockID)
.setLength(blockGroupLen)
.setPipeline(pipeline)
.setToken(tokenHelper.getBlockToken(blockID, blockGroupLen))
.build();
blockInfoMap.put(localID, blockLocationInfo);
}
}
return blockInfoMap;
}
@Override
public void close() throws IOException {
if (containerOperationClient != null) {
containerOperationClient.close();
}
if (ecReconstructWriteExecutor.isInitialized()) {
ecReconstructWriteExecutor.get().shutdownNow();
}
}
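/**
* Builds a CLOSED pipeline over the healthy source datanodes, preserving
* each datanode's replica index, for reading the remaining data.
*/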
private Pipeline rebuildInputPipeline(ECReplicationConfig repConfig,
SortedMap<Integer, DatanodeDetails> sourceNodeMap) {
List<DatanodeDetails> nodes = new ArrayList<>(sourceNodeMap.values());
Map<DatanodeDetails, Integer> dnVsIndex = new HashMap<>();
for (Map.Entry<Integer, DatanodeDetails> entry
: sourceNodeMap.entrySet()) {
dnVsIndex.put(entry.getValue(), entry.getKey());
}
return Pipeline.newBuilder().setId(PipelineID.randomId())
.setReplicationConfig(repConfig).setNodes(nodes)
.setReplicaIndexes(dnVsIndex).setState(Pipeline.PipelineState.CLOSED)
.build();
}
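/**
* Lists the blocks on every source datanode and groups the BlockData of
* each block by replica index. Orphaned blocks, i.e. abandoned stripes
* that are missing a putBlock entry on some source, are removed from the
* result.
*/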
private SortedMap<Long, BlockData[]> getBlockDataMap(long containerID,
ECReplicationConfig repConfig,
Map<Integer, DatanodeDetails> sourceNodeMap) throws IOException {
SortedMap<Long, BlockData[]> resultMap = new TreeMap<>();
Token<ContainerTokenIdentifier> containerToken =
tokenHelper.getContainerToken(ContainerID.valueOf(containerID));
for (Map.Entry<Integer, DatanodeDetails> entry
: sourceNodeMap.entrySet()) {
Integer index = entry.getKey();
DatanodeDetails dn = entry.getValue();
BlockData[] blockDataArr = containerOperationClient.listBlock(
containerID, dn, repConfig, containerToken);
for (BlockData blockData : blockDataArr) {
long localID = blockData.getLocalID();
BlockData[] blkDataArr = resultMap.getOrDefault(localID,
new BlockData[repConfig.getRequiredNodes()]);
blkDataArr[index - 1] = blockData;
resultMap.put(localID, blkDataArr);
}
}
// When a stripe is written, the putBlock is sent to all nodes, even if
// a node has zero bytes written to it. If the client does not get an
// ACK from all nodes, it will abandon the stripe, which can leave
// incomplete stripes on the datanodes. Therefore, we check that all
// blocks in the result map have an entry for all nodes. If they do
// not, it means this is an abandoned stripe and we should not attempt
// to reconstruct it.
// Note that if some nodes report different values for the block length,
// that also indicates garbage data at the end of the block. A different
// part of the code handles this and only reconstructs the valid part of
// the block, i.e. the minimum length reported by the nodes.
Iterator<Map.Entry<Long, BlockData[]>> resultIterator
= resultMap.entrySet().iterator();
while (resultIterator.hasNext()) {
Map.Entry<Long, BlockData[]> entry = resultIterator.next();
BlockData[] blockDataArr = entry.getValue();
for (Map.Entry<Integer, DatanodeDetails> e : sourceNodeMap.entrySet()) {
// There should be an entry in the array for each source node. If there
// is not, this is an orphaned stripe and we should remove it from the
// result.
if (blockDataArr[e.getKey() - 1] == null) {
LOG.warn("In container {} block {} does not have a putBlock entry " +
"for index {} on datanode {} making it an orphan block / " +
"stripe. It will not be reconstructed", containerID,
entry.getKey(), e.getKey(), e.getValue());
resultIterator.remove();
break;
}
}
}
return resultMap;
}
/**
* Get the effective length of the block group.
* We cannot be absolutely accurate when there is a failed stripe in
* this block, since the failed cells could be missing, and we cannot
* tell from the healthy cells whether the last stripe has failed or
* not. But in such a case we recover at most one extra stripe for this
* block, which does not confuse the client data view. For example, if
* the replicas of an rs-3-2 block group report group lengths of
* {4MB, 4MB, null, 4MB, 3MB}, the effective length is the minimum of
* the non-null values, i.e. 3MB.
*
* @param blockGroup block data of each replica, indexed by replica index
* @param replicaCount the expected number of replicas in the group
* @return the effective (safe) block group length, or 0 if unknown
*/
private long calcEffectiveBlockGroupLen(BlockData[] blockGroup,
int replicaCount) {
Preconditions.checkState(blockGroup.length == replicaCount);
long blockGroupLen = Long.MAX_VALUE;
for (int i = 0; i < replicaCount; i++) {
if (blockGroup[i] == null) {
continue;
}
long putBlockLen = getBlockDataLength(blockGroup[i]);
// The safe length is the minimum of the lengths recorded across the
// stripe.
blockGroupLen = Math.min(putBlockLen, blockGroupLen);
}
return blockGroupLen == Long.MAX_VALUE ? 0 : blockGroupLen;
}
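/**
* Returns the block group length recorded in the block's putBlock
* metadata, or 0 if the metadata entry is missing.
*/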
private long getBlockDataLength(BlockData blockData) {
String lenStr = blockData.getMetadata()
.get(OzoneConsts.BLOCK_GROUP_LEN_KEY_IN_PUT_BLOCK);
// If we don't have the length, it indicates a problem with the stripe.
// All replicas should carry the length, so if it is missing we return
// 0, which sets the length of the block to zero so that we do not
// attempt to reconstruct it.
return (lenStr == null) ? 0 : Long.parseLong(lenStr);
}
public ECReconstructionMetrics getECReconstructionMetrics() {
return this.metrics;
}
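/**
* Returns the term of the leader SCM as seen by the datanode state
* context, or an empty OptionalLong if no context is available.
*/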
OptionalLong getTermOfLeaderSCM() {
return Optional.ofNullable(context)
.map(StateContext::getTermOfLeaderSCM)
.orElse(OptionalLong.empty());
}
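/**
* Creates a bounded pool that hands tasks off via a SynchronousQueue and
* falls back to running the task on the submitting thread
* (CallerRunsPolicy) when all workers are busy.
*/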
private static ExecutorService createThreadPoolExecutor(
int corePoolSize, int maximumPoolSize, String threadNameFormat) {
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
60, TimeUnit.SECONDS, new SynchronousQueue<>(),
new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build(),
new ThreadPoolExecutor.CallerRunsPolicy());
}
}