/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.ozone.container.ec.reconstruction;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.ContainerClientMetrics;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.StreamBufferArgs;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.storage.BlockLocationInfo;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.hdds.scm.storage.ECBlockOutputStream;
import org.apache.hadoop.hdds.security.SecurityConfig;
import org.apache.hadoop.hdds.security.symmetric.SecretKeySignerClient;
import org.apache.hadoop.hdds.security.token.ContainerTokenIdentifier;
import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl;
import org.apache.hadoop.ozone.client.io.ECBlockInputStreamProxy;
import org.apache.hadoop.ozone.client.io.ECBlockReconstructedStripeInputStream;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.security.token.Token;
import org.apache.ratis.util.MemoizedSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.apache.hadoop.ozone.container.ec.reconstruction.TokenHelper.encode;

/**
 * The Coordinator implements the main flow of reconstructing
 * missing container replicas.
 * <p>
 * For a container reconstruction task, the main flow is:
 * - ListBlock from all healthy replicas
 * - calculate effective block group len for all blocks
 * - create RECOVERING containers in TargetDNs
 * -  for each block
 * -    build a ECReconstructedStripedInputStream to read healthy chunks
 * -    build a ECBlockOutputStream to write out decoded chunks
 * -      for each stripe
 * -        use ECReconstructedStripedInputStream.recoverChunks to decode chunks
 * -        use ECBlockOutputStream.write to write decoded chunks to TargetDNs
 * -    PutBlock
 * - Close RECOVERING containers in TargetDNs
 */
public class ECReconstructionCoordinator implements Closeable {

  static final Logger LOG =
      LoggerFactory.getLogger(ECReconstructionCoordinator.class);

  private static final int EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE = 3;

  private static final int EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE = 5;

  private final ECContainerOperationClient containerOperationClient;

  private final ByteBufferPool byteBufferPool;

  private final ExecutorService ecReconstructReadExecutor;
  private final MemoizedSupplier<ExecutorService> ecReconstructWriteExecutor;
  private final BlockInputStreamFactory blockInputStreamFactory;
  private final TokenHelper tokenHelper;
  private final ContainerClientMetrics clientMetrics;
  private final ECReconstructionMetrics metrics;
  private final StateContext context;
  private final OzoneClientConfig ozoneClientConfig;

  public ECReconstructionCoordinator(
      ConfigurationSource conf, CertificateClient certificateClient,
      SecretKeySignerClient secretKeyClient, StateContext context,
      ECReconstructionMetrics metrics,
      String threadNamePrefix) throws IOException {
    this.context = context;
    this.containerOperationClient = new ECContainerOperationClient(conf,
        certificateClient);
    this.byteBufferPool = new ElasticByteBufferPool();
    ozoneClientConfig = conf.getObject(OzoneClientConfig.class);
    this.ecReconstructReadExecutor = createThreadPoolExecutor(
        EC_RECONSTRUCT_STRIPE_READ_POOL_MIN_SIZE,
        ozoneClientConfig.getEcReconstructStripeReadPoolLimit(),
        threadNamePrefix + "ec-reconstruct-reader-TID-%d");
    this.ecReconstructWriteExecutor = MemoizedSupplier.valueOf(
        () -> createThreadPoolExecutor(
            EC_RECONSTRUCT_STRIPE_WRITE_POOL_MIN_SIZE,
            ozoneClientConfig.getEcReconstructStripeWritePoolLimit(),
            threadNamePrefix + "ec-reconstruct-writer-TID-%d"));
    this.blockInputStreamFactory = BlockInputStreamFactoryImpl
        .getInstance(byteBufferPool, () -> ecReconstructReadExecutor);
    tokenHelper = new TokenHelper(new SecurityConfig(conf), secretKeyClient);
    this.clientMetrics = ContainerClientMetrics.acquire();
    this.metrics = metrics;
  }

