HDFS-2066. Create a package and individual class files for DataTransferProtocol.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@1134869 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index aa605d5..2979bea 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -500,6 +500,9 @@
HDFS-2003. Separate FSEditLog reading logic from edit log memory state
building logic. (Ivan Kelly via todd)
+ HDFS-2066. Create a package and individual class files for
+ DataTransferProtocol. (szetszwo)
+
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
diff --git a/src/java/org/apache/hadoop/hdfs/BlockReader.java b/src/java/org/apache/hadoop/hdfs/BlockReader.java
index 9b98e1b..67bd6f8 100644
--- a/src/java/org/apache/hadoop/hdfs/BlockReader.java
+++ b/src/java/org/apache/hadoop/hdfs/BlockReader.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs;
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
+
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
@@ -30,10 +32,9 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
@@ -45,8 +46,6 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
-
/** This is a wrapper around connection to datanode
* and understands checksum, offset etc.
@@ -405,7 +404,7 @@
String clientName)
throws IOException {
// in and out will be closed when sock is closed (by the caller)
- DataTransferProtocol.Sender.opReadBlock(
+ Sender.opReadBlock(
new DataOutputStream(new BufferedOutputStream(
NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT))),
block, startOffset, len, clientName, blockToken);
diff --git a/src/java/org/apache/hadoop/hdfs/DFSClient.java b/src/java/org/apache/hadoop/hdfs/DFSClient.java
index 76de6c2..f881718 100644
--- a/src/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/src/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -18,8 +18,6 @@
*/
package org.apache.hadoop.hdfs;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.BLOCK_CHECKSUM;
-
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
@@ -63,18 +61,20 @@
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
+import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
@@ -133,7 +133,7 @@
SocketFactory socketFactory;
int socketTimeout;
final int writePacketSize;
- final DataTransferProtocol.ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
+ final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
final FileSystem.Statistics stats;
final int hdfsTimeout; // timeout value for a DFS operation.
final LeaseRenewer leaserenewer;
@@ -267,7 +267,7 @@
this.writePacketSize =
conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
- this.dtpReplaceDatanodeOnFailure = DataTransferProtocol.ReplaceDatanodeOnFailure.get(conf);
+ this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
// The hdfsTimeout is currently the same as the ipc timeout
this.hdfsTimeout = Client.getTimeout(conf);
@@ -1114,11 +1114,10 @@
if (LOG.isDebugEnabled()) {
LOG.debug("write to " + datanodes[j].getName() + ": "
- + BLOCK_CHECKSUM + ", block=" + block);
+ + Op.BLOCK_CHECKSUM + ", block=" + block);
}
// get block MD5
- DataTransferProtocol.Sender.opBlockChecksum(out, block,
- lb.getBlockToken());
+ Sender.opBlockChecksum(out, block, lb.getBlockToken());
final BlockOpResponseProto reply =
BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));
diff --git a/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 8843982..860cd9f 100644
--- a/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR;
-
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
@@ -48,17 +46,18 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -68,7 +67,6 @@
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
@@ -849,7 +847,7 @@
DataNode.SMALL_BUFFER_SIZE));
//send the TRANSFER_BLOCK request
- DataTransferProtocol.Sender.opTransferBlock(out, block,
+ Sender.opTransferBlock(out, block,
dfsClient.clientName, targets, blockToken);
//ack
@@ -1023,7 +1021,7 @@
blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
// send the request
- DataTransferProtocol.Sender.opWriteBlock(out, block,
+ Sender.opWriteBlock(out, block,
nodes.length, recoveryFlag ? stage.getRecoveryStage() : stage, newGS,
block.getNumBytes(), bytesSent, dfsClient.clientName, null, nodes,
accessToken);
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java b/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
deleted file mode 100644
index e756f95..0000000
--- a/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
+++ /dev/null
@@ -1,716 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.HadoopIllegalArgumentException;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PacketHeaderProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.util.ByteBufferOutputStream;
-import org.apache.hadoop.security.token.Token;
-
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtoUtil.fromProto;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtoUtil.toProto;
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.fromProto;
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.fromProtos;
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.toProto;
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.toProtos;
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
-
-import com.google.protobuf.Message;
-
-/**
- * Transfer data to/from datanode using a streaming protocol.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public interface DataTransferProtocol {
- public static final Log LOG = LogFactory.getLog(DataTransferProtocol.class);
-
- /** Version for data transfers between clients and datanodes
- * This should change when serialization of DatanodeInfo, not just
- * when protocol changes. It is not very obvious.
- */
- /*
- * Version 26:
- * Use protobuf.
- */
- public static final int DATA_TRANSFER_VERSION = 26;
-
- /** Operation */
- public enum Op {
- WRITE_BLOCK((byte)80),
- READ_BLOCK((byte)81),
- READ_METADATA((byte)82),
- REPLACE_BLOCK((byte)83),
- COPY_BLOCK((byte)84),
- BLOCK_CHECKSUM((byte)85),
- TRANSFER_BLOCK((byte)86);
-
- /** The code for this operation. */
- public final byte code;
-
- private Op(byte code) {
- this.code = code;
- }
-
- private static final int FIRST_CODE = values()[0].code;
- /** Return the object represented by the code. */
- private static Op valueOf(byte code) {
- final int i = (code & 0xff) - FIRST_CODE;
- return i < 0 || i >= values().length? null: values()[i];
- }
-
- /** Read from in */
- public static Op read(DataInput in) throws IOException {
- return valueOf(in.readByte());
- }
-
- /** Write to out */
- public void write(DataOutput out) throws IOException {
- out.write(code);
- }
- }
-
- public enum BlockConstructionStage {
- /** The enumerates are always listed as regular stage followed by the
- * recovery stage.
- * Changing this order will make getRecoveryStage not working.
- */
- // pipeline set up for block append
- PIPELINE_SETUP_APPEND,
- // pipeline set up for failed PIPELINE_SETUP_APPEND recovery
- PIPELINE_SETUP_APPEND_RECOVERY,
- // data streaming
- DATA_STREAMING,
- // pipeline setup for failed data streaming recovery
- PIPELINE_SETUP_STREAMING_RECOVERY,
- // close the block and pipeline
- PIPELINE_CLOSE,
- // Recover a failed PIPELINE_CLOSE
- PIPELINE_CLOSE_RECOVERY,
- // pipeline set up for block creation
- PIPELINE_SETUP_CREATE,
- // transfer RBW for adding datanodes
- TRANSFER_RBW,
- // transfer Finalized for adding datanodes
- TRANSFER_FINALIZED;
-
- final static private byte RECOVERY_BIT = (byte)1;
-
- /**
- * get the recovery stage of this stage
- */
- public BlockConstructionStage getRecoveryStage() {
- if (this == PIPELINE_SETUP_CREATE) {
- throw new IllegalArgumentException( "Unexpected blockStage " + this);
- } else {
- return values()[ordinal()|RECOVERY_BIT];
- }
- }
- }
-
-
- /** Sender */
- @InterfaceAudience.Private
- @InterfaceStability.Evolving
- public static class Sender {
- /** Initialize a operation. */
- private static void op(final DataOutput out, final Op op
- ) throws IOException {
- out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
- op.write(out);
- }
-
- private static void send(final DataOutputStream out, final Op opcode,
- final Message proto) throws IOException {
- op(out, opcode);
- proto.writeDelimitedTo(out);
- out.flush();
- }
-
- /** Send OP_READ_BLOCK */
- public static void opReadBlock(DataOutputStream out, ExtendedBlock blk,
- long blockOffset, long blockLen, String clientName,
- Token<BlockTokenIdentifier> blockToken)
- throws IOException {
-
- OpReadBlockProto proto = OpReadBlockProto.newBuilder()
- .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
- .setOffset(blockOffset)
- .setLen(blockLen)
- .build();
-
- send(out, Op.READ_BLOCK, proto);
- }
-
-
- /** Send OP_WRITE_BLOCK */
- public static void opWriteBlock(DataOutputStream out, ExtendedBlock blk,
- int pipelineSize, BlockConstructionStage stage, long newGs,
- long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
- DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
- throws IOException {
- ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(blk, client,
- blockToken);
-
- OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
- .setHeader(header)
- .addAllTargets(
- toProtos(targets, 1))
- .setStage(toProto(stage))
- .setPipelineSize(pipelineSize)
- .setMinBytesRcvd(minBytesRcvd)
- .setMaxBytesRcvd(maxBytesRcvd)
- .setLatestGenerationStamp(newGs);
-
- if (src != null) {
- proto.setSource(toProto(src));
- }
-
- send(out, Op.WRITE_BLOCK, proto.build());
- }
-
- /** Send {@link Op#TRANSFER_BLOCK} */
- public static void opTransferBlock(DataOutputStream out, ExtendedBlock blk,
- String client, DatanodeInfo[] targets,
- Token<BlockTokenIdentifier> blockToken) throws IOException {
-
- OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
- .setHeader(DataTransferProtoUtil.buildClientHeader(
- blk, client, blockToken))
- .addAllTargets(toProtos(targets, 0))
- .build();
-
- send(out, Op.TRANSFER_BLOCK, proto);
- }
-
- /** Send OP_REPLACE_BLOCK */
- public static void opReplaceBlock(DataOutputStream out,
- ExtendedBlock blk, String delHint, DatanodeInfo src,
- Token<BlockTokenIdentifier> blockToken) throws IOException {
- OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
- .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
- .setDelHint(delHint)
- .setSource(toProto(src))
- .build();
-
- send(out, Op.REPLACE_BLOCK, proto);
- }
-
- /** Send OP_COPY_BLOCK */
- public static void opCopyBlock(DataOutputStream out, ExtendedBlock blk,
- Token<BlockTokenIdentifier> blockToken)
- throws IOException {
- OpCopyBlockProto proto = OpCopyBlockProto.newBuilder()
- .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
- .build();
-
- send(out, Op.COPY_BLOCK, proto);
- }
-
- /** Send OP_BLOCK_CHECKSUM */
- public static void opBlockChecksum(DataOutputStream out, ExtendedBlock blk,
- Token<BlockTokenIdentifier> blockToken)
- throws IOException {
- OpBlockChecksumProto proto = OpBlockChecksumProto.newBuilder()
- .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
- .build();
-
- send(out, Op.BLOCK_CHECKSUM, proto);
- }
- }
-
- /** Receiver */
- public static abstract class Receiver {
- /** Read an Op. It also checks protocol version. */
- protected final Op readOp(DataInputStream in) throws IOException {
- final short version = in.readShort();
- if (version != DATA_TRANSFER_VERSION) {
- throw new IOException( "Version Mismatch (Expected: " +
- DataTransferProtocol.DATA_TRANSFER_VERSION +
- ", Received: " + version + " )");
- }
- return Op.read(in);
- }
-
- /** Process op by the corresponding method. */
- protected final void processOp(Op op, DataInputStream in
- ) throws IOException {
- switch(op) {
- case READ_BLOCK:
- opReadBlock(in);
- break;
- case WRITE_BLOCK:
- opWriteBlock(in);
- break;
- case REPLACE_BLOCK:
- opReplaceBlock(in);
- break;
- case COPY_BLOCK:
- opCopyBlock(in);
- break;
- case BLOCK_CHECKSUM:
- opBlockChecksum(in);
- break;
- case TRANSFER_BLOCK:
- opTransferBlock(in);
- break;
- default:
- throw new IOException("Unknown op " + op + " in data stream");
- }
- }
-
- /** Receive OP_READ_BLOCK */
- private void opReadBlock(DataInputStream in) throws IOException {
- OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
-
- ExtendedBlock b = fromProto(
- proto.getHeader().getBaseHeader().getBlock());
- Token<BlockTokenIdentifier> token = fromProto(
- proto.getHeader().getBaseHeader().getToken());
-
- opReadBlock(in, b, proto.getOffset(), proto.getLen(),
- proto.getHeader().getClientName(), token);
- }
- /**
- * Abstract OP_READ_BLOCK method. Read a block.
- */
- protected abstract void opReadBlock(DataInputStream in, ExtendedBlock blk,
- long offset, long length, String client,
- Token<BlockTokenIdentifier> blockToken) throws IOException;
-
- /** Receive OP_WRITE_BLOCK */
- private void opWriteBlock(DataInputStream in) throws IOException {
- final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
- opWriteBlock(in,
- fromProto(proto.getHeader().getBaseHeader().getBlock()),
- proto.getPipelineSize(),
- fromProto(proto.getStage()),
- proto.getLatestGenerationStamp(),
- proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
- proto.getHeader().getClientName(),
- fromProto(proto.getSource()),
- fromProtos(proto.getTargetsList()),
- fromProto(proto.getHeader().getBaseHeader().getToken()));
- }
-
- /**
- * Abstract OP_WRITE_BLOCK method.
- * Write a block.
- */
- protected abstract void opWriteBlock(DataInputStream in, ExtendedBlock blk,
- int pipelineSize, BlockConstructionStage stage, long newGs,
- long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
- DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
- throws IOException;
-
- /** Receive {@link Op#TRANSFER_BLOCK} */
- private void opTransferBlock(DataInputStream in) throws IOException {
- final OpTransferBlockProto proto =
- OpTransferBlockProto.parseFrom(vintPrefixed(in));
-
- opTransferBlock(in,
- fromProto(proto.getHeader().getBaseHeader().getBlock()),
- proto.getHeader().getClientName(),
- fromProtos(proto.getTargetsList()),
- fromProto(proto.getHeader().getBaseHeader().getToken()));
- }
-
- /**
- * Abstract {@link Op#TRANSFER_BLOCK} method.
- * For {@link BlockConstructionStage#TRANSFER_RBW}
- * or {@link BlockConstructionStage#TRANSFER_FINALIZED}.
- */
- protected abstract void opTransferBlock(DataInputStream in, ExtendedBlock blk,
- String client, DatanodeInfo[] targets,
- Token<BlockTokenIdentifier> blockToken)
- throws IOException;
-
- /** Receive OP_REPLACE_BLOCK */
- private void opReplaceBlock(DataInputStream in) throws IOException {
- OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
-
- opReplaceBlock(in,
- fromProto(proto.getHeader().getBlock()),
- proto.getDelHint(),
- fromProto(proto.getSource()),
- fromProto(proto.getHeader().getToken()));
- }
-
- /**
- * Abstract OP_REPLACE_BLOCK method.
- * It is used for balancing purpose; send to a destination
- */
- protected abstract void opReplaceBlock(DataInputStream in,
- ExtendedBlock blk, String delHint, DatanodeInfo src,
- Token<BlockTokenIdentifier> blockToken) throws IOException;
-
- /** Receive OP_COPY_BLOCK */
- private void opCopyBlock(DataInputStream in) throws IOException {
- OpCopyBlockProto proto = OpCopyBlockProto.parseFrom(vintPrefixed(in));
-
- opCopyBlock(in,
- fromProto(proto.getHeader().getBlock()),
- fromProto(proto.getHeader().getToken()));
- }
-
- /**
- * Abstract OP_COPY_BLOCK method. It is used for balancing purpose; send to
- * a proxy source.
- */
- protected abstract void opCopyBlock(DataInputStream in, ExtendedBlock blk,
- Token<BlockTokenIdentifier> blockToken)
- throws IOException;
-
- /** Receive OP_BLOCK_CHECKSUM */
- private void opBlockChecksum(DataInputStream in) throws IOException {
- OpBlockChecksumProto proto = OpBlockChecksumProto.parseFrom(vintPrefixed(in));
-
- opBlockChecksum(in,
- fromProto(proto.getHeader().getBlock()),
- fromProto(proto.getHeader().getToken()));
- }
-
- /**
- * Abstract OP_BLOCK_CHECKSUM method.
- * Get the checksum of a block
- */
- protected abstract void opBlockChecksum(DataInputStream in,
- ExtendedBlock blk, Token<BlockTokenIdentifier> blockToken)
- throws IOException;
- }
-
- /** reply **/
- @InterfaceAudience.Private
- @InterfaceStability.Evolving
- public static class PipelineAck {
- PipelineAckProto proto;
- public final static long UNKOWN_SEQNO = -2;
-
- /** default constructor **/
- public PipelineAck() {
- }
-
- /**
- * Constructor
- * @param seqno sequence number
- * @param replies an array of replies
- */
- public PipelineAck(long seqno, Status[] replies) {
- proto = PipelineAckProto.newBuilder()
- .setSeqno(seqno)
- .addAllStatus(Arrays.asList(replies))
- .build();
- }
-
- /**
- * Get the sequence number
- * @return the sequence number
- */
- public long getSeqno() {
- return proto.getSeqno();
- }
-
- /**
- * Get the number of replies
- * @return the number of replies
- */
- public short getNumOfReplies() {
- return (short)proto.getStatusCount();
- }
-
- /**
- * get the ith reply
- * @return the the ith reply
- */
- public Status getReply(int i) {
- return proto.getStatus(i);
- }
-
- /**
- * Check if this ack contains error status
- * @return true if all statuses are SUCCESS
- */
- public boolean isSuccess() {
- for (DataTransferProtos.Status reply : proto.getStatusList()) {
- if (reply != DataTransferProtos.Status.SUCCESS) {
- return false;
- }
- }
- return true;
- }
-
- /**** Writable interface ****/
- public void readFields(InputStream in) throws IOException {
- proto = PipelineAckProto.parseFrom(vintPrefixed(in));
- }
-
- public void write(OutputStream out) throws IOException {
- proto.writeDelimitedTo(out);
- }
-
- @Override //Object
- public String toString() {
- return proto.toString();
- }
- }
-
- /**
- * Header data for each packet that goes through the read/write pipelines.
- */
- public static class PacketHeader {
- /** Header size for a packet */
- private static final int PROTO_SIZE =
- PacketHeaderProto.newBuilder()
- .setOffsetInBlock(0)
- .setSeqno(0)
- .setLastPacketInBlock(false)
- .setDataLen(0)
- .build().getSerializedSize();
- public static final int PKT_HEADER_LEN =
- 6 + PROTO_SIZE;
-
- private int packetLen;
- private PacketHeaderProto proto;
-
- public PacketHeader() {
- }
-
- public PacketHeader(int packetLen, long offsetInBlock, long seqno,
- boolean lastPacketInBlock, int dataLen) {
- this.packetLen = packetLen;
- proto = PacketHeaderProto.newBuilder()
- .setOffsetInBlock(offsetInBlock)
- .setSeqno(seqno)
- .setLastPacketInBlock(lastPacketInBlock)
- .setDataLen(dataLen)
- .build();
- }
-
- public int getDataLen() {
- return proto.getDataLen();
- }
-
- public boolean isLastPacketInBlock() {
- return proto.getLastPacketInBlock();
- }
-
- public long getSeqno() {
- return proto.getSeqno();
- }
-
- public long getOffsetInBlock() {
- return proto.getOffsetInBlock();
- }
-
- public int getPacketLen() {
- return packetLen;
- }
-
- @Override
- public String toString() {
- return "PacketHeader with packetLen=" + packetLen +
- "Header data: " +
- proto.toString();
- }
-
- public void readFields(ByteBuffer buf) throws IOException {
- packetLen = buf.getInt();
- short protoLen = buf.getShort();
- byte[] data = new byte[protoLen];
- buf.get(data);
- proto = PacketHeaderProto.parseFrom(data);
- }
-
- public void readFields(DataInputStream in) throws IOException {
- this.packetLen = in.readInt();
- short protoLen = in.readShort();
- byte[] data = new byte[protoLen];
- in.readFully(data);
- proto = PacketHeaderProto.parseFrom(data);
- }
-
-
- /**
- * Write the header into the buffer.
- * This requires that PKT_HEADER_LEN bytes are available.
- */
- public void putInBuffer(final ByteBuffer buf) {
- assert proto.getSerializedSize() == PROTO_SIZE
- : "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize();
- try {
- buf.putInt(packetLen);
- buf.putShort((short) proto.getSerializedSize());
- proto.writeTo(new ByteBufferOutputStream(buf));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- public void write(DataOutputStream out) throws IOException {
- assert proto.getSerializedSize() == PROTO_SIZE
- : "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize();
- out.writeInt(packetLen);
- out.writeShort(proto.getSerializedSize());
- proto.writeTo(out);
- }
-
- /**
- * Perform a sanity check on the packet, returning true if it is sane.
- * @param lastSeqNo the previous sequence number received - we expect the current
- * sequence number to be larger by 1.
- */
- public boolean sanityCheck(long lastSeqNo) {
- // We should only have a non-positive data length for the last packet
- if (proto.getDataLen() <= 0 && proto.getLastPacketInBlock()) return false;
- // The last packet should not contain data
- if (proto.getLastPacketInBlock() && proto.getDataLen() != 0) return false;
- // Seqnos should always increase by 1 with each packet received
- if (proto.getSeqno() != lastSeqNo + 1) return false;
- return true;
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof PacketHeader)) return false;
- PacketHeader other = (PacketHeader)o;
- return this.proto.equals(other.proto);
- }
-
- @Override
- public int hashCode() {
- return (int)proto.getSeqno();
- }
- }
-
- /**
- * The setting of replace-datanode-on-failure feature.
- */
- public enum ReplaceDatanodeOnFailure {
- /** The feature is disabled in the entire site. */
- DISABLE,
- /** Never add a new datanode. */
- NEVER,
- /**
- * DEFAULT policy:
- * Let r be the replication number.
- * Let n be the number of existing datanodes.
- * Add a new datanode only if r >= 3 and either
- * (1) floor(r/2) >= n; or
- * (2) r > n and the block is hflushed/appended.
- */
- DEFAULT,
- /** Always add a new datanode when an existing datanode is removed. */
- ALWAYS;
-
- /** Check if the feature is enabled. */
- public void checkEnabled() {
- if (this == DISABLE) {
- throw new UnsupportedOperationException(
- "This feature is disabled. Please refer to "
- + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY
- + " configuration property.");
- }
- }
-
- /** Is the policy satisfied? */
- public boolean satisfy(
- final short replication, final DatanodeInfo[] existings,
- final boolean isAppend, final boolean isHflushed) {
- final int n = existings == null? 0: existings.length;
- if (n == 0 || n >= replication) {
- //don't need to add datanode for any policy.
- return false;
- } else if (this == DISABLE || this == NEVER) {
- return false;
- } else if (this == ALWAYS) {
- return true;
- } else {
- //DEFAULT
- if (replication < 3) {
- return false;
- } else {
- if (n <= (replication/2)) {
- return true;
- } else {
- return isAppend || isHflushed;
- }
- }
- }
- }
-
- /** Get the setting from configuration. */
- public static ReplaceDatanodeOnFailure get(final Configuration conf) {
- final boolean enabled = conf.getBoolean(
- DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
- DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT);
- if (!enabled) {
- return DISABLE;
- }
-
- final String policy = conf.get(
- DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
- DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT);
- for(int i = 1; i < values().length; i++) {
- final ReplaceDatanodeOnFailure rdof = values()[i];
- if (rdof.name().equalsIgnoreCase(policy)) {
- return rdof;
- }
- }
- throw new HadoopIllegalArgumentException("Illegal configuration value for "
- + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY
- + ": " + policy);
- }
-
- /** Write the setting to configuration. */
- public void write(final Configuration conf) {
- conf.setBoolean(
- DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
- this != DISABLE);
- conf.set(
- DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
- name());
- }
- }
-}
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java
new file mode 100644
index 0000000..5f86e52
--- /dev/null
+++ b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/** Block Construction Stage */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public enum BlockConstructionStage {
+ /** The enumerates are always listed as regular stage followed by the
+ * recovery stage.
+ * Changing this order will make getRecoveryStage not working.
+ */
+ // pipeline set up for block append
+ PIPELINE_SETUP_APPEND,
+ // pipeline set up for failed PIPELINE_SETUP_APPEND recovery
+ PIPELINE_SETUP_APPEND_RECOVERY,
+ // data streaming
+ DATA_STREAMING,
+ // pipeline setup for failed data streaming recovery
+ PIPELINE_SETUP_STREAMING_RECOVERY,
+ // close the block and pipeline
+ PIPELINE_CLOSE,
+ // Recover a failed PIPELINE_CLOSE
+ PIPELINE_CLOSE_RECOVERY,
+ // pipeline set up for block creation
+ PIPELINE_SETUP_CREATE,
+ // transfer RBW for adding datanodes
+ TRANSFER_RBW,
+ // transfer Finalized for adding datanodes
+ TRANSFER_FINALIZED;
+
+ final static private byte RECOVERY_BIT = (byte)1;
+
+ /**
+ * get the recovery stage of this stage
+ */
+ public BlockConstructionStage getRecoveryStage() {
+ if (this == PIPELINE_SETUP_CREATE) {
+ throw new IllegalArgumentException( "Unexpected blockStage " + this);
+ } else {
+ return values()[ordinal()|RECOVERY_BIT];
+ }
+ }
+}
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtoUtil.java b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
similarity index 85%
rename from src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtoUtil.java
rename to src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
index 783f65c..b6d31cb 100644
--- a/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtoUtil.java
+++ b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
@@ -15,10 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.hdfs.protocol;
+package org.apache.hadoop.hdfs.protocol.datatransfer;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
@@ -30,9 +33,11 @@
* Static utilities for dealing with the protocol buffers used by the
* Data Transfer Protocol.
*/
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
abstract class DataTransferProtoUtil {
- static DataTransferProtocol.BlockConstructionStage fromProto(
+ static BlockConstructionStage fromProto(
OpWriteBlockProto.BlockConstructionStage stage) {
return BlockConstructionStage.valueOf(BlockConstructionStage.class,
stage.name());
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
new file mode 100644
index 0000000..3ea3016
--- /dev/null
+++ b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Transfer data to/from datanode using a streaming protocol.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface DataTransferProtocol {
+ public static final Log LOG = LogFactory.getLog(DataTransferProtocol.class);
+
+ /** Version for data transfers between clients and datanodes
+ * This should change when serialization of DatanodeInfo, not just
+ * when protocol changes. It is not very obvious.
+ */
+ /*
+ * Version 27:
+ * Move DataTransferProtocol and the inner classes to a package.
+ */
+ public static final int DATA_TRANSFER_VERSION = 27;
+}
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
new file mode 100644
index 0000000..4b9b47f
--- /dev/null
+++ b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/** Operation */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public enum Op {
+ WRITE_BLOCK((byte)80),
+ READ_BLOCK((byte)81),
+ READ_METADATA((byte)82),
+ REPLACE_BLOCK((byte)83),
+ COPY_BLOCK((byte)84),
+ BLOCK_CHECKSUM((byte)85),
+ TRANSFER_BLOCK((byte)86);
+
+ /** The code for this operation. */
+ public final byte code;
+
+ private Op(byte code) {
+ this.code = code;
+ }
+
+ private static final int FIRST_CODE = values()[0].code;
+ /** Return the object represented by the code. */
+ private static Op valueOf(byte code) {
+ final int i = (code & 0xff) - FIRST_CODE;
+ return i < 0 || i >= values().length? null: values()[i];
+ }
+
+ /** Read from in */
+ public static Op read(DataInput in) throws IOException {
+ return valueOf(in.readByte());
+ }
+
+ /** Write to out */
+ public void write(DataOutput out) throws IOException {
+ out.write(code);
+ }
+}
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
new file mode 100644
index 0000000..73e1b20
--- /dev/null
+++ b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PacketHeaderProto;
+import org.apache.hadoop.hdfs.util.ByteBufferOutputStream;
+
+/**
+ * Header data for each packet that goes through the read/write pipelines.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class PacketHeader {
+ /** Header size for a packet */
+ private static final int PROTO_SIZE =
+ PacketHeaderProto.newBuilder()
+ .setOffsetInBlock(0)
+ .setSeqno(0)
+ .setLastPacketInBlock(false)
+ .setDataLen(0)
+ .build().getSerializedSize();
+ public static final int PKT_HEADER_LEN =
+ 6 + PROTO_SIZE;
+
+ private int packetLen;
+ private PacketHeaderProto proto;
+
+ public PacketHeader() {
+ }
+
+ public PacketHeader(int packetLen, long offsetInBlock, long seqno,
+ boolean lastPacketInBlock, int dataLen) {
+ this.packetLen = packetLen;
+ proto = PacketHeaderProto.newBuilder()
+ .setOffsetInBlock(offsetInBlock)
+ .setSeqno(seqno)
+ .setLastPacketInBlock(lastPacketInBlock)
+ .setDataLen(dataLen)
+ .build();
+ }
+
+ public int getDataLen() {
+ return proto.getDataLen();
+ }
+
+ public boolean isLastPacketInBlock() {
+ return proto.getLastPacketInBlock();
+ }
+
+ public long getSeqno() {
+ return proto.getSeqno();
+ }
+
+ public long getOffsetInBlock() {
+ return proto.getOffsetInBlock();
+ }
+
+ public int getPacketLen() {
+ return packetLen;
+ }
+
+ @Override
+ public String toString() {
+ return "PacketHeader with packetLen=" + packetLen +
+ "Header data: " +
+ proto.toString();
+ }
+
+ public void readFields(ByteBuffer buf) throws IOException {
+ packetLen = buf.getInt();
+ short protoLen = buf.getShort();
+ byte[] data = new byte[protoLen];
+ buf.get(data);
+ proto = PacketHeaderProto.parseFrom(data);
+ }
+
+ public void readFields(DataInputStream in) throws IOException {
+ this.packetLen = in.readInt();
+ short protoLen = in.readShort();
+ byte[] data = new byte[protoLen];
+ in.readFully(data);
+ proto = PacketHeaderProto.parseFrom(data);
+ }
+
+
+ /**
+ * Write the header into the buffer.
+ * This requires that PKT_HEADER_LEN bytes are available.
+ */
+ public void putInBuffer(final ByteBuffer buf) {
+ assert proto.getSerializedSize() == PROTO_SIZE
+ : "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize();
+ try {
+ buf.putInt(packetLen);
+ buf.putShort((short) proto.getSerializedSize());
+ proto.writeTo(new ByteBufferOutputStream(buf));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void write(DataOutputStream out) throws IOException {
+ assert proto.getSerializedSize() == PROTO_SIZE
+ : "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize();
+ out.writeInt(packetLen);
+ out.writeShort(proto.getSerializedSize());
+ proto.writeTo(out);
+ }
+
+ /**
+ * Perform a sanity check on the packet, returning true if it is sane.
+ * @param lastSeqNo the previous sequence number received - we expect the current
+ * sequence number to be larger by 1.
+ */
+ public boolean sanityCheck(long lastSeqNo) {
+ // We should only have a non-positive data length for the last packet
+ if (proto.getDataLen() <= 0 && proto.getLastPacketInBlock()) return false;
+ // The last packet should not contain data
+ if (proto.getLastPacketInBlock() && proto.getDataLen() != 0) return false;
+ // Seqnos should always increase by 1 with each packet received
+ if (proto.getSeqno() != lastSeqNo + 1) return false;
+ return true;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof PacketHeader)) return false;
+ PacketHeader other = (PacketHeader)o;
+ return this.proto.equals(other.proto);
+ }
+
+ @Override
+ public int hashCode() {
+ return (int)proto.getSeqno();
+ }
+}
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
new file mode 100644
index 0000000..6498a56
--- /dev/null
+++ b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+
+/** Pipeline Acknowledgment **/
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class PipelineAck {
+ PipelineAckProto proto;
+ public final static long UNKOWN_SEQNO = -2;
+
+ /** default constructor **/
+ public PipelineAck() {
+ }
+
+ /**
+ * Constructor
+ * @param seqno sequence number
+ * @param replies an array of replies
+ */
+ public PipelineAck(long seqno, Status[] replies) {
+ proto = PipelineAckProto.newBuilder()
+ .setSeqno(seqno)
+ .addAllStatus(Arrays.asList(replies))
+ .build();
+ }
+
+ /**
+ * Get the sequence number
+ * @return the sequence number
+ */
+ public long getSeqno() {
+ return proto.getSeqno();
+ }
+
+ /**
+ * Get the number of replies
+ * @return the number of replies
+ */
+ public short getNumOfReplies() {
+ return (short)proto.getStatusCount();
+ }
+
+ /**
+ * get the ith reply
+ * @return the the ith reply
+ */
+ public Status getReply(int i) {
+ return proto.getStatus(i);
+ }
+
+ /**
+ * Check if this ack contains error status
+ * @return true if all statuses are SUCCESS
+ */
+ public boolean isSuccess() {
+ for (DataTransferProtos.Status reply : proto.getStatusList()) {
+ if (reply != DataTransferProtos.Status.SUCCESS) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**** Writable interface ****/
+ public void readFields(InputStream in) throws IOException {
+ proto = PipelineAckProto.parseFrom(vintPrefixed(in));
+ }
+
+ public void write(OutputStream out) throws IOException {
+ proto.writeDelimitedTo(out);
+ }
+
+ @Override //Object
+ public String toString() {
+ return proto.toString();
+ }
+}
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
new file mode 100644
index 0000000..c17b812
--- /dev/null
+++ b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.fromProto;
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.fromProtos;
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
+import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.fromProto;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+/** Receiver */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class Receiver {
+ /** Read an Op. It also checks protocol version. */
+ protected final Op readOp(DataInputStream in) throws IOException {
+ final short version = in.readShort();
+ if (version != DataTransferProtocol.DATA_TRANSFER_VERSION) {
+ throw new IOException( "Version Mismatch (Expected: " +
+ DataTransferProtocol.DATA_TRANSFER_VERSION +
+ ", Received: " + version + " )");
+ }
+ return Op.read(in);
+ }
+
+ /** Process op by the corresponding method. */
+ protected final void processOp(Op op, DataInputStream in
+ ) throws IOException {
+ switch(op) {
+ case READ_BLOCK:
+ opReadBlock(in);
+ break;
+ case WRITE_BLOCK:
+ opWriteBlock(in);
+ break;
+ case REPLACE_BLOCK:
+ opReplaceBlock(in);
+ break;
+ case COPY_BLOCK:
+ opCopyBlock(in);
+ break;
+ case BLOCK_CHECKSUM:
+ opBlockChecksum(in);
+ break;
+ case TRANSFER_BLOCK:
+ opTransferBlock(in);
+ break;
+ default:
+ throw new IOException("Unknown op " + op + " in data stream");
+ }
+ }
+
+ /** Receive OP_READ_BLOCK */
+ private void opReadBlock(DataInputStream in) throws IOException {
+ OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
+
+ ExtendedBlock b = fromProto(
+ proto.getHeader().getBaseHeader().getBlock());
+ Token<BlockTokenIdentifier> token = fromProto(
+ proto.getHeader().getBaseHeader().getToken());
+
+ opReadBlock(in, b, proto.getOffset(), proto.getLen(),
+ proto.getHeader().getClientName(), token);
+ }
+ /**
+ * Abstract OP_READ_BLOCK method. Read a block.
+ */
+ protected abstract void opReadBlock(DataInputStream in, ExtendedBlock blk,
+ long offset, long length, String client,
+ Token<BlockTokenIdentifier> blockToken) throws IOException;
+
+ /** Receive OP_WRITE_BLOCK */
+ private void opWriteBlock(DataInputStream in) throws IOException {
+ final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
+ opWriteBlock(in,
+ fromProto(proto.getHeader().getBaseHeader().getBlock()),
+ proto.getPipelineSize(),
+ fromProto(proto.getStage()),
+ proto.getLatestGenerationStamp(),
+ proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
+ proto.getHeader().getClientName(),
+ fromProto(proto.getSource()),
+ fromProtos(proto.getTargetsList()),
+ fromProto(proto.getHeader().getBaseHeader().getToken()));
+ }
+
+ /**
+ * Abstract OP_WRITE_BLOCK method.
+ * Write a block.
+ */
+ protected abstract void opWriteBlock(DataInputStream in, ExtendedBlock blk,
+ int pipelineSize, BlockConstructionStage stage, long newGs,
+ long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
+ DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
+ throws IOException;
+
+ /** Receive {@link Op#TRANSFER_BLOCK} */
+ private void opTransferBlock(DataInputStream in) throws IOException {
+ final OpTransferBlockProto proto =
+ OpTransferBlockProto.parseFrom(vintPrefixed(in));
+
+ opTransferBlock(in,
+ fromProto(proto.getHeader().getBaseHeader().getBlock()),
+ proto.getHeader().getClientName(),
+ fromProtos(proto.getTargetsList()),
+ fromProto(proto.getHeader().getBaseHeader().getToken()));
+ }
+
+ /**
+ * Abstract {@link Op#TRANSFER_BLOCK} method.
+ * For {@link BlockConstructionStage#TRANSFER_RBW}
+ * or {@link BlockConstructionStage#TRANSFER_FINALIZED}.
+ */
+ protected abstract void opTransferBlock(DataInputStream in, ExtendedBlock blk,
+ String client, DatanodeInfo[] targets,
+ Token<BlockTokenIdentifier> blockToken)
+ throws IOException;
+
+ /** Receive OP_REPLACE_BLOCK */
+ private void opReplaceBlock(DataInputStream in) throws IOException {
+ OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
+
+ opReplaceBlock(in,
+ fromProto(proto.getHeader().getBlock()),
+ proto.getDelHint(),
+ fromProto(proto.getSource()),
+ fromProto(proto.getHeader().getToken()));
+ }
+
+ /**
+ * Abstract OP_REPLACE_BLOCK method.
+ * It is used for balancing purpose; send to a destination
+ */
+ protected abstract void opReplaceBlock(DataInputStream in,
+ ExtendedBlock blk, String delHint, DatanodeInfo src,
+ Token<BlockTokenIdentifier> blockToken) throws IOException;
+
+ /** Receive OP_COPY_BLOCK */
+ private void opCopyBlock(DataInputStream in) throws IOException {
+ OpCopyBlockProto proto = OpCopyBlockProto.parseFrom(vintPrefixed(in));
+
+ opCopyBlock(in,
+ fromProto(proto.getHeader().getBlock()),
+ fromProto(proto.getHeader().getToken()));
+ }
+
+ /**
+ * Abstract OP_COPY_BLOCK method. It is used for balancing purpose; send to
+ * a proxy source.
+ */
+ protected abstract void opCopyBlock(DataInputStream in, ExtendedBlock blk,
+ Token<BlockTokenIdentifier> blockToken)
+ throws IOException;
+
+ /** Receive OP_BLOCK_CHECKSUM */
+ private void opBlockChecksum(DataInputStream in) throws IOException {
+ OpBlockChecksumProto proto = OpBlockChecksumProto.parseFrom(vintPrefixed(in));
+
+ opBlockChecksum(in,
+ fromProto(proto.getHeader().getBlock()),
+ fromProto(proto.getHeader().getToken()));
+ }
+
+ /**
+ * Abstract OP_BLOCK_CHECKSUM method.
+ * Get the checksum of a block
+ */
+ protected abstract void opBlockChecksum(DataInputStream in,
+ ExtendedBlock blk, Token<BlockTokenIdentifier> blockToken)
+ throws IOException;
+}
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java
new file mode 100644
index 0000000..3184554
--- /dev/null
+++ b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+
+/**
+ * The setting of replace-datanode-on-failure feature.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public enum ReplaceDatanodeOnFailure {
+ /** The feature is disabled in the entire site. */
+ DISABLE,
+ /** Never add a new datanode. */
+ NEVER,
+ /**
+ * DEFAULT policy:
+ * Let r be the replication number.
+ * Let n be the number of existing datanodes.
+ * Add a new datanode only if r >= 3 and either
+ * (1) floor(r/2) >= n; or
+ * (2) r > n and the block is hflushed/appended.
+ */
+ DEFAULT,
+ /** Always add a new datanode when an existing datanode is removed. */
+ ALWAYS;
+
+ /** Check if the feature is enabled. */
+ public void checkEnabled() {
+ if (this == DISABLE) {
+ throw new UnsupportedOperationException(
+ "This feature is disabled. Please refer to "
+ + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY
+ + " configuration property.");
+ }
+ }
+
+ /** Is the policy satisfied? */
+ public boolean satisfy(
+ final short replication, final DatanodeInfo[] existings,
+ final boolean isAppend, final boolean isHflushed) {
+ final int n = existings == null? 0: existings.length;
+ if (n == 0 || n >= replication) {
+ //don't need to add datanode for any policy.
+ return false;
+ } else if (this == DISABLE || this == NEVER) {
+ return false;
+ } else if (this == ALWAYS) {
+ return true;
+ } else {
+ //DEFAULT
+ if (replication < 3) {
+ return false;
+ } else {
+ if (n <= (replication/2)) {
+ return true;
+ } else {
+ return isAppend || isHflushed;
+ }
+ }
+ }
+ }
+
+ /** Get the setting from configuration. */
+ public static ReplaceDatanodeOnFailure get(final Configuration conf) {
+ final boolean enabled = conf.getBoolean(
+ DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
+ DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT);
+ if (!enabled) {
+ return DISABLE;
+ }
+
+ final String policy = conf.get(
+ DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
+ DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT);
+ for(int i = 1; i < values().length; i++) {
+ final ReplaceDatanodeOnFailure rdof = values()[i];
+ if (rdof.name().equalsIgnoreCase(policy)) {
+ return rdof;
+ }
+ }
+ throw new HadoopIllegalArgumentException("Illegal configuration value for "
+ + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY
+ + ": " + policy);
+ }
+
+ /** Write the setting to configuration. */
+ public void write(final Configuration conf) {
+ conf.setBoolean(
+ DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
+ this != DISABLE);
+ conf.set(
+ DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
+ name());
+ }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
new file mode 100644
index 0000000..9502ad4
--- /dev/null
+++ b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.toProto;
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.toProtos;
+import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.toProto;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+import com.google.protobuf.Message;
+
+/** Sender */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class Sender {
+ /** Initialize a operation. */
+ private static void op(final DataOutput out, final Op op
+ ) throws IOException {
+ out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
+ op.write(out);
+ }
+
+ private static void send(final DataOutputStream out, final Op opcode,
+ final Message proto) throws IOException {
+ op(out, opcode);
+ proto.writeDelimitedTo(out);
+ out.flush();
+ }
+
+ /** Send OP_READ_BLOCK */
+ public static void opReadBlock(DataOutputStream out, ExtendedBlock blk,
+ long blockOffset, long blockLen, String clientName,
+ Token<BlockTokenIdentifier> blockToken)
+ throws IOException {
+
+ OpReadBlockProto proto = OpReadBlockProto.newBuilder()
+ .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
+ .setOffset(blockOffset)
+ .setLen(blockLen)
+ .build();
+
+ send(out, Op.READ_BLOCK, proto);
+ }
+
+
+ /** Send OP_WRITE_BLOCK */
+ public static void opWriteBlock(DataOutputStream out, ExtendedBlock blk,
+ int pipelineSize, BlockConstructionStage stage, long newGs,
+ long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
+ DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
+ throws IOException {
+ ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(blk, client,
+ blockToken);
+
+ OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
+ .setHeader(header)
+ .addAllTargets(
+ toProtos(targets, 1))
+ .setStage(toProto(stage))
+ .setPipelineSize(pipelineSize)
+ .setMinBytesRcvd(minBytesRcvd)
+ .setMaxBytesRcvd(maxBytesRcvd)
+ .setLatestGenerationStamp(newGs);
+
+ if (src != null) {
+ proto.setSource(toProto(src));
+ }
+
+ send(out, Op.WRITE_BLOCK, proto.build());
+ }
+
+ /** Send {@link Op#TRANSFER_BLOCK} */
+ public static void opTransferBlock(DataOutputStream out, ExtendedBlock blk,
+ String client, DatanodeInfo[] targets,
+ Token<BlockTokenIdentifier> blockToken) throws IOException {
+
+ OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
+ .setHeader(DataTransferProtoUtil.buildClientHeader(
+ blk, client, blockToken))
+ .addAllTargets(toProtos(targets, 0))
+ .build();
+
+ send(out, Op.TRANSFER_BLOCK, proto);
+ }
+
+ /** Send OP_REPLACE_BLOCK */
+ public static void opReplaceBlock(DataOutputStream out,
+ ExtendedBlock blk, String delHint, DatanodeInfo src,
+ Token<BlockTokenIdentifier> blockToken) throws IOException {
+ OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
+ .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
+ .setDelHint(delHint)
+ .setSource(toProto(src))
+ .build();
+
+ send(out, Op.REPLACE_BLOCK, proto);
+ }
+
+ /** Send OP_COPY_BLOCK */
+ public static void opCopyBlock(DataOutputStream out, ExtendedBlock blk,
+ Token<BlockTokenIdentifier> blockToken)
+ throws IOException {
+ OpCopyBlockProto proto = OpCopyBlockProto.newBuilder()
+ .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
+ .build();
+
+ send(out, Op.COPY_BLOCK, proto);
+ }
+
+ /** Send OP_BLOCK_CHECKSUM */
+ public static void opBlockChecksum(DataOutputStream out, ExtendedBlock blk,
+ Token<BlockTokenIdentifier> blockToken)
+ throws IOException {
+ OpBlockChecksumProto proto = OpBlockChecksumProto.newBuilder()
+ .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
+ .build();
+
+ send(out, Op.BLOCK_CHECKSUM, proto);
+ }
+}
diff --git a/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index efaa80d..a5ecf85 100644
--- a/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.balancer;
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
+
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
@@ -51,12 +53,11 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -74,8 +75,6 @@
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
-
/** <p>The balancer is a tool that balances disk space usage on an HDFS cluster
* when some datanodes become full or when new empty nodes join the cluster.
* The tool is deployed as an application program that can be run by the
@@ -349,7 +348,7 @@
private void sendRequest(DataOutputStream out) throws IOException {
final ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
final Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
- DataTransferProtocol.Sender.opReplaceBlock(out, eb, source.getStorageID(),
+ Sender.opReplaceBlock(out, eb, source.getStorageID(),
proxySource.getDatanode(), accessToken);
}
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 0956cc5..b382323 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -34,13 +34,13 @@
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.FSOutputSummer;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 2bc8086..55c08e1 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -33,7 +33,7 @@
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.SocketOutputStream;
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 09a87d4..58319ce 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -18,7 +18,32 @@
package org.apache.hadoop.hdfs.server.datanode;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.server.common.Util.now;
import java.io.BufferedOutputStream;
@@ -66,16 +91,17 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
@@ -1947,7 +1973,7 @@
EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
}
- DataTransferProtocol.Sender.opWriteBlock(out,
+ Sender.opWriteBlock(out,
b, 0, stage, 0, 0, 0, clientname, srcNode, targets, accessToken);
// send data & checksum
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 8b5fc08..098f151 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -17,8 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.*;
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_ACCESS_TOKEN;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
import static org.apache.hadoop.hdfs.server.common.Util.now;
import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
@@ -38,16 +39,18 @@
import org.apache.commons.logging.Log;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
-
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -55,8 +58,6 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-import org.apache.hadoop.metrics2.lib.MutableRate;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
@@ -69,8 +70,7 @@
/**
* Thread for processing incoming/outgoing data stream.
*/
-class DataXceiver extends DataTransferProtocol.Receiver
- implements Runnable, FSConstants {
+class DataXceiver extends Receiver implements Runnable, FSConstants {
public static final Log LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog;
@@ -139,7 +139,7 @@
// This optimistic behaviour allows the other end to reuse connections.
// Setting keepalive timeout to 0 disable this behavior.
do {
- DataTransferProtocol.Op op;
+ Op op;
try {
if (opsProcessed != 0) {
assert socketKeepaliveTimeout > 0;
@@ -206,8 +206,7 @@
DataOutputStream out = new DataOutputStream(
new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
checkAccess(out, true, block, blockToken,
- DataTransferProtocol.Op.READ_BLOCK,
- BlockTokenSecretManager.AccessMode.READ);
+ Op.READ_BLOCK, BlockTokenSecretManager.AccessMode.READ);
// send the block
BlockSender blockSender = null;
@@ -329,8 +328,7 @@
NetUtils.getOutputStream(s, datanode.socketWriteTimeout),
SMALL_BUFFER_SIZE));
checkAccess(replyOut, isClient, block, blockToken,
- DataTransferProtocol.Op.WRITE_BLOCK,
- BlockTokenSecretManager.AccessMode.WRITE);
+ Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE);
DataOutputStream mirrorOut = null; // stream to next target
DataInputStream mirrorIn = null; // reply from next target
@@ -375,7 +373,7 @@
SMALL_BUFFER_SIZE));
mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
- DataTransferProtocol.Sender.opWriteBlock(mirrorOut, originalBlock,
+ Sender.opWriteBlock(mirrorOut, originalBlock,
pipelineSize, stage, newGs, minBytesRcvd, maxBytesRcvd, clientname,
srcDataNode, targets, blockToken);
@@ -497,10 +495,9 @@
final DatanodeInfo[] targets,
final Token<BlockTokenIdentifier> blockToken) throws IOException {
checkAccess(null, true, blk, blockToken,
- DataTransferProtocol.Op.TRANSFER_BLOCK,
- BlockTokenSecretManager.AccessMode.COPY);
+ Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
- updateCurrentThreadName(DataTransferProtocol.Op.TRANSFER_BLOCK + " " + blk);
+ updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
final DataOutputStream out = new DataOutputStream(
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
@@ -521,8 +518,7 @@
final DataOutputStream out = new DataOutputStream(
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
checkAccess(out, true, block, blockToken,
- DataTransferProtocol.Op.BLOCK_CHECKSUM,
- BlockTokenSecretManager.AccessMode.READ);
+ Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);
updateCurrentThreadName("Reading metadata for block " + block);
final MetaDataInputStream metadataIn =
datanode.data.getMetaDataInputStream(block);
@@ -693,7 +689,7 @@
new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
/* send request to the proxy */
- DataTransferProtocol.Sender.opCopyBlock(proxyOut, block, blockToken);
+ Sender.opCopyBlock(proxyOut, block, blockToken);
// receive the response from the proxy
proxyReply = new DataInputStream(new BufferedInputStream(
@@ -760,11 +756,6 @@
return now() - opStartTime;
}
- private void updateCounter(MutableCounterLong localCounter,
- MutableCounterLong remoteCounter) {
- (isLocal? localCounter: remoteCounter).incr();
- }
-
/**
* Utility function for sending a response.
* @param s socket to write to
@@ -793,7 +784,7 @@
private void checkAccess(DataOutputStream out, final boolean reply,
final ExtendedBlock blk,
final Token<BlockTokenIdentifier> t,
- final DataTransferProtocol.Op op,
+ final Op op,
final BlockTokenSecretManager.AccessMode mode) throws IOException {
if (datanode.isBlockTokenEnabled) {
try {
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index d0a2afe..4f1b645 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -17,45 +17,100 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
-import org.apache.commons.logging.*;
+import static org.apache.hadoop.hdfs.server.common.Util.now;
+import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.common.Util;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
-import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
-import org.apache.hadoop.util.*;
-import org.apache.hadoop.net.CachedDNSToSwitchMapping;
-import org.apache.hadoop.net.DNSToSwitchMapping;
-import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.net.Node;
-import org.apache.hadoop.net.NodeBase;
-import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.hdfs.server.namenode.DatanodeDescriptor.BlockTargetPair;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
+import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -65,51 +120,33 @@
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FsServerDefaults;
-import org.apache.hadoop.fs.InvalidPathException;
-import org.apache.hadoop.fs.ParentNotDirectoryException;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Options.Rename;
-import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.net.CachedDNSToSwitchMapping;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.net.ScriptBasedMapping;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.HostsFileReader;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON;
-import java.io.BufferedWriter;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.DataOutputStream;
-import java.io.PrintWriter;
-import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
-import java.net.URI;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.Map.Entry;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import javax.management.NotCompliantMBeanException;
-import javax.management.ObjectName;
-import javax.management.StandardMBean;
-
/***************************************************
* FSNamesystem does the actual bookkeeping work for the
* DataNode.
@@ -270,8 +307,8 @@
private FsServerDefaults serverDefaults;
// allow appending to hdfs files
private boolean supportAppends = true;
- private DataTransferProtocol.ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure =
- DataTransferProtocol.ReplaceDatanodeOnFailure.DEFAULT;
+ private ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure =
+ ReplaceDatanodeOnFailure.DEFAULT;
private volatile SafeModeInfo safeMode; // safe mode information
private Host2NodesMap host2DataNodeMap = new Host2NodesMap();
@@ -552,7 +589,7 @@
+ " min(s), blockTokenLifetime=" + blockTokenLifetime / (60 * 1000)
+ " min(s)");
- this.dtpReplaceDatanodeOnFailure = DataTransferProtocol.ReplaceDatanodeOnFailure.get(conf);
+ this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
}
/**
diff --git a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
index c3bd393..1b4fea3 100644
--- a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
+++ b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
@@ -32,7 +32,7 @@
import org.apache.hadoop.hdfs.PipelinesTestUtil.PipelinesTest;
import org.apache.hadoop.hdfs.PipelinesTestUtil.NodeBytes;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
diff --git a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj
index 21ff0ae..9d4f189 100644
--- a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj
+++ b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj
@@ -27,8 +27,8 @@
import org.apache.hadoop.fi.DataTransferTestUtil;
import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Receiver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
/** Aspect for DataTransferProtocol */
public aspect DataTransferProtocolAspects {
diff --git a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
index ed9de6d..b7e6277 100644
--- a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
+++ b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
@@ -36,7 +36,7 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.log4j.Level;
import org.junit.Assert;
diff --git a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
index 8a25b08..dcfdcf9 100644
--- a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
+++ b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
@@ -37,7 +37,7 @@
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java b/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
index 57fdc43..02bbf5f 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs;
+import static org.junit.Assert.assertEquals;
+
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
@@ -52,11 +54,11 @@
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
@@ -72,8 +74,6 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
-import static org.junit.Assert.*;
-
/** Utilities for HDFS tests */
public class DFSTestUtil {
@@ -674,7 +674,7 @@
final DataInputStream in = new DataInputStream(NetUtils.getInputStream(s));
// send the request
- DataTransferProtocol.Sender.opTransferBlock(out, b, dfsClient.clientName,
+ Sender.opTransferBlock(out, b, dfsClient.clientName,
new DatanodeInfo[]{datanodes[1]}, new Token<BlockTokenIdentifier>());
out.flush();
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index 63d1974..9133002 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -17,10 +17,6 @@
*/
package org.apache.hadoop.hdfs;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.WRITE_BLOCK;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
-
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -42,22 +38,23 @@
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.Builder;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;
@@ -188,7 +185,7 @@
String description, Boolean eofExcepted) throws IOException {
sendBuf.reset();
recvBuf.reset();
- DataTransferProtocol.Sender.opWriteBlock(sendOut, block, 0,
+ Sender.opWriteBlock(sendOut, block, 0,
stage, newGS, block.getNumBytes(), block.getNumBytes(), "cl", null,
new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
if (eofExcepted) {
@@ -370,12 +367,12 @@
// bad ops
sendBuf.reset();
sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
- sendOut.writeByte(WRITE_BLOCK.code - 1);
+ sendOut.writeByte(Op.WRITE_BLOCK.code - 1);
sendRecvData("Wrong Op Code", true);
/* Test OP_WRITE_BLOCK */
sendBuf.reset();
- DataTransferProtocol.Sender.opWriteBlock(sendOut,
+ Sender.opWriteBlock(sendOut,
new ExtendedBlock(poolId, newBlockId), 0,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
@@ -389,7 +386,7 @@
sendBuf.reset();
recvBuf.reset();
- DataTransferProtocol.Sender.opWriteBlock(sendOut,
+ Sender.opWriteBlock(sendOut,
new ExtendedBlock(poolId, ++newBlockId), 0,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
@@ -412,7 +409,7 @@
// test for writing a valid zero size block
sendBuf.reset();
recvBuf.reset();
- DataTransferProtocol.Sender.opWriteBlock(sendOut,
+ Sender.opWriteBlock(sendOut,
new ExtendedBlock(poolId, ++newBlockId), 0,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
@@ -442,21 +439,21 @@
sendBuf.reset();
recvBuf.reset();
blk.setBlockId(blkid-1);
- DataTransferProtocol.Sender.opReadBlock(sendOut, blk, 0L, fileLen, "cl",
+ Sender.opReadBlock(sendOut, blk, 0L, fileLen, "cl",
BlockTokenSecretManager.DUMMY_TOKEN);
sendRecvData("Wrong block ID " + newBlockId + " for read", false);
// negative block start offset -1L
sendBuf.reset();
blk.setBlockId(blkid);
- DataTransferProtocol.Sender.opReadBlock(sendOut, blk, -1L, fileLen, "cl",
+ Sender.opReadBlock(sendOut, blk, -1L, fileLen, "cl",
BlockTokenSecretManager.DUMMY_TOKEN);
sendRecvData("Negative start-offset for read for block " +
firstBlock.getBlockId(), false);
// bad block start offset
sendBuf.reset();
- DataTransferProtocol.Sender.opReadBlock(sendOut, blk, fileLen, fileLen, "cl",
+ Sender.opReadBlock(sendOut, blk, fileLen, fileLen, "cl",
BlockTokenSecretManager.DUMMY_TOKEN);
sendRecvData("Wrong start-offset for reading block " +
firstBlock.getBlockId(), false);
@@ -465,7 +462,7 @@
recvBuf.reset();
sendResponse(Status.SUCCESS, null, recvOut);
sendBuf.reset();
- DataTransferProtocol.Sender.opReadBlock(sendOut, blk, 0L,
+ Sender.opReadBlock(sendOut, blk, 0L,
-1 - random.nextInt(oneMil), "cl", BlockTokenSecretManager.DUMMY_TOKEN);
sendRecvData("Negative length for reading block " +
firstBlock.getBlockId(), false);
@@ -474,14 +471,14 @@
recvBuf.reset();
sendResponse(Status.ERROR, null, recvOut);
sendBuf.reset();
- DataTransferProtocol.Sender.opReadBlock(sendOut, blk, 0L,
+ Sender.opReadBlock(sendOut, blk, 0L,
fileLen + 1, "cl", BlockTokenSecretManager.DUMMY_TOKEN);
sendRecvData("Wrong length for reading block " +
firstBlock.getBlockId(), false);
//At the end of all this, read the file to make sure that succeeds finally.
sendBuf.reset();
- DataTransferProtocol.Sender.opReadBlock(sendOut, blk, 0L,
+ Sender.opReadBlock(sendOut, blk, 0L,
fileLen, "cl", BlockTokenSecretManager.DUMMY_TOKEN);
readFile(fileSys, file, fileLen);
} finally {
@@ -512,14 +509,6 @@
readBack.readFields(ByteBuffer.wrap(baos.toByteArray()));
assertEquals(hdr, readBack);
- // Test sanity check for good header
- PacketHeader goodHeader = new PacketHeader(
- 4, // size of packet
- 0, // OffsetInBlock
- 100, // sequencenumber
- true, // lastPacketInBlock
- 0); // chunk length
-
assertTrue(hdr.sanityCheck(99));
assertFalse(hdr.sanityCheck(100));
}
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestReplaceDatanodeOnFailure.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestReplaceDatanodeOnFailure.java
index a366bf7..2aa33da 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestReplaceDatanodeOnFailure.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestReplaceDatanodeOnFailure.java
@@ -26,9 +26,10 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Level;
import org.junit.Assert;
@@ -53,8 +54,7 @@
/** Test DEFAULT ReplaceDatanodeOnFailure policy. */
@Test
public void testDefaultPolicy() throws Exception {
- final DataTransferProtocol.ReplaceDatanodeOnFailure p
- = DataTransferProtocol.ReplaceDatanodeOnFailure.DEFAULT;
+ final ReplaceDatanodeOnFailure p = ReplaceDatanodeOnFailure.DEFAULT;
final DatanodeInfo[] infos = new DatanodeInfo[5];
final DatanodeInfo[][] datanodes = new DatanodeInfo[infos.length + 1][];
@@ -113,7 +113,7 @@
final Configuration conf = new HdfsConfiguration();
//always replace a datanode
- DataTransferProtocol.ReplaceDatanodeOnFailure.ALWAYS.write(conf);
+ ReplaceDatanodeOnFailure.ALWAYS.write(conf);
final String[] racks = new String[REPLICATION];
Arrays.fill(racks, RACK0);
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
index 52ffa92..e6349ce 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
@@ -36,21 +36,21 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.net.NetUtils;
/**
* This class tests if block replacement request to data nodes work correctly.
@@ -258,7 +258,7 @@
sock.setKeepAlive(true);
// sendRequest
DataOutputStream out = new DataOutputStream(sock.getOutputStream());
- DataTransferProtocol.Sender.opReplaceBlock(out, block, source
+ Sender.opReplaceBlock(out, block, source
.getStorageID(), sourceProxy, BlockTokenSecretManager.DUMMY_TOKEN);
out.flush();
// receiveResponse
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
index 809481b..ed862b8 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
import java.io.DataOutputStream;
import java.io.File;
import java.net.InetSocketAddress;
@@ -26,22 +29,20 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-
-import org.junit.Test;
-import org.junit.Before;
import org.junit.After;
-import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Test;
/**
* Test that datanodes can correctly handle errors during block read/write.
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java
index 6bc92a1..07c35e7 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java
@@ -29,10 +29,10 @@
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;