| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hdfs.server.datanode; |
| |
| import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR; |
| import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_ACCESS_TOKEN; |
| import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS; |
| import static org.apache.hadoop.util.Time.now; |
| import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT; |
| |
| import java.io.BufferedInputStream; |
| import java.io.BufferedOutputStream; |
| import java.io.DataInputStream; |
| import java.io.DataOutputStream; |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InterruptedIOException; |
| import java.io.OutputStream; |
| import java.net.InetSocketAddress; |
| import java.net.Socket; |
| import java.net.SocketException; |
| import java.nio.channels.ClosedChannelException; |
| import java.util.Arrays; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.hadoop.hdfs.protocol.DatanodeInfo; |
| import org.apache.hadoop.hdfs.protocol.ExtendedBlock; |
| import org.apache.hadoop.hdfs.protocol.HdfsConstants; |
| import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; |
| import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor.InvalidMagicNumberException; |
| import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; |
| import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; |
| import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; |
| import org.apache.hadoop.hdfs.protocol.datatransfer.Op; |
| import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver; |
| import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; |
| import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; |
| import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; |
| import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto; |
| import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; |
| import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; |
| import org.apache.hadoop.hdfs.protocolPB.PBHelper; |
| import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; |
| import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; |
| import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; |
| import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; |
| import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.io.MD5Hash; |
| import org.apache.hadoop.net.NetUtils; |
| import org.apache.hadoop.net.SocketInputWrapper; |
| import org.apache.hadoop.security.token.SecretManager.InvalidToken; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.util.DataChecksum; |
| |
| import com.google.protobuf.ByteString; |
| |
| |
| /** |
| * Thread for processing incoming/outgoing data stream. |
| */ |
| class DataXceiver extends Receiver implements Runnable { |
| public static final Log LOG = DataNode.LOG; |
| static final Log ClientTraceLog = DataNode.ClientTraceLog; |
| |
| private final Socket s; |
| private final boolean isLocal; //is a local connection? |
| private final String remoteAddress; // address of remote side |
| private final String localAddress; // local address of this daemon |
| private final DataNode datanode; |
| private final DNConf dnConf; |
| private final DataXceiverServer dataXceiverServer; |
| private final boolean connectToDnViaHostname; |
| private long opStartTime; //the start time of receiving an Op |
| private final SocketInputWrapper socketIn; |
| private OutputStream socketOut; |
| |
| /** |
| * Client Name used in previous operation. Not available on first request |
| * on the socket. |
| */ |
| private String previousOpClientName; |
| |
| public static DataXceiver create(Socket s, DataNode dn, |
| DataXceiverServer dataXceiverServer) throws IOException { |
| return new DataXceiver(s, dn, dataXceiverServer); |
| } |
| |
| private DataXceiver(Socket s, |
| DataNode datanode, |
| DataXceiverServer dataXceiverServer) throws IOException { |
| |
| this.s = s; |
| this.dnConf = datanode.getDnConf(); |
| this.socketIn = NetUtils.getInputStream(s); |
| this.socketOut = NetUtils.getOutputStream(s, dnConf.socketWriteTimeout); |
| this.isLocal = s.getInetAddress().equals(s.getLocalAddress()); |
| this.datanode = datanode; |
| this.dataXceiverServer = dataXceiverServer; |
| this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname; |
| remoteAddress = s.getRemoteSocketAddress().toString(); |
| localAddress = s.getLocalSocketAddress().toString(); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Number of active connections is: " |
| + datanode.getXceiverCount()); |
| } |
| } |
| |
| /** |
| * Update the current thread's name to contain the current status. |
| * Use this only after this receiver has started on its thread, i.e., |
| * outside the constructor. |
| */ |
| private void updateCurrentThreadName(String status) { |
| StringBuilder sb = new StringBuilder(); |
| sb.append("DataXceiver for client "); |
| if (previousOpClientName != null) { |
| sb.append(previousOpClientName).append(" at "); |
| } |
| sb.append(remoteAddress); |
| if (status != null) { |
| sb.append(" [").append(status).append("]"); |
| } |
| Thread.currentThread().setName(sb.toString()); |
| } |
| |
| /** Return the datanode object. */ |
| DataNode getDataNode() {return datanode;} |
| |
| private OutputStream getOutputStream() { |
| return socketOut; |
| } |
| |
| /** |
| * Read/write data from/to the DataXceiverServer. |
| */ |
| @Override |
| public void run() { |
| int opsProcessed = 0; |
| Op op = null; |
| |
| dataXceiverServer.childSockets.add(s); |
| |
| try { |
| |
| InputStream input = socketIn; |
| if (dnConf.encryptDataTransfer) { |
| IOStreamPair encryptedStreams = null; |
| try { |
| encryptedStreams = DataTransferEncryptor.getEncryptedStreams(socketOut, |
| socketIn, datanode.blockPoolTokenSecretManager, |
| dnConf.encryptionAlgorithm); |
| } catch (InvalidMagicNumberException imne) { |
| LOG.info("Failed to read expected encryption handshake from client " + |
| "at " + s.getInetAddress() + ". Perhaps the client is running an " + |
| "older version of Hadoop which does not support encryption"); |
| return; |
| } |
| input = encryptedStreams.in; |
| socketOut = encryptedStreams.out; |
| } |
| input = new BufferedInputStream(input, HdfsConstants.SMALL_BUFFER_SIZE); |
| |
| super.initialize(new DataInputStream(input)); |
| |
| // We process requests in a loop, and stay around for a short timeout. |
| // This optimistic behaviour allows the other end to reuse connections. |
| // Setting keepalive timeout to 0 disable this behavior. |
| do { |
| updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1)); |
| |
| try { |
| if (opsProcessed != 0) { |
| assert dnConf.socketKeepaliveTimeout > 0; |
| socketIn.setTimeout(dnConf.socketKeepaliveTimeout); |
| } else { |
| socketIn.setTimeout(dnConf.socketTimeout); |
| } |
| op = readOp(); |
| } catch (InterruptedIOException ignored) { |
| // Time out while we wait for client rpc |
| break; |
| } catch (IOException err) { |
| // Since we optimistically expect the next op, it's quite normal to get EOF here. |
| if (opsProcessed > 0 && |
| (err instanceof EOFException || err instanceof ClosedChannelException)) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Cached " + s.toString() + " closing after " + opsProcessed + " ops"); |
| } |
| } else { |
| throw err; |
| } |
| break; |
| } |
| |
| // restore normal timeout |
| if (opsProcessed != 0) { |
| s.setSoTimeout(dnConf.socketTimeout); |
| } |
| |
| opStartTime = now(); |
| processOp(op); |
| ++opsProcessed; |
| } while (!s.isClosed() && dnConf.socketKeepaliveTimeout > 0); |
| } catch (Throwable t) { |
| LOG.error(datanode.getDisplayName() + ":DataXceiver error processing " + |
| ((op == null) ? "unknown" : op.name()) + " operation " + |
| " src: " + remoteAddress + |
| " dest: " + localAddress, t); |
| } finally { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug(datanode.getDisplayName() + ":Number of active connections is: " |
| + datanode.getXceiverCount()); |
| } |
| updateCurrentThreadName("Cleaning up"); |
| IOUtils.closeStream(in); |
| IOUtils.closeSocket(s); |
| dataXceiverServer.childSockets.remove(s); |
| } |
| } |
| |
| @Override |
| public void readBlock(final ExtendedBlock block, |
| final Token<BlockTokenIdentifier> blockToken, |
| final String clientName, |
| final long blockOffset, |
| final long length, |
| final boolean sendChecksum) throws IOException { |
| previousOpClientName = clientName; |
| |
| OutputStream baseStream = getOutputStream(); |
| DataOutputStream out = new DataOutputStream(new BufferedOutputStream( |
| baseStream, HdfsConstants.SMALL_BUFFER_SIZE)); |
| checkAccess(out, true, block, blockToken, |
| Op.READ_BLOCK, BlockTokenSecretManager.AccessMode.READ); |
| |
| // send the block |
| BlockSender blockSender = null; |
| DatanodeRegistration dnR = |
| datanode.getDNRegistrationForBP(block.getBlockPoolId()); |
| final String clientTraceFmt = |
| clientName.length() > 0 && ClientTraceLog.isInfoEnabled() |
| ? String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress, |
| "%d", "HDFS_READ", clientName, "%d", |
| dnR.getStorageID(), block, "%d") |
| : dnR + " Served block " + block + " to " + |
| remoteAddress; |
| |
| updateCurrentThreadName("Sending block " + block); |
| try { |
| try { |
| blockSender = new BlockSender(block, blockOffset, length, |
| true, false, sendChecksum, datanode, clientTraceFmt); |
| } catch(IOException e) { |
| String msg = "opReadBlock " + block + " received exception " + e; |
| LOG.info(msg); |
| sendResponse(ERROR, msg); |
| throw e; |
| } |
| |
| // send op status |
| writeSuccessWithChecksumInfo(blockSender, new DataOutputStream(getOutputStream())); |
| |
| long read = blockSender.sendBlock(out, baseStream, null); // send data |
| |
| if (blockSender.didSendEntireByteRange()) { |
| // If we sent the entire range, then we should expect the client |
| // to respond with a Status enum. |
| try { |
| ClientReadStatusProto stat = ClientReadStatusProto.parseFrom( |
| PBHelper.vintPrefixed(in)); |
| if (!stat.hasStatus()) { |
| LOG.warn("Client " + s.getInetAddress() + " did not send a valid status " + |
| "code after reading. Will close connection."); |
| IOUtils.closeStream(out); |
| } |
| } catch (IOException ioe) { |
| LOG.debug("Error reading client status response. Will close connection.", ioe); |
| IOUtils.closeStream(out); |
| } |
| } else { |
| IOUtils.closeStream(out); |
| } |
| datanode.metrics.incrBytesRead((int) read); |
| datanode.metrics.incrBlocksRead(); |
| } catch ( SocketException ignored ) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace(dnR + ":Ignoring exception while serving " + block + " to " + |
| remoteAddress, ignored); |
| } |
| // Its ok for remote side to close the connection anytime. |
| datanode.metrics.incrBlocksRead(); |
| IOUtils.closeStream(out); |
| } catch ( IOException ioe ) { |
| /* What exactly should we do here? |
| * Earlier version shutdown() datanode if there is disk error. |
| */ |
| LOG.warn(dnR + ":Got exception while serving " + block + " to " |
| + remoteAddress, ioe); |
| throw ioe; |
| } finally { |
| IOUtils.closeStream(blockSender); |
| } |
| |
| //update metrics |
| datanode.metrics.addReadBlockOp(elapsed()); |
| datanode.metrics.incrReadsFromClient(isLocal); |
| } |
| |
| @Override |
| public void writeBlock(final ExtendedBlock block, |
| final Token<BlockTokenIdentifier> blockToken, |
| final String clientname, |
| final DatanodeInfo[] targets, |
| final DatanodeInfo srcDataNode, |
| final BlockConstructionStage stage, |
| final int pipelineSize, |
| final long minBytesRcvd, |
| final long maxBytesRcvd, |
| final long latestGenerationStamp, |
| DataChecksum requestedChecksum) throws IOException { |
| previousOpClientName = clientname; |
| updateCurrentThreadName("Receiving block " + block); |
| final boolean isDatanode = clientname.length() == 0; |
| final boolean isClient = !isDatanode; |
| final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW |
| || stage == BlockConstructionStage.TRANSFER_FINALIZED; |
| |
| // check single target for transfer-RBW/Finalized |
| if (isTransfer && targets.length > 0) { |
| throw new IOException(stage + " does not support multiple targets " |
| + Arrays.asList(targets)); |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("opWriteBlock: stage=" + stage + ", clientname=" + clientname |
| + "\n block =" + block + ", newGs=" + latestGenerationStamp |
| + ", bytesRcvd=[" + minBytesRcvd + ", " + maxBytesRcvd + "]" |
| + "\n targets=" + Arrays.asList(targets) |
| + "; pipelineSize=" + pipelineSize + ", srcDataNode=" + srcDataNode |
| ); |
| LOG.debug("isDatanode=" + isDatanode |
| + ", isClient=" + isClient |
| + ", isTransfer=" + isTransfer); |
| LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() + |
| " tcp no delay " + s.getTcpNoDelay()); |
| } |
| |
| // We later mutate block's generation stamp and length, but we need to |
| // forward the original version of the block to downstream mirrors, so |
| // make a copy here. |
| final ExtendedBlock originalBlock = new ExtendedBlock(block); |
| block.setNumBytes(dataXceiverServer.estimateBlockSize); |
| LOG.info("Receiving " + block + " src: " + remoteAddress + " dest: " |
| + localAddress); |
| |
| // reply to upstream datanode or client |
| final DataOutputStream replyOut = new DataOutputStream( |
| new BufferedOutputStream( |
| getOutputStream(), |
| HdfsConstants.SMALL_BUFFER_SIZE)); |
| checkAccess(replyOut, isClient, block, blockToken, |
| Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE); |
| |
| DataOutputStream mirrorOut = null; // stream to next target |
| DataInputStream mirrorIn = null; // reply from next target |
| Socket mirrorSock = null; // socket to next target |
| BlockReceiver blockReceiver = null; // responsible for data handling |
| String mirrorNode = null; // the name:port of next target |
| String firstBadLink = ""; // first datanode that failed in connection setup |
| Status mirrorInStatus = SUCCESS; |
| try { |
| if (isDatanode || |
| stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { |
| // open a block receiver |
| blockReceiver = new BlockReceiver(block, in, |
| s.getRemoteSocketAddress().toString(), |
| s.getLocalSocketAddress().toString(), |
| stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd, |
| clientname, srcDataNode, datanode, requestedChecksum); |
| } else { |
| datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd); |
| } |
| |
| // |
| // Connect to downstream machine, if appropriate |
| // |
| if (targets.length > 0) { |
| InetSocketAddress mirrorTarget = null; |
| // Connect to backup machine |
| mirrorNode = targets[0].getXferAddr(connectToDnViaHostname); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Connecting to datanode " + mirrorNode); |
| } |
| mirrorTarget = NetUtils.createSocketAddr(mirrorNode); |
| mirrorSock = datanode.newSocket(); |
| try { |
| int timeoutValue = dnConf.socketTimeout |
| + (HdfsServerConstants.READ_TIMEOUT_EXTENSION * targets.length); |
| int writeTimeout = dnConf.socketWriteTimeout + |
| (HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * targets.length); |
| NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue); |
| mirrorSock.setSoTimeout(timeoutValue); |
| mirrorSock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); |
| |
| OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock, |
| writeTimeout); |
| InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock); |
| if (dnConf.encryptDataTransfer) { |
| IOStreamPair encryptedStreams = |
| DataTransferEncryptor.getEncryptedStreams( |
| unbufMirrorOut, unbufMirrorIn, |
| datanode.blockPoolTokenSecretManager |
| .generateDataEncryptionKey(block.getBlockPoolId())); |
| |
| unbufMirrorOut = encryptedStreams.out; |
| unbufMirrorIn = encryptedStreams.in; |
| } |
| mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut, |
| HdfsConstants.SMALL_BUFFER_SIZE)); |
| mirrorIn = new DataInputStream(unbufMirrorIn); |
| |
| new Sender(mirrorOut).writeBlock(originalBlock, blockToken, |
| clientname, targets, srcDataNode, stage, pipelineSize, |
| minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum); |
| |
| mirrorOut.flush(); |
| |
| // read connect ack (only for clients, not for replication req) |
| if (isClient) { |
| BlockOpResponseProto connectAck = |
| BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn)); |
| mirrorInStatus = connectAck.getStatus(); |
| firstBadLink = connectAck.getFirstBadLink(); |
| if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) { |
| LOG.info("Datanode " + targets.length + |
| " got response for connect ack " + |
| " from downstream datanode with firstbadlink as " + |
| firstBadLink); |
| } |
| } |
| |
| } catch (IOException e) { |
| if (isClient) { |
| BlockOpResponseProto.newBuilder() |
| .setStatus(ERROR) |
| // NB: Unconditionally using the xfer addr w/o hostname |
| .setFirstBadLink(targets[0].getXferAddr()) |
| .build() |
| .writeDelimitedTo(replyOut); |
| replyOut.flush(); |
| } |
| IOUtils.closeStream(mirrorOut); |
| mirrorOut = null; |
| IOUtils.closeStream(mirrorIn); |
| mirrorIn = null; |
| IOUtils.closeSocket(mirrorSock); |
| mirrorSock = null; |
| if (isClient) { |
| LOG.error(datanode + ":Exception transfering block " + |
| block + " to mirror " + mirrorNode + ": " + e); |
| throw e; |
| } else { |
| LOG.info(datanode + ":Exception transfering " + |
| block + " to mirror " + mirrorNode + |
| "- continuing without the mirror", e); |
| } |
| } |
| } |
| |
| // send connect-ack to source for clients and not transfer-RBW/Finalized |
| if (isClient && !isTransfer) { |
| if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) { |
| LOG.info("Datanode " + targets.length + |
| " forwarding connect ack to upstream firstbadlink is " + |
| firstBadLink); |
| } |
| BlockOpResponseProto.newBuilder() |
| .setStatus(mirrorInStatus) |
| .setFirstBadLink(firstBadLink) |
| .build() |
| .writeDelimitedTo(replyOut); |
| replyOut.flush(); |
| } |
| |
| // receive the block and mirror to the next target |
| if (blockReceiver != null) { |
| String mirrorAddr = (mirrorSock == null) ? null : mirrorNode; |
| blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, |
| mirrorAddr, null, targets); |
| |
| // send close-ack for transfer-RBW/Finalized |
| if (isTransfer) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("TRANSFER: send close-ack"); |
| } |
| writeResponse(SUCCESS, null, replyOut); |
| } |
| } |
| |
| // update its generation stamp |
| if (isClient && |
| stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { |
| block.setGenerationStamp(latestGenerationStamp); |
| block.setNumBytes(minBytesRcvd); |
| } |
| |
| // if this write is for a replication request or recovering |
| // a failed close for client, then confirm block. For other client-writes, |
| // the block is finalized in the PacketResponder. |
| if (isDatanode || |
| stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { |
| datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT); |
| LOG.info("Received " + block + " src: " + remoteAddress + " dest: " |
| + localAddress + " of size " + block.getNumBytes()); |
| } |
| |
| |
| } catch (IOException ioe) { |
| LOG.info("opWriteBlock " + block + " received exception " + ioe); |
| throw ioe; |
| } finally { |
| // close all opened streams |
| IOUtils.closeStream(mirrorOut); |
| IOUtils.closeStream(mirrorIn); |
| IOUtils.closeStream(replyOut); |
| IOUtils.closeSocket(mirrorSock); |
| IOUtils.closeStream(blockReceiver); |
| } |
| |
| //update metrics |
| datanode.metrics.addWriteBlockOp(elapsed()); |
| datanode.metrics.incrWritesFromClient(isLocal); |
| } |
| |
| @Override |
| public void transferBlock(final ExtendedBlock blk, |
| final Token<BlockTokenIdentifier> blockToken, |
| final String clientName, |
| final DatanodeInfo[] targets) throws IOException { |
| checkAccess(null, true, blk, blockToken, |
| Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY); |
| previousOpClientName = clientName; |
| updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk); |
| |
| final DataOutputStream out = new DataOutputStream( |
| getOutputStream()); |
| try { |
| datanode.transferReplicaForPipelineRecovery(blk, targets, clientName); |
| writeResponse(Status.SUCCESS, null, out); |
| } finally { |
| IOUtils.closeStream(out); |
| } |
| } |
| |
| @Override |
| public void blockChecksum(final ExtendedBlock block, |
| final Token<BlockTokenIdentifier> blockToken) throws IOException { |
| final DataOutputStream out = new DataOutputStream( |
| getOutputStream()); |
| checkAccess(out, true, block, blockToken, |
| Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ); |
| updateCurrentThreadName("Reading metadata for block " + block); |
| final LengthInputStream metadataIn = |
| datanode.data.getMetaDataInputStream(block); |
| final DataInputStream checksumIn = new DataInputStream(new BufferedInputStream( |
| metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE)); |
| |
| updateCurrentThreadName("Getting checksum for block " + block); |
| try { |
| //read metadata file |
| final BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn); |
| final DataChecksum checksum = header.getChecksum(); |
| final int bytesPerCRC = checksum.getBytesPerChecksum(); |
| final long crcPerBlock = (metadataIn.getLength() |
| - BlockMetadataHeader.getHeaderSize())/checksum.getChecksumSize(); |
| |
| //compute block checksum |
| final MD5Hash md5 = MD5Hash.digest(checksumIn); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("block=" + block + ", bytesPerCRC=" + bytesPerCRC |
| + ", crcPerBlock=" + crcPerBlock + ", md5=" + md5); |
| } |
| |
| //write reply |
| BlockOpResponseProto.newBuilder() |
| .setStatus(SUCCESS) |
| .setChecksumResponse(OpBlockChecksumResponseProto.newBuilder() |
| .setBytesPerCrc(bytesPerCRC) |
| .setCrcPerBlock(crcPerBlock) |
| .setMd5(ByteString.copyFrom(md5.getDigest())) |
| .setCrcType(PBHelper.convert(checksum.getChecksumType())) |
| ) |
| .build() |
| .writeDelimitedTo(out); |
| out.flush(); |
| } finally { |
| IOUtils.closeStream(out); |
| IOUtils.closeStream(checksumIn); |
| IOUtils.closeStream(metadataIn); |
| } |
| |
| //update metrics |
| datanode.metrics.addBlockChecksumOp(elapsed()); |
| } |
| |
| @Override |
| public void copyBlock(final ExtendedBlock block, |
| final Token<BlockTokenIdentifier> blockToken) throws IOException { |
| updateCurrentThreadName("Copying block " + block); |
| // Read in the header |
| if (datanode.isBlockTokenEnabled) { |
| try { |
| datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block, |
| BlockTokenSecretManager.AccessMode.COPY); |
| } catch (InvalidToken e) { |
| LOG.warn("Invalid access token in request from " + remoteAddress |
| + " for OP_COPY_BLOCK for block " + block + " : " |
| + e.getLocalizedMessage()); |
| sendResponse(ERROR_ACCESS_TOKEN, "Invalid access token"); |
| return; |
| } |
| |
| } |
| |
| if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start |
| String msg = "Not able to copy block " + block.getBlockId() + " to " |
| + s.getRemoteSocketAddress() + " because threads quota is exceeded."; |
| LOG.info(msg); |
| sendResponse(ERROR, msg); |
| return; |
| } |
| |
| BlockSender blockSender = null; |
| DataOutputStream reply = null; |
| boolean isOpSuccess = true; |
| |
| try { |
| // check if the block exists or not |
| blockSender = new BlockSender(block, 0, -1, false, false, true, datanode, |
| null); |
| |
| // set up response stream |
| OutputStream baseStream = getOutputStream(); |
| reply = new DataOutputStream(new BufferedOutputStream( |
| baseStream, HdfsConstants.SMALL_BUFFER_SIZE)); |
| |
| // send status first |
| writeSuccessWithChecksumInfo(blockSender, reply); |
| // send block content to the target |
| long read = blockSender.sendBlock(reply, baseStream, |
| dataXceiverServer.balanceThrottler); |
| |
| datanode.metrics.incrBytesRead((int) read); |
| datanode.metrics.incrBlocksRead(); |
| |
| LOG.info("Copied " + block + " to " + s.getRemoteSocketAddress()); |
| } catch (IOException ioe) { |
| isOpSuccess = false; |
| LOG.info("opCopyBlock " + block + " received exception " + ioe); |
| throw ioe; |
| } finally { |
| dataXceiverServer.balanceThrottler.release(); |
| if (isOpSuccess) { |
| try { |
| // send one last byte to indicate that the resource is cleaned. |
| reply.writeChar('d'); |
| } catch (IOException ignored) { |
| } |
| } |
| IOUtils.closeStream(reply); |
| IOUtils.closeStream(blockSender); |
| } |
| |
| //update metrics |
| datanode.metrics.addCopyBlockOp(elapsed()); |
| } |
| |
| @Override |
| public void replaceBlock(final ExtendedBlock block, |
| final Token<BlockTokenIdentifier> blockToken, |
| final String delHint, |
| final DatanodeInfo proxySource) throws IOException { |
| updateCurrentThreadName("Replacing block " + block + " from " + delHint); |
| |
| /* read header */ |
| block.setNumBytes(dataXceiverServer.estimateBlockSize); |
| if (datanode.isBlockTokenEnabled) { |
| try { |
| datanode.blockPoolTokenSecretManager.checkAccess(blockToken, null, block, |
| BlockTokenSecretManager.AccessMode.REPLACE); |
| } catch (InvalidToken e) { |
| LOG.warn("Invalid access token in request from " + remoteAddress |
| + " for OP_REPLACE_BLOCK for block " + block + " : " |
| + e.getLocalizedMessage()); |
| sendResponse(ERROR_ACCESS_TOKEN, "Invalid access token"); |
| return; |
| } |
| } |
| |
| if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start |
| String msg = "Not able to receive block " + block.getBlockId() + " from " |
| + s.getRemoteSocketAddress() + " because threads quota is exceeded."; |
| LOG.warn(msg); |
| sendResponse(ERROR, msg); |
| return; |
| } |
| |
| Socket proxySock = null; |
| DataOutputStream proxyOut = null; |
| Status opStatus = SUCCESS; |
| String errMsg = null; |
| BlockReceiver blockReceiver = null; |
| DataInputStream proxyReply = null; |
| |
| try { |
| // get the output stream to the proxy |
| final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Connecting to datanode " + dnAddr); |
| } |
| InetSocketAddress proxyAddr = NetUtils.createSocketAddr(dnAddr); |
| proxySock = datanode.newSocket(); |
| NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout); |
| proxySock.setSoTimeout(dnConf.socketTimeout); |
| |
| OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock, |
| dnConf.socketWriteTimeout); |
| InputStream unbufProxyIn = NetUtils.getInputStream(proxySock); |
| if (dnConf.encryptDataTransfer) { |
| IOStreamPair encryptedStreams = |
| DataTransferEncryptor.getEncryptedStreams( |
| unbufProxyOut, unbufProxyIn, |
| datanode.blockPoolTokenSecretManager |
| .generateDataEncryptionKey(block.getBlockPoolId())); |
| unbufProxyOut = encryptedStreams.out; |
| unbufProxyIn = encryptedStreams.in; |
| } |
| |
| proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut, |
| HdfsConstants.SMALL_BUFFER_SIZE)); |
| proxyReply = new DataInputStream(new BufferedInputStream(unbufProxyIn, |
| HdfsConstants.IO_FILE_BUFFER_SIZE)); |
| |
| /* send request to the proxy */ |
| new Sender(proxyOut).copyBlock(block, blockToken); |
| |
| // receive the response from the proxy |
| |
| BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom( |
| PBHelper.vintPrefixed(proxyReply)); |
| |
| if (copyResponse.getStatus() != SUCCESS) { |
| if (copyResponse.getStatus() == ERROR_ACCESS_TOKEN) { |
| throw new IOException("Copy block " + block + " from " |
| + proxySock.getRemoteSocketAddress() |
| + " failed due to access token error"); |
| } |
| throw new IOException("Copy block " + block + " from " |
| + proxySock.getRemoteSocketAddress() + " failed"); |
| } |
| |
| // get checksum info about the block we're copying |
| ReadOpChecksumInfoProto checksumInfo = copyResponse.getReadOpChecksumInfo(); |
| DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto( |
| checksumInfo.getChecksum()); |
| // open a block receiver and check if the block does not exist |
| blockReceiver = new BlockReceiver( |
| block, proxyReply, proxySock.getRemoteSocketAddress().toString(), |
| proxySock.getLocalSocketAddress().toString(), |
| null, 0, 0, 0, "", null, datanode, remoteChecksum); |
| |
| // receive a block |
| blockReceiver.receiveBlock(null, null, null, null, |
| dataXceiverServer.balanceThrottler, null); |
| |
| // notify name node |
| datanode.notifyNamenodeReceivedBlock(block, delHint); |
| |
| LOG.info("Moved " + block + " from " + s.getRemoteSocketAddress()); |
| |
| } catch (IOException ioe) { |
| opStatus = ERROR; |
| errMsg = "opReplaceBlock " + block + " received exception " + ioe; |
| LOG.info(errMsg); |
| throw ioe; |
| } finally { |
| // receive the last byte that indicates the proxy released its thread resource |
| if (opStatus == SUCCESS) { |
| try { |
| proxyReply.readChar(); |
| } catch (IOException ignored) { |
| } |
| } |
| |
| // now release the thread resource |
| dataXceiverServer.balanceThrottler.release(); |
| |
| // send response back |
| try { |
| sendResponse(opStatus, errMsg); |
| } catch (IOException ioe) { |
| LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress()); |
| } |
| IOUtils.closeStream(proxyOut); |
| IOUtils.closeStream(blockReceiver); |
| IOUtils.closeStream(proxyReply); |
| } |
| |
| //update metrics |
| datanode.metrics.addReplaceBlockOp(elapsed()); |
| } |
| |
| private long elapsed() { |
| return now() - opStartTime; |
| } |
| |
| /** |
| * Utility function for sending a response. |
| * |
| * @param opStatus status message to write |
| * @param message message to send to the client or other DN |
| */ |
| private void sendResponse(Status status, |
| String message) throws IOException { |
| writeResponse(status, message, getOutputStream()); |
| } |
| |
| private static void writeResponse(Status status, String message, OutputStream out) |
| throws IOException { |
| BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder() |
| .setStatus(status); |
| if (message != null) { |
| response.setMessage(message); |
| } |
| response.build().writeDelimitedTo(out); |
| out.flush(); |
| } |
| |
| private void writeSuccessWithChecksumInfo(BlockSender blockSender, |
| DataOutputStream out) throws IOException { |
| |
| ReadOpChecksumInfoProto ckInfo = ReadOpChecksumInfoProto.newBuilder() |
| .setChecksum(DataTransferProtoUtil.toProto(blockSender.getChecksum())) |
| .setChunkOffset(blockSender.getOffset()) |
| .build(); |
| |
| BlockOpResponseProto response = BlockOpResponseProto.newBuilder() |
| .setStatus(SUCCESS) |
| .setReadOpChecksumInfo(ckInfo) |
| .build(); |
| response.writeDelimitedTo(out); |
| out.flush(); |
| } |
| |
| |
| private void checkAccess(DataOutputStream out, final boolean reply, |
| final ExtendedBlock blk, |
| final Token<BlockTokenIdentifier> t, |
| final Op op, |
| final BlockTokenSecretManager.AccessMode mode) throws IOException { |
| if (datanode.isBlockTokenEnabled) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Checking block access token for block '" + blk.getBlockId() |
| + "' with mode '" + mode + "'"); |
| } |
| try { |
| datanode.blockPoolTokenSecretManager.checkAccess(t, null, blk, mode); |
| } catch(InvalidToken e) { |
| try { |
| if (reply) { |
| if (out == null) { |
| out = new DataOutputStream( |
| NetUtils.getOutputStream(s, dnConf.socketWriteTimeout)); |
| } |
| |
| BlockOpResponseProto.Builder resp = BlockOpResponseProto.newBuilder() |
| .setStatus(ERROR_ACCESS_TOKEN); |
| if (mode == BlockTokenSecretManager.AccessMode.WRITE) { |
| DatanodeRegistration dnR = |
| datanode.getDNRegistrationForBP(blk.getBlockPoolId()); |
| // NB: Unconditionally using the xfer addr w/o hostname |
| resp.setFirstBadLink(dnR.getXferAddr()); |
| } |
| resp.build().writeDelimitedTo(out); |
| out.flush(); |
| } |
| LOG.warn("Block token verification failed: op=" + op |
| + ", remoteAddress=" + remoteAddress |
| + ", message=" + e.getLocalizedMessage()); |
| throw e; |
| } finally { |
| IOUtils.closeStream(out); |
| } |
| } |
| } |
| } |
| } |