  public void reconstructECContainerGroup(long containerID,
      ECReplicationConfig repConfig,
      SortedMap<Integer, DatanodeDetails> sourceNodeMap,
      SortedMap<Integer, DatanodeDetails> targetNodeMap) throws IOException {

    Pipeline pipeline = rebuildInputPipeline(repConfig, sourceNodeMap);

    SortedMap<Long, BlockData[]> blockDataMap =
        getBlockDataMap(containerID, repConfig, sourceNodeMap);

    SortedMap<Long, BlockLocationInfo> blockLocationInfoMap =
        calcBlockLocationInfoMap(containerID, blockDataMap, pipeline);
    ContainerID cid = ContainerID.valueOf(containerID);

    // 1. create target recovering containers.
    String containerToken = encode(tokenHelper.getContainerToken(cid));
    List<DatanodeDetails> recoveringContainersCreatedDNs = new ArrayList<>();
    try {
      for (Map.Entry<Integer, DatanodeDetails> indexDnPair : targetNodeMap
          .entrySet()) {
        DatanodeDetails dn = indexDnPair.getValue();
        int index = indexDnPair.getKey();
        LOG.debug("Creating container {} on datanode {} for index {}",
            containerID, dn, index);
        containerOperationClient
            .createRecoveringContainer(containerID, dn, repConfig,
                containerToken, index);
        recoveringContainersCreatedDNs.add(dn);
      }

      // 2. Reconstruct and transfer to targets
      for (Map.Entry<Long, BlockLocationInfo> blockLocationInfoEntry
          : blockLocationInfoMap.entrySet()) {
        Long key = blockLocationInfoEntry.getKey();
        BlockLocationInfo blockLocationInfo = blockLocationInfoEntry.getValue();
        reconstructECBlockGroup(blockLocationInfo, repConfig,
            targetNodeMap, blockDataMap.get(key));
      }

      // 3. Close containers
      for (DatanodeDetails dn: recoveringContainersCreatedDNs) {
        LOG.debug("Closing container {} on datanode {}", containerID, dn);
        containerOperationClient
            .closeContainer(containerID, dn, repConfig, containerToken);
      }
      metrics.incReconstructionTotal();
      metrics.incBlockGroupReconstructionTotal(blockLocationInfoMap.size());
    } catch (Exception e) {
      // Any exception let's delete the recovering containers.
      metrics.incReconstructionFailsTotal();
      metrics.incBlockGroupReconstructionFailsTotal(
          blockLocationInfoMap.size());
      LOG.warn(
          "Exception while reconstructing the container {}. Cleaning up"
              + " all the recovering containers in the reconstruction process.",
          containerID, e);
      // Delete only the current thread successfully created recovering
      // containers.
      for (DatanodeDetails dn : recoveringContainersCreatedDNs) {
        try {
          containerOperationClient
              .deleteContainerInState(containerID, dn, repConfig,
                  containerToken, ImmutableSet.of(
                          ContainerProtos.ContainerDataProto.State.UNHEALTHY,
                          ContainerProtos.ContainerDataProto.State.RECOVERING));
          if (LOG.isDebugEnabled()) {
            LOG.debug("Deleted the container {}, at the target: {}",
                containerID, dn);
          }
        } catch (IOException ioe) {
          LOG.error("Exception while deleting the container {} at target: {}",
              containerID, dn, ioe);
        }
      }
      throw e;
    }

  }

  private ECBlockOutputStream getECBlockOutputStream(
      BlockLocationInfo blockLocationInfo, DatanodeDetails datanodeDetails,
      ECReplicationConfig repConfig, int replicaIndex) throws IOException {
    StreamBufferArgs streamBufferArgs =
        StreamBufferArgs.getDefaultStreamBufferArgs(repConfig, ozoneClientConfig);
    return new ECBlockOutputStream(
        blockLocationInfo.getBlockID(),
        containerOperationClient.getXceiverClientManager(),
        containerOperationClient.singleNodePipeline(datanodeDetails,
            repConfig, replicaIndex),
        BufferPool.empty(), ozoneClientConfig,
        blockLocationInfo.getToken(), clientMetrics, streamBufferArgs, ecReconstructWriteExecutor);
  }

