| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs; |
| |
| import org.apache.hadoop.fs.CompositeCrcFileChecksum; |
| import org.apache.hadoop.fs.FileChecksum; |
| import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum; |
| import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum; |
| import org.apache.hadoop.fs.Options.ChecksumCombineMode; |
| import org.apache.hadoop.fs.PathIOException; |
| import org.apache.hadoop.hdfs.protocol.BlockChecksumOptions; |
| import org.apache.hadoop.hdfs.protocol.BlockChecksumType; |
| import org.apache.hadoop.hdfs.protocol.ClientProtocol; |
| import org.apache.hadoop.hdfs.protocol.DatanodeInfo; |
| import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; |
| import org.apache.hadoop.hdfs.protocol.ExtendedBlock; |
| import org.apache.hadoop.hdfs.protocol.HdfsConstants; |
| import org.apache.hadoop.hdfs.protocol.LocatedBlock; |
| import org.apache.hadoop.hdfs.protocol.LocatedBlocks; |
| import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; |
| import org.apache.hadoop.hdfs.protocol.StripedBlockInfo; |
| import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; |
| import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; |
| import org.apache.hadoop.hdfs.protocol.datatransfer.Op; |
| import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; |
| import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; |
| import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; |
| import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto; |
| import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; |
| import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; |
| import org.apache.hadoop.io.DataOutputBuffer; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.io.MD5Hash; |
| import org.apache.hadoop.util.CrcComposer; |
| import org.apache.hadoop.util.CrcUtil; |
| import org.apache.hadoop.util.DataChecksum; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| import java.util.List; |
| |
| /** |
| * Utility classes to compute file checksum for both replicated and striped |
| * files. |
| */ |
| final class FileChecksumHelper { |
| static final Logger LOG = |
| LoggerFactory.getLogger(FileChecksumHelper.class); |
| |
| private FileChecksumHelper() {} |
| |
| /** |
| * A common abstract class to compute file checksum. |
| */ |
| static abstract class FileChecksumComputer { |
| private final String src; |
| private final long length; |
| private final DFSClient client; |
| private final ClientProtocol namenode; |
| private final ChecksumCombineMode combineMode; |
| private final BlockChecksumType blockChecksumType; |
| private final DataOutputBuffer blockChecksumBuf = new DataOutputBuffer(); |
| |
| private FileChecksum fileChecksum; |
| private LocatedBlocks blockLocations; |
| |
| private int timeout; |
| private List<LocatedBlock> locatedBlocks; |
| private long remaining = 0L; |
| |
| private int bytesPerCRC = -1; |
| private DataChecksum.Type crcType = DataChecksum.Type.DEFAULT; |
| private long crcPerBlock = 0; |
| private boolean isRefetchBlocks = false; |
| private int lastRetriedIndex = -1; |
| |
| /** |
| * Constructor that accepts all the input parameters for the computing. |
| */ |
| FileChecksumComputer(String src, long length, |
| LocatedBlocks blockLocations, |
| ClientProtocol namenode, |
| DFSClient client, |
| ChecksumCombineMode combineMode) throws IOException { |
| this.src = src; |
| this.length = length; |
| this.blockLocations = blockLocations; |
| this.namenode = namenode; |
| this.client = client; |
| this.combineMode = combineMode; |
| switch (combineMode) { |
| case MD5MD5CRC: |
| this.blockChecksumType = BlockChecksumType.MD5CRC; |
| break; |
| case COMPOSITE_CRC: |
| this.blockChecksumType = BlockChecksumType.COMPOSITE_CRC; |
| break; |
| default: |
| throw new IOException("Unknown ChecksumCombineMode: " + combineMode); |
| } |
| |
| this.remaining = length; |
| |
| if (blockLocations != null) { |
| if (src.contains(HdfsConstants.SEPARATOR_DOT_SNAPSHOT_DIR_SEPARATOR)) { |
| this.remaining = Math.min(length, blockLocations.getFileLength()); |
| } |
| this.locatedBlocks = blockLocations.getLocatedBlocks(); |
| } |
| } |
| |
| String getSrc() { |
| return src; |
| } |
| |
| long getLength() { |
| return length; |
| } |
| |
| DFSClient getClient() { |
| return client; |
| } |
| |
| ClientProtocol getNamenode() { |
| return namenode; |
| } |
| |
| ChecksumCombineMode getCombineMode() { |
| return combineMode; |
| } |
| |
| BlockChecksumType getBlockChecksumType() { |
| return blockChecksumType; |
| } |
| |
| DataOutputBuffer getBlockChecksumBuf() { |
| return blockChecksumBuf; |
| } |
| |
| FileChecksum getFileChecksum() { |
| return fileChecksum; |
| } |
| |
| LocatedBlocks getBlockLocations() { |
| return blockLocations; |
| } |
| |
| void refetchBlocks() throws IOException { |
| this.blockLocations = getClient().getBlockLocations(getSrc(), |
| getLength()); |
| this.locatedBlocks = getBlockLocations().getLocatedBlocks(); |
| this.isRefetchBlocks = false; |
| } |
| |
| int getTimeout() { |
| return timeout; |
| } |
| |
| void setTimeout(int timeout) { |
| this.timeout = timeout; |
| } |
| |
| List<LocatedBlock> getLocatedBlocks() { |
| return locatedBlocks; |
| } |
| |
| long getRemaining() { |
| return remaining; |
| } |
| |
| void setRemaining(long remaining) { |
| this.remaining = remaining; |
| } |
| |
| int getBytesPerCRC() { |
| return bytesPerCRC; |
| } |
| |
| void setBytesPerCRC(int bytesPerCRC) { |
| this.bytesPerCRC = bytesPerCRC; |
| } |
| |
| DataChecksum.Type getCrcType() { |
| return crcType; |
| } |
| |
| void setCrcType(DataChecksum.Type crcType) { |
| this.crcType = crcType; |
| } |
| |
| long getCrcPerBlock() { |
| return crcPerBlock; |
| } |
| |
| void setCrcPerBlock(long crcPerBlock) { |
| this.crcPerBlock = crcPerBlock; |
| } |
| |
| boolean isRefetchBlocks() { |
| return isRefetchBlocks; |
| } |
| |
| void setRefetchBlocks(boolean refetchBlocks) { |
| this.isRefetchBlocks = refetchBlocks; |
| } |
| |
| int getLastRetriedIndex() { |
| return lastRetriedIndex; |
| } |
| |
| void setLastRetriedIndex(int lastRetriedIndex) { |
| this.lastRetriedIndex = lastRetriedIndex; |
| } |
| |
| /** |
| * Perform the file checksum computing. The intermediate results are stored |
| * in the object and will be used later. |
| * @throws IOException |
| */ |
| void compute() throws IOException { |
| /** |
| * request length is 0 or the file is empty, return one with the |
| * magic entry that matches what previous hdfs versions return. |
| */ |
| if (locatedBlocks == null || locatedBlocks.isEmpty()) { |
| // Explicitly specified here in case the default DataOutputBuffer |
| // buffer length value is changed in future. This matters because the |
| // fixed value 32 has to be used to repeat the magic value for previous |
| // HDFS version. |
| final int lenOfZeroBytes = 32; |
| byte[] emptyBlockMd5 = new byte[lenOfZeroBytes]; |
| MD5Hash fileMD5 = MD5Hash.digest(emptyBlockMd5); |
| fileChecksum = new MD5MD5CRC32GzipFileChecksum(0, 0, fileMD5); |
| } else { |
| checksumBlocks(); |
| fileChecksum = makeFinalResult(); |
| } |
| } |
| |
| /** |
| * Compute block checksums block by block and append the raw bytes of the |
| * block checksums into getBlockChecksumBuf(). |
| * |
| * @throws IOException |
| */ |
| abstract void checksumBlocks() throws IOException; |
| |
| /** |
| * Make final file checksum result given the per-block or per-block-group |
| * checksums collected into getBlockChecksumBuf(). |
| */ |
| FileChecksum makeFinalResult() throws IOException { |
| switch (combineMode) { |
| case MD5MD5CRC: |
| return makeMd5CrcResult(); |
| case COMPOSITE_CRC: |
| return makeCompositeCrcResult(); |
| default: |
| throw new IOException("Unknown ChecksumCombineMode: " + combineMode); |
| } |
| } |
| |
| FileChecksum makeMd5CrcResult() { |
| //compute file MD5 |
| final MD5Hash fileMD5 = MD5Hash.digest(blockChecksumBuf.getData()); |
| switch (crcType) { |
| case CRC32: |
| return new MD5MD5CRC32GzipFileChecksum(bytesPerCRC, |
| crcPerBlock, fileMD5); |
| case CRC32C: |
| return new MD5MD5CRC32CastagnoliFileChecksum(bytesPerCRC, |
| crcPerBlock, fileMD5); |
| default: |
| // we will get here when crcType is "NULL". |
| return null; |
| } |
| } |
| |
| FileChecksum makeCompositeCrcResult() throws IOException { |
| long blockSizeHint = 0; |
| if (locatedBlocks.size() > 0) { |
| blockSizeHint = locatedBlocks.get(0).getBlockSize(); |
| } |
| CrcComposer crcComposer = |
| CrcComposer.newCrcComposer(getCrcType(), blockSizeHint); |
| byte[] blockChecksumBytes = blockChecksumBuf.getData(); |
| |
| long sumBlockLengths = 0; |
| for (int i = 0; i < locatedBlocks.size() - 1; ++i) { |
| LocatedBlock block = locatedBlocks.get(i); |
| // For everything except the last LocatedBlock, we expect getBlockSize() |
| // to accurately reflect the number of file bytes digested in the block |
| // checksum. |
| sumBlockLengths += block.getBlockSize(); |
| int blockCrc = CrcUtil.readInt(blockChecksumBytes, i * 4); |
| |
| crcComposer.update(blockCrc, block.getBlockSize()); |
| LOG.debug( |
| "Added blockCrc 0x{} for block index {} of size {}", |
| Integer.toString(blockCrc, 16), i, block.getBlockSize()); |
| } |
| |
| // NB: In some cases the located blocks have their block size adjusted |
| // explicitly based on the requested length, but not all cases; |
| // these numbers may or may not reflect actual sizes on disk. |
| long reportedLastBlockSize = |
| blockLocations.getLastLocatedBlock().getBlockSize(); |
| long consumedLastBlockLength = reportedLastBlockSize; |
| if (length - sumBlockLengths < reportedLastBlockSize) { |
| LOG.warn( |
| "Last block length {} is less than reportedLastBlockSize {}", |
| length - sumBlockLengths, reportedLastBlockSize); |
| consumedLastBlockLength = length - sumBlockLengths; |
| } |
| // NB: blockChecksumBytes.length may be much longer than actual bytes |
| // written into the DataOutput. |
| int lastBlockCrc = CrcUtil.readInt( |
| blockChecksumBytes, 4 * (locatedBlocks.size() - 1)); |
| crcComposer.update(lastBlockCrc, consumedLastBlockLength); |
| LOG.debug( |
| "Added lastBlockCrc 0x{} for block index {} of size {}", |
| Integer.toString(lastBlockCrc, 16), |
| locatedBlocks.size() - 1, |
| consumedLastBlockLength); |
| |
| int compositeCrc = CrcUtil.readInt(crcComposer.digest(), 0); |
| return new CompositeCrcFileChecksum( |
| compositeCrc, getCrcType(), bytesPerCRC); |
| } |
| |
| /** |
| * Create and return a sender given an IO stream pair. |
| */ |
| Sender createSender(IOStreamPair pair) { |
| DataOutputStream out = (DataOutputStream) pair.out; |
| return new Sender(out); |
| } |
| |
| /** |
| * Close an IO stream pair. |
| */ |
| void close(IOStreamPair pair) { |
| if (pair != null) { |
| IOUtils.closeStream(pair.in); |
| IOUtils.closeStream(pair.out); |
| } |
| } |
| |
| /** |
| * Parses out various checksum properties like bytesPerCrc, crcPerBlock, |
| * and crcType from {@code checksumData} and either stores them as the |
| * authoritative value or compares them to a previously extracted value |
| * to check comppatibility. |
| * |
| * @param checksumData response from the datanode |
| * @param locatedBlock the block corresponding to the response |
| * @param datanode the datanode which produced the response |
| * @param blockIdx the block or block-group index of the response |
| */ |
| void extractChecksumProperties( |
| OpBlockChecksumResponseProto checksumData, |
| LocatedBlock locatedBlock, |
| DatanodeInfo datanode, |
| int blockIdx) |
| throws IOException { |
| //read byte-per-checksum |
| final int bpc = checksumData.getBytesPerCrc(); |
| if (blockIdx == 0) { //first block |
| setBytesPerCRC(bpc); |
| } else if (bpc != getBytesPerCRC()) { |
| if (getBlockChecksumType() == BlockChecksumType.COMPOSITE_CRC) { |
| LOG.warn( |
| "Current bytesPerCRC={} doesn't match next bpc={}, but " |
| + "continuing anyway because we're using COMPOSITE_CRC. " |
| + "If trying to preserve CHECKSUMTYPE, only the current " |
| + "bytesPerCRC will be preserved.", getBytesPerCRC(), bpc); |
| } else { |
| throw new IOException("Byte-per-checksum not matched: bpc=" + bpc |
| + " but bytesPerCRC=" + getBytesPerCRC()); |
| } |
| } |
| |
| //read crc-per-block |
| final long cpb = checksumData.getCrcPerBlock(); |
| if (getLocatedBlocks().size() > 1 && blockIdx == 0) { |
| setCrcPerBlock(cpb); |
| } |
| |
| // read crc-type |
| final DataChecksum.Type ct; |
| if (checksumData.hasCrcType()) { |
| ct = PBHelperClient.convert(checksumData.getCrcType()); |
| } else { |
| LOG.debug("Retrieving checksum from an earlier-version DataNode: " + |
| "inferring checksum by reading first byte"); |
| ct = getClient().inferChecksumTypeByReading(locatedBlock, datanode); |
| } |
| |
| if (blockIdx == 0) { |
| setCrcType(ct); |
| } else if (getCrcType() != DataChecksum.Type.MIXED && |
| getCrcType() != ct) { |
| if (getBlockChecksumType() == BlockChecksumType.COMPOSITE_CRC) { |
| throw new IOException( |
| "DataChecksum.Type.MIXED is not supported for COMPOSITE_CRC"); |
| } else { |
| // if crc types are mixed in a file |
| setCrcType(DataChecksum.Type.MIXED); |
| } |
| } |
| |
| if (blockIdx == 0) { |
| LOG.debug("set bytesPerCRC={}, crcPerBlock={}", |
| getBytesPerCRC(), getCrcPerBlock()); |
| } |
| } |
| |
| /** |
| * Parses out the raw blockChecksum bytes from {@code checksumData} |
| * according to the blockChecksumType and populates the cumulative |
| * blockChecksumBuf with it. |
| * |
| * @return a debug-string representation of the parsed checksum if |
| * debug is enabled, otherwise null. |
| */ |
| String populateBlockChecksumBuf(OpBlockChecksumResponseProto checksumData) |
| throws IOException { |
| String blockChecksumForDebug = null; |
| switch (getBlockChecksumType()) { |
| case MD5CRC: |
| //read md5 |
| final MD5Hash md5 = new MD5Hash( |
| checksumData.getBlockChecksum().toByteArray()); |
| md5.write(getBlockChecksumBuf()); |
| if (LOG.isDebugEnabled()) { |
| blockChecksumForDebug = md5.toString(); |
| } |
| break; |
| case COMPOSITE_CRC: |
| BlockChecksumType returnedType = PBHelperClient.convert( |
| checksumData.getBlockChecksumOptions().getBlockChecksumType()); |
| if (returnedType != BlockChecksumType.COMPOSITE_CRC) { |
| throw new IOException(String.format( |
| "Unexpected blockChecksumType '%s', expecting COMPOSITE_CRC", |
| returnedType)); |
| } |
| byte[] crcBytes = checksumData.getBlockChecksum().toByteArray(); |
| if (LOG.isDebugEnabled()) { |
| blockChecksumForDebug = CrcUtil.toSingleCrcString(crcBytes); |
| } |
| getBlockChecksumBuf().write(crcBytes); |
| break; |
| default: |
| throw new IOException( |
| "Unknown BlockChecksumType: " + getBlockChecksumType()); |
| } |
| return blockChecksumForDebug; |
| } |
| } |
| |
| /** |
| * Replicated file checksum computer. |
| */ |
| static class ReplicatedFileChecksumComputer extends FileChecksumComputer { |
| private int blockIdx; |
| |
| ReplicatedFileChecksumComputer(String src, long length, |
| LocatedBlocks blockLocations, |
| ClientProtocol namenode, |
| DFSClient client, |
| ChecksumCombineMode combineMode) |
| throws IOException { |
| super(src, length, blockLocations, namenode, client, combineMode); |
| } |
| |
| @Override |
| void checksumBlocks() throws IOException { |
| // get block checksum for each block |
| for (blockIdx = 0; |
| blockIdx < getLocatedBlocks().size() && getRemaining() >= 0; |
| blockIdx++) { |
| if (isRefetchBlocks()) { // refetch to get fresh tokens |
| refetchBlocks(); |
| } |
| |
| LocatedBlock locatedBlock = getLocatedBlocks().get(blockIdx); |
| |
| if (!checksumBlock(locatedBlock)) { |
| throw new PathIOException( |
| getSrc(), "Fail to get block MD5 for " + locatedBlock); |
| } |
| } |
| } |
| |
| /** |
| * Return true when sounds good to continue or retry, false when severe |
| * condition or totally failed. |
| */ |
| private boolean checksumBlock(LocatedBlock locatedBlock) { |
| ExtendedBlock block = locatedBlock.getBlock(); |
| if (getRemaining() < block.getNumBytes()) { |
| block.setNumBytes(getRemaining()); |
| } |
| setRemaining(getRemaining() - block.getNumBytes()); |
| |
| DatanodeInfo[] datanodes = locatedBlock.getLocations(); |
| |
| int tmpTimeout = 3000 * datanodes.length + |
| getClient().getConf().getSocketTimeout(); |
| setTimeout(tmpTimeout); |
| |
| //try each datanode location of the block |
| boolean done = false; |
| for (int j = 0; !done && j < datanodes.length; j++) { |
| try { |
| tryDatanode(locatedBlock, datanodes[j]); |
| done = true; |
| } catch (InvalidBlockTokenException ibte) { |
| if (blockIdx > getLastRetriedIndex()) { |
| LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM " |
| + "for file {} for block {} from datanode {}. Will retry " |
| + "the block once.", |
| getSrc(), block, datanodes[j]); |
| setLastRetriedIndex(blockIdx); |
| done = true; // actually it's not done; but we'll retry |
| blockIdx--; // repeat at blockIdx-th block |
| setRefetchBlocks(true); |
| } |
| } catch (InvalidEncryptionKeyException iee) { |
| if (blockIdx > getLastRetriedIndex()) { |
| LOG.debug("Got invalid encryption key error in response to " |
| + "OP_BLOCK_CHECKSUM for file {} for block {} from " |
| + "datanode {}. Will retry " + "the block once.", |
| getSrc(), block, datanodes[j]); |
| setLastRetriedIndex(blockIdx); |
| done = true; // actually it's not done; but we'll retry |
| blockIdx--; // repeat at i-th block |
| getClient().clearDataEncryptionKey(); |
| } |
| } catch (IOException ie) { |
| LOG.warn("src={}" + ", datanodes[{}]={}", |
| getSrc(), j, datanodes[j], ie); |
| } |
| } |
| |
| return done; |
| } |
| |
| /** |
| * Try one replica or datanode to compute the block checksum given a block. |
| */ |
| private void tryDatanode(LocatedBlock locatedBlock, |
| DatanodeInfo datanode) throws IOException { |
| |
| ExtendedBlock block = locatedBlock.getBlock(); |
| |
| try (IOStreamPair pair = getClient().connectToDN(datanode, getTimeout(), |
| locatedBlock.getBlockToken())) { |
| |
| LOG.debug("write to {}: {}, block={}", datanode, |
| Op.BLOCK_CHECKSUM, block); |
| |
| // get block checksum |
| createSender(pair).blockChecksum( |
| block, |
| locatedBlock.getBlockToken(), |
| new BlockChecksumOptions(getBlockChecksumType())); |
| |
| final BlockOpResponseProto reply = BlockOpResponseProto.parseFrom( |
| PBHelperClient.vintPrefixed(pair.in)); |
| |
| String logInfo = "for block " + block + " from datanode " + |
| datanode; |
| DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); |
| |
| OpBlockChecksumResponseProto checksumData = |
| reply.getChecksumResponse(); |
| extractChecksumProperties( |
| checksumData, locatedBlock, datanode, blockIdx); |
| String blockChecksumForDebug = populateBlockChecksumBuf(checksumData); |
| LOG.debug("got reply from {}: blockChecksum={}, blockChecksumType={}", |
| datanode, blockChecksumForDebug, getBlockChecksumType()); |
| } |
| } |
| } |
| |
| /** |
| * Non-striped checksum computing for striped files. |
| */ |
| static class StripedFileNonStripedChecksumComputer |
| extends FileChecksumComputer { |
| private final ErasureCodingPolicy ecPolicy; |
| private int bgIdx; |
| |
| StripedFileNonStripedChecksumComputer(String src, long length, |
| LocatedBlocks blockLocations, |
| ClientProtocol namenode, |
| DFSClient client, |
| ErasureCodingPolicy ecPolicy, |
| ChecksumCombineMode combineMode) |
| throws IOException { |
| super(src, length, blockLocations, namenode, client, combineMode); |
| |
| this.ecPolicy = ecPolicy; |
| } |
| |
| @Override |
| void checksumBlocks() throws IOException { |
| int tmpTimeout = 3000 * 1 + getClient().getConf().getSocketTimeout(); |
| setTimeout(tmpTimeout); |
| |
| for (bgIdx = 0; |
| bgIdx < getLocatedBlocks().size() && getRemaining() >= 0; bgIdx++) { |
| if (isRefetchBlocks()) { // refetch to get fresh tokens |
| refetchBlocks(); |
| } |
| |
| LocatedBlock locatedBlock = getLocatedBlocks().get(bgIdx); |
| LocatedStripedBlock blockGroup = (LocatedStripedBlock) locatedBlock; |
| |
| if (!checksumBlockGroup(blockGroup)) { |
| throw new PathIOException( |
| getSrc(), "Fail to get block checksum for " + locatedBlock); |
| } |
| } |
| } |
| |
| |
| private boolean checksumBlockGroup( |
| LocatedStripedBlock blockGroup) throws IOException { |
| ExtendedBlock block = blockGroup.getBlock(); |
| long requestedNumBytes = block.getNumBytes(); |
| if (getRemaining() < block.getNumBytes()) { |
| requestedNumBytes = getRemaining(); |
| } |
| setRemaining(getRemaining() - requestedNumBytes); |
| |
| StripedBlockInfo stripedBlockInfo = new StripedBlockInfo(block, |
| blockGroup.getLocations(), blockGroup.getBlockTokens(), |
| blockGroup.getBlockIndices(), ecPolicy); |
| DatanodeInfo[] datanodes = blockGroup.getLocations(); |
| |
| //try each datanode in the block group. |
| boolean done = false; |
| for (int j = 0; !done && j < datanodes.length; j++) { |
| try { |
| tryDatanode(blockGroup, stripedBlockInfo, datanodes[j], |
| requestedNumBytes); |
| done = true; |
| } catch (InvalidBlockTokenException ibte) { |
| if (bgIdx > getLastRetriedIndex()) { |
| LOG.debug("Got access token error in response to OP_BLOCK_CHECKSUM " |
| + "for file {} for block {} from datanode {}. Will retry " |
| + "the block once.", |
| getSrc(), block, datanodes[j]); |
| setLastRetriedIndex(bgIdx); |
| done = true; // actually it's not done; but we'll retry |
| bgIdx--; // repeat at bgIdx-th block |
| setRefetchBlocks(true); |
| } |
| } catch (IOException ie) { |
| LOG.warn("src={}" + ", datanodes[{}]={}", |
| getSrc(), j, datanodes[j], ie); |
| } |
| } |
| |
| return done; |
| } |
| |
| /** |
| * Return true when sounds good to continue or retry, false when severe |
| * condition or totally failed. |
| */ |
| private void tryDatanode(LocatedStripedBlock blockGroup, |
| StripedBlockInfo stripedBlockInfo, |
| DatanodeInfo datanode, |
| long requestedNumBytes) throws IOException { |
| try (IOStreamPair pair = getClient().connectToDN(datanode, |
| getTimeout(), blockGroup.getBlockToken())) { |
| |
| LOG.debug("write to {}: {}, blockGroup={}", |
| datanode, Op.BLOCK_GROUP_CHECKSUM, blockGroup); |
| |
| // get block group checksum |
| createSender(pair).blockGroupChecksum( |
| stripedBlockInfo, |
| blockGroup.getBlockToken(), |
| requestedNumBytes, |
| new BlockChecksumOptions(getBlockChecksumType())); |
| |
| BlockOpResponseProto reply = BlockOpResponseProto.parseFrom( |
| PBHelperClient.vintPrefixed(pair.in)); |
| |
| String logInfo = "for blockGroup " + blockGroup + |
| " from datanode " + datanode; |
| DataTransferProtoUtil.checkBlockOpStatus(reply, logInfo); |
| |
| OpBlockChecksumResponseProto checksumData = reply.getChecksumResponse(); |
| extractChecksumProperties(checksumData, blockGroup, datanode, bgIdx); |
| String blockChecksumForDebug = populateBlockChecksumBuf(checksumData); |
| LOG.debug("got reply from {}: blockChecksum={}, blockChecksumType={}", |
| datanode, blockChecksumForDebug, getBlockChecksumType()); |
| } |
| } |
| } |
| } |