  @VisibleForTesting
  public void reconstructECBlockGroup(BlockLocationInfo blockLocationInfo,
      ECReplicationConfig repConfig,
      SortedMap<Integer, DatanodeDetails> targetMap, BlockData[] blockDataGroup)
      throws IOException {
    long safeBlockGroupLength = blockLocationInfo.getLength();
    List<Integer> missingContainerIndexes = new ArrayList<>(targetMap.keySet());

    // calculate the real missing block indexes
    int dataLocs = ECBlockInputStreamProxy
        .expectedDataLocations(repConfig, safeBlockGroupLength);
    List<Integer> toReconstructIndexes = new ArrayList<>();
    List<Integer> notReconstructIndexes = new ArrayList<>();
    for (Integer index : missingContainerIndexes) {
      if (index <= dataLocs || index > repConfig.getData()) {
        toReconstructIndexes.add(index);
      } else {
        // Don't need to be reconstructed, but we do need a stream to write
        // the block data to.
        notReconstructIndexes.add(index);
      }
    }

    OzoneClientConfig clientConfig = this.ozoneClientConfig;
    clientConfig.setChecksumVerify(true);
    try (ECBlockReconstructedStripeInputStream sis
        = new ECBlockReconstructedStripeInputStream(
        repConfig, blockLocationInfo,
        this.containerOperationClient.getXceiverClientManager(), null,
        this.blockInputStreamFactory, byteBufferPool,
        this.ecReconstructReadExecutor,
        clientConfig)) {

      ECBlockOutputStream[] targetBlockStreams =
          new ECBlockOutputStream[toReconstructIndexes.size()];
      ECBlockOutputStream[] emptyBlockStreams =
          new ECBlockOutputStream[notReconstructIndexes.size()];
      ByteBuffer[] bufs = new ByteBuffer[toReconstructIndexes.size()];
      try {
        // Create streams and buffers for all indexes that need reconstructed
        for (int i = 0; i < toReconstructIndexes.size(); i++) {
          int replicaIndex = toReconstructIndexes.get(i);
          DatanodeDetails datanodeDetails = targetMap.get(replicaIndex);
          targetBlockStreams[i] = getECBlockOutputStream(blockLocationInfo, datanodeDetails, repConfig, replicaIndex);
          bufs[i] = byteBufferPool.getBuffer(false, repConfig.getEcChunkSize());
          bufs[i].clear();
        }
        // Then create a stream for all indexes that don't need reconstructed, but still need a stream to
        // write the empty block data to.
        for (int i = 0; i < notReconstructIndexes.size(); i++) {
          int replicaIndex = notReconstructIndexes.get(i);
          DatanodeDetails datanodeDetails = targetMap.get(replicaIndex);
          emptyBlockStreams[i] = getECBlockOutputStream(blockLocationInfo, datanodeDetails, repConfig, replicaIndex);
        }

        if (toReconstructIndexes.size() > 0) {
          sis.setRecoveryIndexes(toReconstructIndexes.stream().map(i -> (i - 1))
              .collect(Collectors.toSet()));
          long length = safeBlockGroupLength;
          while (length > 0) {
            int readLen;
            try {
              readLen = sis.recoverChunks(bufs);
              Set<Integer> failedIndexes = sis.getFailedIndexes();
              if (!failedIndexes.isEmpty()) {
                // There was a problem reading some of the block indexes, but we
                // did not get an exception as there must have been spare indexes
                // to try and recover from. Therefore we should log out the block
                // group details in the same way as for the exception case below.
                logBlockGroupDetails(blockLocationInfo, repConfig,
                    blockDataGroup);
              }
            } catch (IOException e) {
              // When we see exceptions here, it could be due to some transient
              // issue that causes the block read to fail when reconstructing it,
              // but we have seen issues where the containers don't have the
              // blocks they appear they should have, or the block chunks are the
              // wrong length etc. In order to debug these sort of cases, if we
              // get an error, we will log out the details about the block group
              // length on each source, along with their chunk list and chunk
              // lengths etc.
              logBlockGroupDetails(blockLocationInfo, repConfig,
                  blockDataGroup);
              throw e;
            }
            // TODO: can be submitted in parallel
            for (int i = 0; i < bufs.length; i++) {
              CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
                  future = targetBlockStreams[i].write(bufs[i]);
              checkFailures(targetBlockStreams[i], future);
              bufs[i].clear();
            }
            length -= readLen;
          }
        }
        List<ECBlockOutputStream> allStreams = new ArrayList<>(Arrays.asList(targetBlockStreams));
        allStreams.addAll(Arrays.asList(emptyBlockStreams));
        for (ECBlockOutputStream targetStream : allStreams) {
          targetStream.executePutBlock(true, true, blockLocationInfo.getLength(), blockDataGroup);
          checkFailures(targetStream, targetStream.getCurrentPutBlkResponseFuture());
        }
      } finally {
        for (ByteBuffer buf : bufs) {
          byteBufferPool.putBuffer(buf);
        }
        IOUtils.cleanupWithLogger(LOG, targetBlockStreams);
        IOUtils.cleanupWithLogger(LOG, emptyBlockStreams);
      }
    }
  }

  private void logBlockGroupDetails(BlockLocationInfo blockLocationInfo,
      ECReplicationConfig repConfig, BlockData[] blockDataGroup) {
    LOG.info("Block group details for {}. " +
        "Replication Config {}. Calculated safe length: {}. ",
        blockLocationInfo.getBlockID(), repConfig,
        blockLocationInfo.getLength());
    for (int i = 0; i < blockDataGroup.length; i++) {
      BlockData data = blockDataGroup[i];
      if (data == null) {
        continue;
      }
      StringBuilder sb = new StringBuilder();
      sb.append("Block Data for: ")
          .append(data.getBlockID())
          .append(" replica Index: ")
          .append(i + 1)
          .append(" block length: ")
          .append(data.getSize())
          .append(" block group length: ")
          .append(getBlockDataLength(data))
          .append(" chunk list: \n");
      int cnt = 0;
      for (ContainerProtos.ChunkInfo chunkInfo : data.getChunks()) {
        if (cnt > 0) {
          sb.append("\n");
        }
        sb.append("  chunkNum: ")
            .append(++cnt)
            .append(" length: ")
            .append(chunkInfo.getLen())
            .append(" offset: ")
            .append(chunkInfo.getOffset());
      }
      LOG.info(sb.toString());
    }
  }

  private void checkFailures(ECBlockOutputStream targetBlockStream,
      CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
          currentPutBlkResponseFuture)
      throws IOException {
    if (isFailed(targetBlockStream, currentPutBlkResponseFuture)) {
      // If one chunk response failed, we should retry.
      // Even after retries if it failed, we should declare the
      // reconstruction as failed.
      // For now, let's throw the exception.
      throw new IOException("Chunk write failed at the new target node: " +
          targetBlockStream.getDatanodeDetails() +
          ". Aborting the reconstruction process.",
          targetBlockStream.getIoException());
    }
  }

  private boolean isFailed(ECBlockOutputStream outputStream,
      CompletableFuture<ContainerProtos.
          ContainerCommandResponseProto> chunkWriteResponseFuture) {
    if (chunkWriteResponseFuture == null) {
      return true;
    }

    ContainerProtos.ContainerCommandResponseProto
        containerCommandResponseProto = null;
    try {
      containerCommandResponseProto = chunkWriteResponseFuture.get();
    } catch (InterruptedException e) {
      outputStream.setIoException(e);
      Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
      outputStream.setIoException(e);
    }

    if (outputStream.getIoException() != null) {
      return true;
    }

    if (containerCommandResponseProto == null) {
      return true;
    }

    return false;
  }

  SortedMap<Long, BlockLocationInfo> calcBlockLocationInfoMap(long containerID,
      SortedMap<Long, BlockData[]> blockDataMap, Pipeline pipeline) {

    SortedMap<Long, BlockLocationInfo> blockInfoMap = new TreeMap<>();

    for (Map.Entry<Long, BlockData[]> entry : blockDataMap.entrySet()) {
      Long localID = entry.getKey();
      BlockData[] blockGroup = entry.getValue();

      long blockGroupLen = calcEffectiveBlockGroupLen(blockGroup,
          pipeline.getReplicationConfig().getRequiredNodes());
      if (blockGroupLen > 0) {
        BlockID blockID = new BlockID(containerID, localID);
        BlockLocationInfo blockLocationInfo = new BlockLocationInfo.Builder()
            .setBlockID(blockID)
            .setLength(blockGroupLen)
            .setPipeline(pipeline)
            .setToken(tokenHelper.getBlockToken(blockID, blockGroupLen))
            .build();
        blockInfoMap.put(localID, blockLocationInfo);
      }
    }
    return blockInfoMap;
  }

  @Override
  public void close() throws IOException {
    if (containerOperationClient != null) {
      containerOperationClient.close();
    }
    if (ecReconstructWriteExecutor.isInitialized()) {
      ecReconstructWriteExecutor.get().shutdownNow();
    }
  }

  private Pipeline rebuildInputPipeline(ECReplicationConfig repConfig,
      SortedMap<Integer, DatanodeDetails> sourceNodeMap) {

    List<DatanodeDetails> nodes = new ArrayList<>(sourceNodeMap.values());
    Map<DatanodeDetails, Integer> dnVsIndex = new HashMap<>();

    Iterator<Map.Entry<Integer, DatanodeDetails>> iterator =
        sourceNodeMap.entrySet().iterator();
    while (iterator.hasNext()) {
      Map.Entry<Integer, DatanodeDetails> next = iterator.next();
      Integer key = next.getKey();
      DatanodeDetails value = next.getValue();
      dnVsIndex.put(value, key);
    }

    return Pipeline.newBuilder().setId(PipelineID.randomId())
        .setReplicationConfig(repConfig).setNodes(nodes)
        .setReplicaIndexes(dnVsIndex).setState(Pipeline.PipelineState.CLOSED)
        .build();
  }

  private SortedMap<Long, BlockData[]> getBlockDataMap(long containerID,
      ECReplicationConfig repConfig,
      Map<Integer, DatanodeDetails> sourceNodeMap) throws IOException {

    SortedMap<Long, BlockData[]> resultMap = new TreeMap<>();
    Token<ContainerTokenIdentifier> containerToken =
        tokenHelper.getContainerToken(new ContainerID(containerID));

    Iterator<Map.Entry<Integer, DatanodeDetails>> iterator =
        sourceNodeMap.entrySet().iterator();

    while (iterator.hasNext()) {
      Map.Entry<Integer, DatanodeDetails> next = iterator.next();
      Integer index = next.getKey();
      DatanodeDetails dn = next.getValue();

      BlockData[] blockDataArr = containerOperationClient.listBlock(
          containerID, dn, repConfig, containerToken);

      for (BlockData blockData : blockDataArr) {
        BlockID blockID = blockData.getBlockID();
        BlockData[] blkDataArr = resultMap.getOrDefault(blockData.getLocalID(),
            new BlockData[repConfig.getRequiredNodes()]);
        blkDataArr[index - 1] = blockData;
        resultMap.put(blockID.getLocalID(), blkDataArr);
      }
    }
    // When a stripe is written, the put block is sent to all nodes even if
    // that nodes has zero bytes written to it. If the
    // client does not get an ACK from all nodes, it will abandon the stripe,
    // which can leave incomplete stripes on the DNs. Therefore, we should check
    // that all blocks in the result map have an entry for all nodes. If they
    // do not, it means this is an abandoned stripe and we should not attempt
    // to reconstruct it.
    // Note that if some nodes report different values for the block length,
    // it also indicate garbage data at the end of the block. A different part
    // of the code handles this and only reconstructs the valid part of the
    // block, ie the minimum length reported by the nodes.
    Iterator<Map.Entry<Long, BlockData[]>> resultIterator
        = resultMap.entrySet().iterator();
    while (resultIterator.hasNext()) {
      Map.Entry<Long, BlockData[]> entry = resultIterator.next();
      BlockData[] blockDataArr = entry.getValue();
      for (Map.Entry<Integer, DatanodeDetails> e : sourceNodeMap.entrySet()) {
        // There should be an entry in the Array for each keyset node. If there
        // is not, this is an orphaned stripe and we should remove it from the
        // result.
        if (blockDataArr[e.getKey() - 1] == null) {
          LOG.warn("In container {} block {} does not have a putBlock entry " +
              "for index {} on datanode {} making it an orphan block / " +
              "stripe. It will not be reconstructed", containerID,
              entry.getKey(), e.getKey(), e.getValue());
          resultIterator.remove();
          break;
        }
      }
    }
    return resultMap;
  }

  /**
   * Get the effective length of each block group.
   * We can not be absolutely accurate when there is a failed stripe
   * in this block since the failed cells could be missing, and
   * we can not tell from the healthy cells whether the last stripe
   * is failed or not. But in such case we at most recover one extra
   * stripe for this block which does not confuse the client data view.
   *
   * @param blockGroup
   * @param replicaCount
   * @return
   */
  private long calcEffectiveBlockGroupLen(BlockData[] blockGroup,
      int replicaCount) {
    Preconditions.checkState(blockGroup.length == replicaCount);

    long blockGroupLen = Long.MAX_VALUE;

    for (int i = 0; i < replicaCount; i++) {
      if (blockGroup[i] == null) {
        continue;
      }

      long putBlockLen = getBlockDataLength(blockGroup[i]);
      // Use safe length is the minimum of the lengths recorded across the
      // stripe
      blockGroupLen = Math.min(putBlockLen, blockGroupLen);
    }
    return blockGroupLen == Long.MAX_VALUE ? 0 : blockGroupLen;
  }

  private long getBlockDataLength(BlockData blockData) {
    String lenStr = blockData.getMetadata()
        .get(OzoneConsts.BLOCK_GROUP_LEN_KEY_IN_PUT_BLOCK);
    // If we don't have the length, then it indicates a problem with the stripe.
    // All replica should carry the length, so if it is not there, we return 0,
    // which will cause us to set the length of the block to zero and not
    // attempt to reconstruct it.
    return (lenStr == null) ? 0 : Long.parseLong(lenStr);
  }

  public ECReconstructionMetrics getECReconstructionMetrics() {
    return this.metrics;
  }

  OptionalLong getTermOfLeaderSCM() {
    return Optional.ofNullable(context)
        .map(StateContext::getTermOfLeaderSCM)
        .orElse(OptionalLong.empty());
  }

  private static ExecutorService createThreadPoolExecutor(
      int corePoolSize, int maximumPoolSize, String threadNameFormat) {
    return new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
        60, TimeUnit.SECONDS, new SynchronousQueue<>(),
        new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build(),
        new ThreadPoolExecutor.CallerRunsPolicy());
  }
}
