HDFS-2066. Create a package and individual class files for DataTransferProtocol.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@1134869 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index aa605d5..2979bea 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -500,6 +500,9 @@
     HDFS-2003. Separate FSEditLog reading logic from edit log memory state
     building logic. (Ivan Kelly via todd)
 
+    HDFS-2066. Create a package and individual class files for
+    DataTransferProtocol.  (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
diff --git a/src/java/org/apache/hadoop/hdfs/BlockReader.java b/src/java/org/apache/hadoop/hdfs/BlockReader.java
index 9b98e1b..67bd6f8 100644
--- a/src/java/org/apache/hadoop/hdfs/BlockReader.java
+++ b/src/java/org/apache/hadoop/hdfs/BlockReader.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
+
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -30,10 +32,9 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FSInputChecker;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
@@ -45,8 +46,6 @@
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
-
 
 /** This is a wrapper around connection to datanode
  * and understands checksum, offset etc.
@@ -405,7 +404,7 @@
                                      String clientName)
                                      throws IOException {
     // in and out will be closed when sock is closed (by the caller)
-    DataTransferProtocol.Sender.opReadBlock(
+    Sender.opReadBlock(
         new DataOutputStream(new BufferedOutputStream(
             NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT))),
         block, startOffset, len, clientName, blockToken);
diff --git a/src/java/org/apache/hadoop/hdfs/DFSClient.java b/src/java/org/apache/hadoop/hdfs/DFSClient.java
index 76de6c2..f881718 100644
--- a/src/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/src/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -18,8 +18,6 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.BLOCK_CHECKSUM;
-
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
@@ -63,18 +61,20 @@
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
+import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
@@ -133,7 +133,7 @@
   SocketFactory socketFactory;
   int socketTimeout;
   final int writePacketSize;
-  final DataTransferProtocol.ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
+  final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
   final FileSystem.Statistics stats;
   final int hdfsTimeout;    // timeout value for a DFS operation.
   final LeaseRenewer leaserenewer;
@@ -267,7 +267,7 @@
     this.writePacketSize = 
       conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 
                   DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
-    this.dtpReplaceDatanodeOnFailure = DataTransferProtocol.ReplaceDatanodeOnFailure.get(conf);
+    this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
 
     // The hdfsTimeout is currently the same as the ipc timeout 
     this.hdfsTimeout = Client.getTimeout(conf);
@@ -1114,11 +1114,10 @@
 
           if (LOG.isDebugEnabled()) {
             LOG.debug("write to " + datanodes[j].getName() + ": "
-                + BLOCK_CHECKSUM + ", block=" + block);
+                + Op.BLOCK_CHECKSUM + ", block=" + block);
           }
           // get block MD5
-          DataTransferProtocol.Sender.opBlockChecksum(out, block,
-              lb.getBlockToken());
+          Sender.opBlockChecksum(out, block, lb.getBlockToken());
 
           final BlockOpResponseProto reply =
             BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(in));
diff --git a/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 8843982..860cd9f 100644
--- a/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -18,8 +18,6 @@
 package org.apache.hadoop.hdfs;
 
 import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR;
-
 
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -48,17 +46,18 @@
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -68,7 +67,6 @@
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
@@ -849,7 +847,7 @@
             DataNode.SMALL_BUFFER_SIZE));
 
         //send the TRANSFER_BLOCK request
-        DataTransferProtocol.Sender.opTransferBlock(out, block,
+        Sender.opTransferBlock(out, block,
             dfsClient.clientName, targets, blockToken);
 
         //ack
@@ -1023,7 +1021,7 @@
         blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
 
         // send the request
-        DataTransferProtocol.Sender.opWriteBlock(out, block,
+        Sender.opWriteBlock(out, block,
             nodes.length, recoveryFlag ? stage.getRecoveryStage() : stage, newGS, 
             block.getNumBytes(), bytesSent, dfsClient.clientName, null, nodes,
             accessToken);
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java b/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
deleted file mode 100644
index e756f95..0000000
--- a/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
+++ /dev/null
@@ -1,716 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.protocol;
-
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.HadoopIllegalArgumentException;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PacketHeaderProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.util.ByteBufferOutputStream;
-import org.apache.hadoop.security.token.Token;
-
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtoUtil.fromProto;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtoUtil.toProto;
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.fromProto;
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.fromProtos;
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.toProto;
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.toProtos;
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
-
-import com.google.protobuf.Message;
-
-/**
- * Transfer data to/from datanode using a streaming protocol.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Evolving
-public interface DataTransferProtocol {
-  public static final Log LOG = LogFactory.getLog(DataTransferProtocol.class);
-  
-  /** Version for data transfers between clients and datanodes
-   * This should change when serialization of DatanodeInfo, not just
-   * when protocol changes. It is not very obvious. 
-   */
-  /*
-   * Version 26:
-   *    Use protobuf.
-   */
-  public static final int DATA_TRANSFER_VERSION = 26;
-
-  /** Operation */
-  public enum Op {
-    WRITE_BLOCK((byte)80),
-    READ_BLOCK((byte)81),
-    READ_METADATA((byte)82),
-    REPLACE_BLOCK((byte)83),
-    COPY_BLOCK((byte)84),
-    BLOCK_CHECKSUM((byte)85),
-    TRANSFER_BLOCK((byte)86);
-
-    /** The code for this operation. */
-    public final byte code;
-    
-    private Op(byte code) {
-      this.code = code;
-    }
-    
-    private static final int FIRST_CODE = values()[0].code;
-    /** Return the object represented by the code. */
-    private static Op valueOf(byte code) {
-      final int i = (code & 0xff) - FIRST_CODE;
-      return i < 0 || i >= values().length? null: values()[i];
-    }
-
-    /** Read from in */
-    public static Op read(DataInput in) throws IOException {
-      return valueOf(in.readByte());
-    }
-
-    /** Write to out */
-    public void write(DataOutput out) throws IOException {
-      out.write(code);
-    }
-  }
-    
-  public enum BlockConstructionStage {
-    /** The enumerates are always listed as regular stage followed by the
-     * recovery stage. 
-     * Changing this order will make getRecoveryStage not working.
-     */
-    // pipeline set up for block append
-    PIPELINE_SETUP_APPEND,
-    // pipeline set up for failed PIPELINE_SETUP_APPEND recovery
-    PIPELINE_SETUP_APPEND_RECOVERY,
-    // data streaming
-    DATA_STREAMING,
-    // pipeline setup for failed data streaming recovery
-    PIPELINE_SETUP_STREAMING_RECOVERY,
-    // close the block and pipeline
-    PIPELINE_CLOSE,
-    // Recover a failed PIPELINE_CLOSE
-    PIPELINE_CLOSE_RECOVERY,
-    // pipeline set up for block creation
-    PIPELINE_SETUP_CREATE,
-    // transfer RBW for adding datanodes
-    TRANSFER_RBW,
-    // transfer Finalized for adding datanodes
-    TRANSFER_FINALIZED;
-    
-    final static private byte RECOVERY_BIT = (byte)1;
-    
-    /**
-     * get the recovery stage of this stage
-     */
-    public BlockConstructionStage getRecoveryStage() {
-      if (this == PIPELINE_SETUP_CREATE) {
-        throw new IllegalArgumentException( "Unexpected blockStage " + this);
-      } else {
-        return values()[ordinal()|RECOVERY_BIT];
-      }
-    }
-  }    
-
-  
-  /** Sender */
-  @InterfaceAudience.Private
-  @InterfaceStability.Evolving
-  public static class Sender {
-    /** Initialize a operation. */
-    private static void op(final DataOutput out, final Op op
-        ) throws IOException {
-      out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
-      op.write(out);
-    }
-
-    private static void send(final DataOutputStream out, final Op opcode,
-        final Message proto) throws IOException {
-      op(out, opcode);
-      proto.writeDelimitedTo(out);
-      out.flush();
-    }
-
-    /** Send OP_READ_BLOCK */
-    public static void opReadBlock(DataOutputStream out, ExtendedBlock blk,
-        long blockOffset, long blockLen, String clientName,
-        Token<BlockTokenIdentifier> blockToken)
-        throws IOException {
-
-      OpReadBlockProto proto = OpReadBlockProto.newBuilder()
-        .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
-        .setOffset(blockOffset)
-        .setLen(blockLen)
-        .build();
-
-      send(out, Op.READ_BLOCK, proto);
-    }
-    
-
-    /** Send OP_WRITE_BLOCK */
-    public static void opWriteBlock(DataOutputStream out, ExtendedBlock blk,
-        int pipelineSize, BlockConstructionStage stage, long newGs,
-        long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
-        DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
-        throws IOException {
-      ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(blk, client,
-          blockToken);
-      
-      OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
-        .setHeader(header)
-        .addAllTargets(
-            toProtos(targets, 1))
-        .setStage(toProto(stage))
-        .setPipelineSize(pipelineSize)
-        .setMinBytesRcvd(minBytesRcvd)
-        .setMaxBytesRcvd(maxBytesRcvd)
-        .setLatestGenerationStamp(newGs);
-      
-      if (src != null) {
-        proto.setSource(toProto(src));
-      }
-
-      send(out, Op.WRITE_BLOCK, proto.build());
-    }
-
-    /** Send {@link Op#TRANSFER_BLOCK} */
-    public static void opTransferBlock(DataOutputStream out, ExtendedBlock blk,
-        String client, DatanodeInfo[] targets,
-        Token<BlockTokenIdentifier> blockToken) throws IOException {
-      
-      OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
-        .setHeader(DataTransferProtoUtil.buildClientHeader(
-            blk, client, blockToken))
-        .addAllTargets(toProtos(targets, 0))
-        .build();
-
-      send(out, Op.TRANSFER_BLOCK, proto);
-    }
-
-    /** Send OP_REPLACE_BLOCK */
-    public static void opReplaceBlock(DataOutputStream out,
-        ExtendedBlock blk, String delHint, DatanodeInfo src,
-        Token<BlockTokenIdentifier> blockToken) throws IOException {
-      OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
-        .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
-        .setDelHint(delHint)
-        .setSource(toProto(src))
-        .build();
-      
-      send(out, Op.REPLACE_BLOCK, proto);
-    }
-
-    /** Send OP_COPY_BLOCK */
-    public static void opCopyBlock(DataOutputStream out, ExtendedBlock blk,
-        Token<BlockTokenIdentifier> blockToken)
-        throws IOException {
-      OpCopyBlockProto proto = OpCopyBlockProto.newBuilder()
-        .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
-        .build();
-      
-      send(out, Op.COPY_BLOCK, proto);
-    }
-
-    /** Send OP_BLOCK_CHECKSUM */
-    public static void opBlockChecksum(DataOutputStream out, ExtendedBlock blk,
-        Token<BlockTokenIdentifier> blockToken)
-        throws IOException {
-      OpBlockChecksumProto proto = OpBlockChecksumProto.newBuilder()
-        .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
-        .build();
-      
-      send(out, Op.BLOCK_CHECKSUM, proto);
-    }
-  }
-
-  /** Receiver */
-  public static abstract class Receiver {
-    /** Read an Op.  It also checks protocol version. */
-    protected final Op readOp(DataInputStream in) throws IOException {
-      final short version = in.readShort();
-      if (version != DATA_TRANSFER_VERSION) {
-        throw new IOException( "Version Mismatch (Expected: " +
-            DataTransferProtocol.DATA_TRANSFER_VERSION  +
-            ", Received: " +  version + " )");
-      }
-      return Op.read(in);
-    }
-
-    /** Process op by the corresponding method. */
-    protected final void processOp(Op op, DataInputStream in
-        ) throws IOException {
-      switch(op) {
-      case READ_BLOCK:
-        opReadBlock(in);
-        break;
-      case WRITE_BLOCK:
-        opWriteBlock(in);
-        break;
-      case REPLACE_BLOCK:
-        opReplaceBlock(in);
-        break;
-      case COPY_BLOCK:
-        opCopyBlock(in);
-        break;
-      case BLOCK_CHECKSUM:
-        opBlockChecksum(in);
-        break;
-      case TRANSFER_BLOCK:
-        opTransferBlock(in);
-        break;
-      default:
-        throw new IOException("Unknown op " + op + " in data stream");
-      }
-    }
-
-    /** Receive OP_READ_BLOCK */
-    private void opReadBlock(DataInputStream in) throws IOException {
-      OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
-      
-      ExtendedBlock b = fromProto(
-          proto.getHeader().getBaseHeader().getBlock());
-      Token<BlockTokenIdentifier> token = fromProto(
-          proto.getHeader().getBaseHeader().getToken());
-
-      opReadBlock(in, b, proto.getOffset(), proto.getLen(),
-          proto.getHeader().getClientName(), token);
-    }
-    /**
-     * Abstract OP_READ_BLOCK method. Read a block.
-     */
-    protected abstract void opReadBlock(DataInputStream in, ExtendedBlock blk,
-        long offset, long length, String client,
-        Token<BlockTokenIdentifier> blockToken) throws IOException;
-    
-    /** Receive OP_WRITE_BLOCK */
-    private void opWriteBlock(DataInputStream in) throws IOException {
-      final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
-      opWriteBlock(in,
-          fromProto(proto.getHeader().getBaseHeader().getBlock()),
-          proto.getPipelineSize(),
-          fromProto(proto.getStage()),
-          proto.getLatestGenerationStamp(),
-          proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
-          proto.getHeader().getClientName(),
-          fromProto(proto.getSource()),
-          fromProtos(proto.getTargetsList()),
-          fromProto(proto.getHeader().getBaseHeader().getToken()));
-    }
-
-    /**
-     * Abstract OP_WRITE_BLOCK method. 
-     * Write a block.
-     */
-    protected abstract void opWriteBlock(DataInputStream in, ExtendedBlock blk,
-        int pipelineSize, BlockConstructionStage stage, long newGs,
-        long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
-        DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
-        throws IOException;
-
-    /** Receive {@link Op#TRANSFER_BLOCK} */
-    private void opTransferBlock(DataInputStream in) throws IOException {
-      final OpTransferBlockProto proto =
-        OpTransferBlockProto.parseFrom(vintPrefixed(in));
-
-      opTransferBlock(in,
-          fromProto(proto.getHeader().getBaseHeader().getBlock()),
-          proto.getHeader().getClientName(),
-          fromProtos(proto.getTargetsList()),
-          fromProto(proto.getHeader().getBaseHeader().getToken()));
-    }
-
-    /**
-     * Abstract {@link Op#TRANSFER_BLOCK} method.
-     * For {@link BlockConstructionStage#TRANSFER_RBW}
-     * or {@link BlockConstructionStage#TRANSFER_FINALIZED}.
-     */
-    protected abstract void opTransferBlock(DataInputStream in, ExtendedBlock blk,
-        String client, DatanodeInfo[] targets,
-        Token<BlockTokenIdentifier> blockToken)
-        throws IOException;
-
-    /** Receive OP_REPLACE_BLOCK */
-    private void opReplaceBlock(DataInputStream in) throws IOException {
-      OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
-
-      opReplaceBlock(in,
-          fromProto(proto.getHeader().getBlock()),
-          proto.getDelHint(),
-          fromProto(proto.getSource()),
-          fromProto(proto.getHeader().getToken()));
-    }
-
-    /**
-     * Abstract OP_REPLACE_BLOCK method.
-     * It is used for balancing purpose; send to a destination
-     */
-    protected abstract void opReplaceBlock(DataInputStream in,
-        ExtendedBlock blk, String delHint, DatanodeInfo src,
-        Token<BlockTokenIdentifier> blockToken) throws IOException;
-
-    /** Receive OP_COPY_BLOCK */
-    private void opCopyBlock(DataInputStream in) throws IOException {
-      OpCopyBlockProto proto = OpCopyBlockProto.parseFrom(vintPrefixed(in));
-      
-      opCopyBlock(in,
-          fromProto(proto.getHeader().getBlock()),
-          fromProto(proto.getHeader().getToken()));
-    }
-
-    /**
-     * Abstract OP_COPY_BLOCK method. It is used for balancing purpose; send to
-     * a proxy source.
-     */
-    protected abstract void opCopyBlock(DataInputStream in, ExtendedBlock blk,
-        Token<BlockTokenIdentifier> blockToken)
-        throws IOException;
-
-    /** Receive OP_BLOCK_CHECKSUM */
-    private void opBlockChecksum(DataInputStream in) throws IOException {
-      OpBlockChecksumProto proto = OpBlockChecksumProto.parseFrom(vintPrefixed(in));
-      
-      opBlockChecksum(in,
-          fromProto(proto.getHeader().getBlock()),
-          fromProto(proto.getHeader().getToken()));
-    }
-
-    /**
-     * Abstract OP_BLOCK_CHECKSUM method.
-     * Get the checksum of a block 
-     */
-    protected abstract void opBlockChecksum(DataInputStream in,
-        ExtendedBlock blk, Token<BlockTokenIdentifier> blockToken)
-        throws IOException;
-  }
-  
-  /** reply **/
-  @InterfaceAudience.Private
-  @InterfaceStability.Evolving
-  public static class PipelineAck {
-    PipelineAckProto proto;
-    public final static long UNKOWN_SEQNO = -2;
-
-    /** default constructor **/
-    public PipelineAck() {
-    }
-    
-    /**
-     * Constructor
-     * @param seqno sequence number
-     * @param replies an array of replies
-     */
-    public PipelineAck(long seqno, Status[] replies) {
-      proto = PipelineAckProto.newBuilder()
-        .setSeqno(seqno)
-        .addAllStatus(Arrays.asList(replies))
-        .build();
-    }
-    
-    /**
-     * Get the sequence number
-     * @return the sequence number
-     */
-    public long getSeqno() {
-      return proto.getSeqno();
-    }
-    
-    /**
-     * Get the number of replies
-     * @return the number of replies
-     */
-    public short getNumOfReplies() {
-      return (short)proto.getStatusCount();
-    }
-    
-    /**
-     * get the ith reply
-     * @return the the ith reply
-     */
-    public Status getReply(int i) {
-      return proto.getStatus(i);
-    }
-    
-    /**
-     * Check if this ack contains error status
-     * @return true if all statuses are SUCCESS
-     */
-    public boolean isSuccess() {
-      for (DataTransferProtos.Status reply : proto.getStatusList()) {
-        if (reply != DataTransferProtos.Status.SUCCESS) {
-          return false;
-        }
-      }
-      return true;
-    }
-    
-    /**** Writable interface ****/
-    public void readFields(InputStream in) throws IOException {
-      proto = PipelineAckProto.parseFrom(vintPrefixed(in));
-    }
-
-    public void write(OutputStream out) throws IOException {
-      proto.writeDelimitedTo(out);
-    }
-    
-    @Override //Object
-    public String toString() {
-      return proto.toString();
-    }
-  }
-
-  /**
-   * Header data for each packet that goes through the read/write pipelines.
-   */
-  public static class PacketHeader {
-    /** Header size for a packet */
-    private static final int PROTO_SIZE = 
-      PacketHeaderProto.newBuilder()
-        .setOffsetInBlock(0)
-        .setSeqno(0)
-        .setLastPacketInBlock(false)
-        .setDataLen(0)
-        .build().getSerializedSize();
-    public static final int PKT_HEADER_LEN =
-      6 + PROTO_SIZE;
-
-    private int packetLen;
-    private PacketHeaderProto proto;
-
-    public PacketHeader() {
-    }
-
-    public PacketHeader(int packetLen, long offsetInBlock, long seqno,
-                        boolean lastPacketInBlock, int dataLen) {
-      this.packetLen = packetLen;
-      proto = PacketHeaderProto.newBuilder()
-        .setOffsetInBlock(offsetInBlock)
-        .setSeqno(seqno)
-        .setLastPacketInBlock(lastPacketInBlock)
-        .setDataLen(dataLen)
-        .build();
-    }
-
-    public int getDataLen() {
-      return proto.getDataLen();
-    }
-
-    public boolean isLastPacketInBlock() {
-      return proto.getLastPacketInBlock();
-    }
-
-    public long getSeqno() {
-      return proto.getSeqno();
-    }
-
-    public long getOffsetInBlock() {
-      return proto.getOffsetInBlock();
-    }
-
-    public int getPacketLen() {
-      return packetLen;
-    }
-
-    @Override
-    public String toString() {
-      return "PacketHeader with packetLen=" + packetLen +
-        "Header data: " + 
-        proto.toString();
-    }
-    
-    public void readFields(ByteBuffer buf) throws IOException {
-      packetLen = buf.getInt();
-      short protoLen = buf.getShort();
-      byte[] data = new byte[protoLen];
-      buf.get(data);
-      proto = PacketHeaderProto.parseFrom(data);
-    }
-    
-    public void readFields(DataInputStream in) throws IOException {
-      this.packetLen = in.readInt();
-      short protoLen = in.readShort();
-      byte[] data = new byte[protoLen];
-      in.readFully(data);
-      proto = PacketHeaderProto.parseFrom(data);
-    }
-
-
-    /**
-     * Write the header into the buffer.
-     * This requires that PKT_HEADER_LEN bytes are available.
-     */
-    public void putInBuffer(final ByteBuffer buf) {
-      assert proto.getSerializedSize() == PROTO_SIZE
-        : "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize();
-      try {
-        buf.putInt(packetLen);
-        buf.putShort((short) proto.getSerializedSize());
-        proto.writeTo(new ByteBufferOutputStream(buf));
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-    
-    public void write(DataOutputStream out) throws IOException {
-      assert proto.getSerializedSize() == PROTO_SIZE
-      : "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize();
-      out.writeInt(packetLen);
-      out.writeShort(proto.getSerializedSize());
-      proto.writeTo(out);
-    }
-
-    /**
-     * Perform a sanity check on the packet, returning true if it is sane.
-     * @param lastSeqNo the previous sequence number received - we expect the current
-     * sequence number to be larger by 1.
-     */
-    public boolean sanityCheck(long lastSeqNo) {
-      // We should only have a non-positive data length for the last packet
-      if (proto.getDataLen() <= 0 && proto.getLastPacketInBlock()) return false;
-      // The last packet should not contain data
-      if (proto.getLastPacketInBlock() && proto.getDataLen() != 0) return false;
-      // Seqnos should always increase by 1 with each packet received
-      if (proto.getSeqno() != lastSeqNo + 1) return false;
-      return true;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (!(o instanceof PacketHeader)) return false;
-      PacketHeader other = (PacketHeader)o;
-      return this.proto.equals(other.proto);
-    }
-
-    @Override
-    public int hashCode() {
-      return (int)proto.getSeqno();
-    }
-  }
-
-  /**
-   * The setting of replace-datanode-on-failure feature.
-   */
-  public enum ReplaceDatanodeOnFailure {
-    /** The feature is disabled in the entire site. */
-    DISABLE,
-    /** Never add a new datanode. */
-    NEVER,
-    /**
-     * DEFAULT policy:
-     *   Let r be the replication number.
-     *   Let n be the number of existing datanodes.
-     *   Add a new datanode only if r >= 3 and either
-     *   (1) floor(r/2) >= n; or
-     *   (2) r > n and the block is hflushed/appended.
-     */
-    DEFAULT,
-    /** Always add a new datanode when an existing datanode is removed. */
-    ALWAYS;
-
-    /** Check if the feature is enabled. */
-    public void checkEnabled() {
-      if (this == DISABLE) {
-        throw new UnsupportedOperationException(
-            "This feature is disabled.  Please refer to "
-            + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY
-            + " configuration property.");
-      }
-    }
-
-    /** Is the policy satisfied? */
-    public boolean satisfy(
-        final short replication, final DatanodeInfo[] existings,
-        final boolean isAppend, final boolean isHflushed) {
-      final int n = existings == null? 0: existings.length;
-      if (n == 0 || n >= replication) {
-        //don't need to add datanode for any policy.
-        return false;
-      } else if (this == DISABLE || this == NEVER) {
-        return false;
-      } else if (this == ALWAYS) {
-        return true;
-      } else {
-        //DEFAULT
-        if (replication < 3) {
-          return false;
-        } else {
-          if (n <= (replication/2)) {
-            return true;
-          } else {
-            return isAppend || isHflushed;
-          }
-        }
-      }
-    }
-
-    /** Get the setting from configuration. */
-    public static ReplaceDatanodeOnFailure get(final Configuration conf) {
-      final boolean enabled = conf.getBoolean(
-          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
-          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT);
-      if (!enabled) {
-        return DISABLE;
-      }
-
-      final String policy = conf.get(
-          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
-          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT);
-      for(int i = 1; i < values().length; i++) {
-        final ReplaceDatanodeOnFailure rdof = values()[i];
-        if (rdof.name().equalsIgnoreCase(policy)) {
-          return rdof;
-        }
-      }
-      throw new HadoopIllegalArgumentException("Illegal configuration value for "
-          + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY
-          + ": " + policy);
-    }
-
-    /** Write the setting to configuration. */
-    public void write(final Configuration conf) {
-      conf.setBoolean(
-          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
-          this != DISABLE);
-      conf.set(
-          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
-          name());
-    }
-  }
-}
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java
new file mode 100644
index 0000000..5f86e52
--- /dev/null
+++ b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/** Block Construction Stage */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public enum BlockConstructionStage {
+  /** The stages are always listed with each regular stage followed by
+   * its recovery stage.
+   * Changing this order would break getRecoveryStage().
+   */
+  // pipeline set up for block append
+  PIPELINE_SETUP_APPEND,
+  // pipeline set up for failed PIPELINE_SETUP_APPEND recovery
+  PIPELINE_SETUP_APPEND_RECOVERY,
+  // data streaming
+  DATA_STREAMING,
+  // pipeline setup for failed data streaming recovery
+  PIPELINE_SETUP_STREAMING_RECOVERY,
+  // close the block and pipeline
+  PIPELINE_CLOSE,
+  // Recover a failed PIPELINE_CLOSE
+  PIPELINE_CLOSE_RECOVERY,
+  // pipeline set up for block creation
+  PIPELINE_SETUP_CREATE,
+  // transfer RBW for adding datanodes
+  TRANSFER_RBW,
+  // transfer Finalized for adding datanodes
+  TRANSFER_FINALIZED;
+  
+  final static private byte RECOVERY_BIT = (byte)1;
+  
+  /**
+   * get the recovery stage of this stage
+   */
+  public BlockConstructionStage getRecoveryStage() {
+    if (this == PIPELINE_SETUP_CREATE) {
+      throw new IllegalArgumentException( "Unexpected blockStage " + this);
+    } else {
+      return values()[ordinal()|RECOVERY_BIT];
+    }
+  }
+}    
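
A rough illustration of the ordinal trick behind getRecoveryStage(): each regular stage is immediately followed by its recovery counterpart, so OR-ing the ordinal with RECOVERY_BIT selects the recovery stage. The enum values come straight from the class added above; the demo harness itself is hypothetical.

import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;

public class RecoveryStageDemo {
  public static void main(String[] args) {
    // PIPELINE_SETUP_APPEND (ordinal 0) -> PIPELINE_SETUP_APPEND_RECOVERY (ordinal 1)
    System.out.println(BlockConstructionStage.PIPELINE_SETUP_APPEND.getRecoveryStage());
    // DATA_STREAMING (ordinal 2) -> PIPELINE_SETUP_STREAMING_RECOVERY (ordinal 3)
    System.out.println(BlockConstructionStage.DATA_STREAMING.getRecoveryStage());
    // PIPELINE_CLOSE (ordinal 4) -> PIPELINE_CLOSE_RECOVERY (ordinal 5)
    System.out.println(BlockConstructionStage.PIPELINE_CLOSE.getRecoveryStage());
    // PIPELINE_SETUP_CREATE has no recovery stage; getRecoveryStage() throws IllegalArgumentException.
  }
}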
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtoUtil.java b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
similarity index 85%
rename from src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtoUtil.java
rename to src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
index 783f65c..b6d31cb 100644
--- a/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtoUtil.java
+++ b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
@@ -15,10 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.hdfs.protocol;
+package org.apache.hadoop.hdfs.protocol.datatransfer;
 
 
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
@@ -30,9 +33,11 @@
  * Static utilities for dealing with the protocol buffers used by the
  * Data Transfer Protocol.
  */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
 abstract class DataTransferProtoUtil {
 
-  static DataTransferProtocol.BlockConstructionStage fromProto(
+  static BlockConstructionStage fromProto(
       OpWriteBlockProto.BlockConstructionStage stage) {
     return BlockConstructionStage.valueOf(BlockConstructionStage.class,
         stage.name());
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
new file mode 100644
index 0000000..3ea3016
--- /dev/null
+++ b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Transfer data to/from datanode using a streaming protocol.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface DataTransferProtocol {
+  public static final Log LOG = LogFactory.getLog(DataTransferProtocol.class);
+  
+  /** Version for data transfers between clients and datanodes.
+   * This should change when the serialization of DatanodeInfo changes,
+   * not just when the protocol changes, which is easy to overlook.
+   */
+  /*
+   * Version 27:
+   *    Move DataTransferProtocol and the inner classes to a package.
+   */
+  public static final int DATA_TRANSFER_VERSION = 27;
+}
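
The bump to version 27 only reflects the package move; the wire framing that the constant guards is unchanged: a 2-byte version, a 1-byte opcode, then a vint-length-prefixed protobuf message (see the Sender and Receiver code elsewhere in this patch). A minimal sketch of writing that prefix, mirroring the private Sender.op() helper shown in the removed file; the wrapper class and method name are made up.

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;

public class OpPrefixSketch {
  /** Write the version/opcode prefix that precedes each op-specific protobuf body. */
  static void writePrefix(DataOutputStream out, Op op) throws IOException {
    out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);  // 2-byte version, now 27
    op.write(out);                                               // 1-byte opcode, e.g. 81 for READ_BLOCK
  }
}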
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
new file mode 100644
index 0000000..4b9b47f
--- /dev/null
+++ b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/** Operation */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public enum Op {
+  WRITE_BLOCK((byte)80),
+  READ_BLOCK((byte)81),
+  READ_METADATA((byte)82),
+  REPLACE_BLOCK((byte)83),
+  COPY_BLOCK((byte)84),
+  BLOCK_CHECKSUM((byte)85),
+  TRANSFER_BLOCK((byte)86);
+
+  /** The code for this operation. */
+  public final byte code;
+  
+  private Op(byte code) {
+    this.code = code;
+  }
+  
+  private static final int FIRST_CODE = values()[0].code;
+  /** Return the object represented by the code. */
+  private static Op valueOf(byte code) {
+    final int i = (code & 0xff) - FIRST_CODE;
+    return i < 0 || i >= values().length? null: values()[i];
+  }
+
+  /** Read from in */
+  public static Op read(DataInput in) throws IOException {
+    return valueOf(in.readByte());
+  }
+
+  /** Write to out */
+  public void write(DataOutput out) throws IOException {
+    out.write(code);
+  }
+}
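
A short, self-contained sketch of the opcode round trip: write() emits the single code byte (80 through 86) and read() maps it back through valueOf(). The harness class is hypothetical.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.datatransfer.Op;

public class OpRoundTrip {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    Op.READ_BLOCK.write(new DataOutputStream(bytes));   // writes the single byte 81

    DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray()));
    Op op = Op.read(in);                                // maps 81 back to READ_BLOCK
    System.out.println(op + " code=" + op.code);        // READ_BLOCK code=81
  }
}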
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
new file mode 100644
index 0000000..73e1b20
--- /dev/null
+++ b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PacketHeaderProto;
+import org.apache.hadoop.hdfs.util.ByteBufferOutputStream;
+
+/**
+ * Header data for each packet that goes through the read/write pipelines.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class PacketHeader {
+  /** Header size for a packet */
+  private static final int PROTO_SIZE = 
+    PacketHeaderProto.newBuilder()
+      .setOffsetInBlock(0)
+      .setSeqno(0)
+      .setLastPacketInBlock(false)
+      .setDataLen(0)
+      .build().getSerializedSize();
+  public static final int PKT_HEADER_LEN =
+    6 + PROTO_SIZE;
+
+  private int packetLen;
+  private PacketHeaderProto proto;
+
+  public PacketHeader() {
+  }
+
+  public PacketHeader(int packetLen, long offsetInBlock, long seqno,
+                      boolean lastPacketInBlock, int dataLen) {
+    this.packetLen = packetLen;
+    proto = PacketHeaderProto.newBuilder()
+      .setOffsetInBlock(offsetInBlock)
+      .setSeqno(seqno)
+      .setLastPacketInBlock(lastPacketInBlock)
+      .setDataLen(dataLen)
+      .build();
+  }
+
+  public int getDataLen() {
+    return proto.getDataLen();
+  }
+
+  public boolean isLastPacketInBlock() {
+    return proto.getLastPacketInBlock();
+  }
+
+  public long getSeqno() {
+    return proto.getSeqno();
+  }
+
+  public long getOffsetInBlock() {
+    return proto.getOffsetInBlock();
+  }
+
+  public int getPacketLen() {
+    return packetLen;
+  }
+
+  @Override
+  public String toString() {
+    return "PacketHeader with packetLen=" + packetLen +
+      "Header data: " + 
+      proto.toString();
+  }
+  
+  public void readFields(ByteBuffer buf) throws IOException {
+    packetLen = buf.getInt();
+    short protoLen = buf.getShort();
+    byte[] data = new byte[protoLen];
+    buf.get(data);
+    proto = PacketHeaderProto.parseFrom(data);
+  }
+  
+  public void readFields(DataInputStream in) throws IOException {
+    this.packetLen = in.readInt();
+    short protoLen = in.readShort();
+    byte[] data = new byte[protoLen];
+    in.readFully(data);
+    proto = PacketHeaderProto.parseFrom(data);
+  }
+
+
+  /**
+   * Write the header into the buffer.
+   * This requires that PKT_HEADER_LEN bytes are available.
+   */
+  public void putInBuffer(final ByteBuffer buf) {
+    assert proto.getSerializedSize() == PROTO_SIZE
+      : "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize();
+    try {
+      buf.putInt(packetLen);
+      buf.putShort((short) proto.getSerializedSize());
+      proto.writeTo(new ByteBufferOutputStream(buf));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+  
+  public void write(DataOutputStream out) throws IOException {
+    assert proto.getSerializedSize() == PROTO_SIZE
+    : "Expected " + (PROTO_SIZE) + " got: " + proto.getSerializedSize();
+    out.writeInt(packetLen);
+    out.writeShort(proto.getSerializedSize());
+    proto.writeTo(out);
+  }
+
+  /**
+   * Perform a sanity check on the packet, returning true if it is sane.
+   * @param lastSeqNo the previous sequence number received - we expect the current
+   * sequence number to be larger by 1.
+   */
+  public boolean sanityCheck(long lastSeqNo) {
+    // We should only have a non-positive data length for the last packet
+    if (proto.getDataLen() <= 0 && !proto.getLastPacketInBlock()) return false;
+    // The last packet should not contain data
+    if (proto.getLastPacketInBlock() && proto.getDataLen() != 0) return false;
+    // Seqnos should always increase by 1 with each packet received
+    if (proto.getSeqno() != lastSeqNo + 1) return false;
+    return true;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!(o instanceof PacketHeader)) return false;
+    PacketHeader other = (PacketHeader)o;
+    return this.proto.equals(other.proto);
+  }
+
+  @Override
+  public int hashCode() {
+    return (int)proto.getSeqno();
+  }
+}
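
To make the fixed framing concrete (a 4-byte packetLen, a 2-byte proto length, then the PacketHeaderProto body, PKT_HEADER_LEN bytes in total), here is a rough round-trip sketch through a ByteBuffer; all numeric values are illustrative.

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;

public class PacketHeaderRoundTrip {
  public static void main(String[] args) throws IOException {
    // packetLen=4096, offsetInBlock=0, seqno=1, not the last packet, dataLen=3584
    PacketHeader header = new PacketHeader(4096, 0L, 1L, false, 3584);

    ByteBuffer buf = ByteBuffer.allocate(PacketHeader.PKT_HEADER_LEN);
    header.putInBuffer(buf);      // packetLen, proto length, then the proto itself
    buf.flip();

    PacketHeader copy = new PacketHeader();
    copy.readFields(buf);         // parses the same framing back
    System.out.println(copy.sanityCheck(0) + " " + copy.equals(header));  // true true
  }
}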
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
new file mode 100644
index 0000000..6498a56
--- /dev/null
+++ b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+
+/** Pipeline Acknowledgment **/
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class PipelineAck {
+  PipelineAckProto proto;
+  public final static long UNKOWN_SEQNO = -2;
+
+  /** default constructor **/
+  public PipelineAck() {
+  }
+  
+  /**
+   * Constructor
+   * @param seqno sequence number
+   * @param replies an array of replies
+   */
+  public PipelineAck(long seqno, Status[] replies) {
+    proto = PipelineAckProto.newBuilder()
+      .setSeqno(seqno)
+      .addAllStatus(Arrays.asList(replies))
+      .build();
+  }
+  
+  /**
+   * Get the sequence number
+   * @return the sequence number
+   */
+  public long getSeqno() {
+    return proto.getSeqno();
+  }
+  
+  /**
+   * Get the number of replies
+   * @return the number of replies
+   */
+  public short getNumOfReplies() {
+    return (short)proto.getStatusCount();
+  }
+  
+  /**
+   * Get the ith reply
+   * @return the ith reply
+   */
+  public Status getReply(int i) {
+    return proto.getStatus(i);
+  }
+  
+  /**
+   * Check whether this ack reports success from every datanode
+   * @return true if all statuses are SUCCESS
+   */
+  public boolean isSuccess() {
+    for (DataTransferProtos.Status reply : proto.getStatusList()) {
+      if (reply != DataTransferProtos.Status.SUCCESS) {
+        return false;
+      }
+    }
+    return true;
+  }
+  
+  /**** Writable interface ****/
+  public void readFields(InputStream in) throws IOException {
+    proto = PipelineAckProto.parseFrom(vintPrefixed(in));
+  }
+
+  public void write(OutputStream out) throws IOException {
+    proto.writeDelimitedTo(out);
+  }
+  
+  @Override //Object
+  public String toString() {
+    return proto.toString();
+  }
+}
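
A short sketch of how per-datanode statuses roll up through isSuccess(), and of the delimited serialization used by write() and readFields(); the seqno and status arrays are illustrative.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;

public class PipelineAckDemo {
  public static void main(String[] args) throws IOException {
    // One status per datanode in the pipeline; a single non-SUCCESS entry fails the whole ack.
    PipelineAck ok  = new PipelineAck(7, new Status[] {Status.SUCCESS, Status.SUCCESS});
    PipelineAck bad = new PipelineAck(8, new Status[] {Status.SUCCESS, Status.ERROR});
    System.out.println(ok.isSuccess() + " " + bad.isSuccess());            // true false

    // Round-trip through the vint-prefixed PipelineAckProto encoding.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    ok.write(bytes);
    PipelineAck copy = new PipelineAck();
    copy.readFields(new ByteArrayInputStream(bytes.toByteArray()));
    System.out.println(copy.getSeqno() + " " + copy.getNumOfReplies());    // 7 2
  }
}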
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
new file mode 100644
index 0000000..c17b812
--- /dev/null
+++ b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.fromProto;
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.fromProtos;
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
+import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.fromProto;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+/** Receiver */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class Receiver {
+  /** Read an Op.  It also checks protocol version. */
+  protected final Op readOp(DataInputStream in) throws IOException {
+    final short version = in.readShort();
+    if (version != DataTransferProtocol.DATA_TRANSFER_VERSION) {
+      throw new IOException( "Version Mismatch (Expected: " +
+          DataTransferProtocol.DATA_TRANSFER_VERSION  +
+          ", Received: " +  version + " )");
+    }
+    return Op.read(in);
+  }
+
+  /** Process op by the corresponding method. */
+  protected final void processOp(Op op, DataInputStream in
+      ) throws IOException {
+    switch(op) {
+    case READ_BLOCK:
+      opReadBlock(in);
+      break;
+    case WRITE_BLOCK:
+      opWriteBlock(in);
+      break;
+    case REPLACE_BLOCK:
+      opReplaceBlock(in);
+      break;
+    case COPY_BLOCK:
+      opCopyBlock(in);
+      break;
+    case BLOCK_CHECKSUM:
+      opBlockChecksum(in);
+      break;
+    case TRANSFER_BLOCK:
+      opTransferBlock(in);
+      break;
+    default:
+      throw new IOException("Unknown op " + op + " in data stream");
+    }
+  }
+
+  /** Receive OP_READ_BLOCK */
+  private void opReadBlock(DataInputStream in) throws IOException {
+    OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
+    
+    ExtendedBlock b = fromProto(
+        proto.getHeader().getBaseHeader().getBlock());
+    Token<BlockTokenIdentifier> token = fromProto(
+        proto.getHeader().getBaseHeader().getToken());
+
+    opReadBlock(in, b, proto.getOffset(), proto.getLen(),
+        proto.getHeader().getClientName(), token);
+  }
+  /**
+   * Abstract OP_READ_BLOCK method. Read a block.
+   */
+  protected abstract void opReadBlock(DataInputStream in, ExtendedBlock blk,
+      long offset, long length, String client,
+      Token<BlockTokenIdentifier> blockToken) throws IOException;
+  
+  /** Receive OP_WRITE_BLOCK */
+  private void opWriteBlock(DataInputStream in) throws IOException {
+    final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
+    opWriteBlock(in,
+        fromProto(proto.getHeader().getBaseHeader().getBlock()),
+        proto.getPipelineSize(),
+        fromProto(proto.getStage()),
+        proto.getLatestGenerationStamp(),
+        proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
+        proto.getHeader().getClientName(),
+        fromProto(proto.getSource()),
+        fromProtos(proto.getTargetsList()),
+        fromProto(proto.getHeader().getBaseHeader().getToken()));
+  }
+
+  /**
+   * Abstract OP_WRITE_BLOCK method.
+   * Write a block.
+   */
+  protected abstract void opWriteBlock(DataInputStream in, ExtendedBlock blk,
+      int pipelineSize, BlockConstructionStage stage, long newGs,
+      long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
+      DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
+      throws IOException;
+
+  /** Receive {@link Op#TRANSFER_BLOCK} */
+  private void opTransferBlock(DataInputStream in) throws IOException {
+    final OpTransferBlockProto proto =
+      OpTransferBlockProto.parseFrom(vintPrefixed(in));
+
+    opTransferBlock(in,
+        fromProto(proto.getHeader().getBaseHeader().getBlock()),
+        proto.getHeader().getClientName(),
+        fromProtos(proto.getTargetsList()),
+        fromProto(proto.getHeader().getBaseHeader().getToken()));
+  }
+
+  /**
+   * Abstract {@link Op#TRANSFER_BLOCK} method.
+   * For {@link BlockConstructionStage#TRANSFER_RBW}
+   * or {@link BlockConstructionStage#TRANSFER_FINALIZED}.
+   */
+  protected abstract void opTransferBlock(DataInputStream in, ExtendedBlock blk,
+      String client, DatanodeInfo[] targets,
+      Token<BlockTokenIdentifier> blockToken)
+      throws IOException;
+
+  /** Receive OP_REPLACE_BLOCK */
+  private void opReplaceBlock(DataInputStream in) throws IOException {
+    OpReplaceBlockProto proto = OpReplaceBlockProto.parseFrom(vintPrefixed(in));
+
+    opReplaceBlock(in,
+        fromProto(proto.getHeader().getBlock()),
+        proto.getDelHint(),
+        fromProto(proto.getSource()),
+        fromProto(proto.getHeader().getToken()));
+  }
+
+  /**
+   * Abstract OP_REPLACE_BLOCK method.
+   * It is used for balancing purposes; sent to the destination datanode.
+   */
+  protected abstract void opReplaceBlock(DataInputStream in,
+      ExtendedBlock blk, String delHint, DatanodeInfo src,
+      Token<BlockTokenIdentifier> blockToken) throws IOException;
+
+  /** Receive OP_COPY_BLOCK */
+  private void opCopyBlock(DataInputStream in) throws IOException {
+    OpCopyBlockProto proto = OpCopyBlockProto.parseFrom(vintPrefixed(in));
+    
+    opCopyBlock(in,
+        fromProto(proto.getHeader().getBlock()),
+        fromProto(proto.getHeader().getToken()));
+  }
+
+  /**
+   * Abstract OP_COPY_BLOCK method. It is used for balancing purposes;
+   * sent to the proxy source.
+   */
+  protected abstract void opCopyBlock(DataInputStream in, ExtendedBlock blk,
+      Token<BlockTokenIdentifier> blockToken)
+      throws IOException;
+
+  /** Receive OP_BLOCK_CHECKSUM */
+  private void opBlockChecksum(DataInputStream in) throws IOException {
+    OpBlockChecksumProto proto = OpBlockChecksumProto.parseFrom(vintPrefixed(in));
+    
+    opBlockChecksum(in,
+        fromProto(proto.getHeader().getBlock()),
+        fromProto(proto.getHeader().getToken()));
+  }
+
+  /**
+   * Abstract OP_BLOCK_CHECKSUM method.
+   * Get the checksum of a block.
+   */
+  protected abstract void opBlockChecksum(DataInputStream in,
+      ExtendedBlock blk, Token<BlockTokenIdentifier> blockToken)
+      throws IOException;
+}
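
For context, a Receiver subclass only has to wire readOp/processOp to its own op handlers; DataXceiver is the real implementation in this patch. The minimal sketch below is illustrative only: LoggingReceiver, serveOneOp and the println bodies are hypothetical and not part of the change.

import java.io.DataInputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;

/** Hypothetical Receiver subclass that only logs the operations it sees. */
class LoggingReceiver extends Receiver {
  /** Read one op from the stream and dispatch it, as DataXceiver does. */
  void serveOneOp(DataInputStream in) throws IOException {
    final Op op = readOp(in); // also verifies DATA_TRANSFER_VERSION
    processOp(op, in);        // calls the matching op*() method below
  }

  @Override
  protected void opReadBlock(DataInputStream in, ExtendedBlock blk,
      long offset, long length, String client,
      Token<BlockTokenIdentifier> blockToken) throws IOException {
    System.out.println("READ_BLOCK " + blk + " offset=" + offset
        + " len=" + length);
  }

  @Override
  protected void opWriteBlock(DataInputStream in, ExtendedBlock blk,
      int pipelineSize, BlockConstructionStage stage, long newGs,
      long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
      DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
      throws IOException {
    System.out.println("WRITE_BLOCK " + blk + " stage=" + stage);
  }

  @Override
  protected void opTransferBlock(DataInputStream in, ExtendedBlock blk,
      String client, DatanodeInfo[] targets,
      Token<BlockTokenIdentifier> blockToken) throws IOException {
    System.out.println("TRANSFER_BLOCK " + blk);
  }

  @Override
  protected void opReplaceBlock(DataInputStream in, ExtendedBlock blk,
      String delHint, DatanodeInfo src,
      Token<BlockTokenIdentifier> blockToken) throws IOException {
    System.out.println("REPLACE_BLOCK " + blk + " delHint=" + delHint);
  }

  @Override
  protected void opCopyBlock(DataInputStream in, ExtendedBlock blk,
      Token<BlockTokenIdentifier> blockToken) throws IOException {
    System.out.println("COPY_BLOCK " + blk);
  }

  @Override
  protected void opBlockChecksum(DataInputStream in, ExtendedBlock blk,
      Token<BlockTokenIdentifier> blockToken) throws IOException {
    System.out.println("BLOCK_CHECKSUM " + blk);
  }
}
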
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java
new file mode 100644
index 0000000..3184554
--- /dev/null
+++ b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import org.apache.hadoop.HadoopIllegalArgumentException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+
+/**
+ * The setting of replace-datanode-on-failure feature.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public enum ReplaceDatanodeOnFailure {
+  /** The feature is disabled in the entire site. */
+  DISABLE,
+  /** Never add a new datanode. */
+  NEVER,
+  /**
+   * DEFAULT policy:
+   *   Let r be the replication number.
+   *   Let n be the number of existing datanodes.
+   *   Add a new datanode only if r >= 3 and either
+   *   (1) floor(r/2) >= n; or
+   *   (2) r > n and the block is hflushed/appended.
+   */
+  DEFAULT,
+  /** Always add a new datanode when an existing datanode is removed. */
+  ALWAYS;
+
+  /** Check that the feature is enabled; throw if it is disabled. */
+  public void checkEnabled() {
+    if (this == DISABLE) {
+      throw new UnsupportedOperationException(
+          "This feature is disabled.  Please refer to "
+          + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY
+          + " configuration property.");
+    }
+  }
+
+  /** Is the policy satisfied? */
+  public boolean satisfy(
+      final short replication, final DatanodeInfo[] existings,
+      final boolean isAppend, final boolean isHflushed) {
+    final int n = existings == null? 0: existings.length;
+    if (n == 0 || n >= replication) {
+      //no need to add a datanode for any policy.
+      return false;
+    } else if (this == DISABLE || this == NEVER) {
+      return false;
+    } else if (this == ALWAYS) {
+      return true;
+    } else {
+      //DEFAULT
+      if (replication < 3) {
+        return false;
+      } else {
+        if (n <= (replication/2)) {
+          return true;
+        } else {
+          return isAppend || isHflushed;
+        }
+      }
+    }
+  }
+
+  /** Get the setting from configuration. */
+  public static ReplaceDatanodeOnFailure get(final Configuration conf) {
+    final boolean enabled = conf.getBoolean(
+        DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
+        DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT);
+    if (!enabled) {
+      return DISABLE;
+    }
+
+    final String policy = conf.get(
+        DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
+        DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT);
+    for(int i = 1; i < values().length; i++) {
+      final ReplaceDatanodeOnFailure rdof = values()[i];
+      if (rdof.name().equalsIgnoreCase(policy)) {
+        return rdof;
+      }
+    }
+    throw new HadoopIllegalArgumentException("Illegal configuration value for "
+        + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY
+        + ": " + policy);
+  }
+
+  /** Write the setting to configuration. */
+  public void write(final Configuration conf) {
+    conf.setBoolean(
+        DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
+        this != DISABLE);
+    conf.set(
+        DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
+        name());
+  }
+}
\ No newline at end of file
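
As a usage sketch (not part of this patch): the enum round-trips through a Configuration via write/get, and a policy can be queried directly with satisfy. ReplaceDatanodePolicyDemo and the argument values below are illustrative assumptions.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;

public class ReplaceDatanodePolicyDemo {
  public static void main(String[] args) {
    // Persist the ALWAYS policy into a configuration and read it back.
    final Configuration conf = new HdfsConfiguration();
    ReplaceDatanodeOnFailure.ALWAYS.write(conf);
    final ReplaceDatanodeOnFailure policy = ReplaceDatanodeOnFailure.get(conf);

    // DEFAULT policy: replication 3 with a single datanode left in the
    // pipeline asks for a replacement, since floor(3/2) >= 1.
    final boolean add = ReplaceDatanodeOnFailure.DEFAULT.satisfy(
        (short)3, new DatanodeInfo[1], false /*isAppend*/, false /*isHflushed*/);

    System.out.println("configured policy = " + policy
        + ", DEFAULT would add a datanode: " + add);
  }
}
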
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
new file mode 100644
index 0000000..9502ad4
--- /dev/null
+++ b/src/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol.datatransfer;
+
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.toProto;
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.toProtos;
+import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.toProto;
+
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.security.token.Token;
+
+import com.google.protobuf.Message;
+
+/** Sender: serializes data transfer operations and writes them to an output stream. */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class Sender {
+  /** Initialize an operation by writing the protocol version and the op code. */
+  private static void op(final DataOutput out, final Op op
+      ) throws IOException {
+    out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
+    op.write(out);
+  }
+
+  private static void send(final DataOutputStream out, final Op opcode,
+      final Message proto) throws IOException {
+    op(out, opcode);
+    proto.writeDelimitedTo(out);
+    out.flush();
+  }
+
+  /** Send OP_READ_BLOCK */
+  public static void opReadBlock(DataOutputStream out, ExtendedBlock blk,
+      long blockOffset, long blockLen, String clientName,
+      Token<BlockTokenIdentifier> blockToken)
+      throws IOException {
+
+    OpReadBlockProto proto = OpReadBlockProto.newBuilder()
+      .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
+      .setOffset(blockOffset)
+      .setLen(blockLen)
+      .build();
+
+    send(out, Op.READ_BLOCK, proto);
+  }
+  
+
+  /** Send OP_WRITE_BLOCK */
+  public static void opWriteBlock(DataOutputStream out, ExtendedBlock blk,
+      int pipelineSize, BlockConstructionStage stage, long newGs,
+      long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
+      DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
+      throws IOException {
+    ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(blk, client,
+        blockToken);
+    
+    OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
+      .setHeader(header)
+      .addAllTargets(
+          toProtos(targets, 1))
+      .setStage(toProto(stage))
+      .setPipelineSize(pipelineSize)
+      .setMinBytesRcvd(minBytesRcvd)
+      .setMaxBytesRcvd(maxBytesRcvd)
+      .setLatestGenerationStamp(newGs);
+    
+    if (src != null) {
+      proto.setSource(toProto(src));
+    }
+
+    send(out, Op.WRITE_BLOCK, proto.build());
+  }
+
+  /** Send {@link Op#TRANSFER_BLOCK} */
+  public static void opTransferBlock(DataOutputStream out, ExtendedBlock blk,
+      String client, DatanodeInfo[] targets,
+      Token<BlockTokenIdentifier> blockToken) throws IOException {
+    
+    OpTransferBlockProto proto = OpTransferBlockProto.newBuilder()
+      .setHeader(DataTransferProtoUtil.buildClientHeader(
+          blk, client, blockToken))
+      .addAllTargets(toProtos(targets, 0))
+      .build();
+
+    send(out, Op.TRANSFER_BLOCK, proto);
+  }
+
+  /** Send OP_REPLACE_BLOCK */
+  public static void opReplaceBlock(DataOutputStream out,
+      ExtendedBlock blk, String delHint, DatanodeInfo src,
+      Token<BlockTokenIdentifier> blockToken) throws IOException {
+    OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder()
+      .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
+      .setDelHint(delHint)
+      .setSource(toProto(src))
+      .build();
+    
+    send(out, Op.REPLACE_BLOCK, proto);
+  }
+
+  /** Send OP_COPY_BLOCK */
+  public static void opCopyBlock(DataOutputStream out, ExtendedBlock blk,
+      Token<BlockTokenIdentifier> blockToken)
+      throws IOException {
+    OpCopyBlockProto proto = OpCopyBlockProto.newBuilder()
+      .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
+      .build();
+    
+    send(out, Op.COPY_BLOCK, proto);
+  }
+
+  /** Send OP_BLOCK_CHECKSUM */
+  public static void opBlockChecksum(DataOutputStream out, ExtendedBlock blk,
+      Token<BlockTokenIdentifier> blockToken)
+      throws IOException {
+    OpBlockChecksumProto proto = OpBlockChecksumProto.newBuilder()
+      .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
+      .build();
+    
+    send(out, Op.BLOCK_CHECKSUM, proto);
+  }
+}
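
For reference, the client side of an operation now reduces to a single static call on Sender. The sketch below is illustrative and not part of this patch; OpReadBlockSketch, requestRead and the use of BlockTokenSecretManager.DUMMY_TOKEN are assumptions (a real client passes the block token it obtained from the namenode, as BlockReader does).

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;

import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.net.NetUtils;

public class OpReadBlockSketch {
  /** Ask the datanode behind sock for the first len bytes of block. */
  static void requestRead(Socket sock, ExtendedBlock block, long len,
      String clientName) throws IOException {
    final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
        NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT)));
    // Writes the version, the READ_BLOCK op code and the vint-prefixed
    // OpReadBlockProto, then flushes.  The caller still reads back the
    // BlockOpResponseProto and the data packets.
    Sender.opReadBlock(out, block, 0L, len, clientName,
        BlockTokenSecretManager.DUMMY_TOKEN);
  }
}
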
diff --git a/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index efaa80d..a5ecf85 100644
--- a/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/src/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.balancer;
 
+import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
+
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -51,12 +53,11 @@
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@@ -74,8 +75,6 @@
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
-import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
-
 /** <p>The balancer is a tool that balances disk space usage on an HDFS cluster
  * when some datanodes become full or when new empty nodes join the cluster.
  * The tool is deployed as an application program that can be run by the 
@@ -349,7 +348,7 @@
     private void sendRequest(DataOutputStream out) throws IOException {
       final ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock());
       final Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb);
-      DataTransferProtocol.Sender.opReplaceBlock(out, eb, source.getStorageID(), 
+      Sender.opReplaceBlock(out, eb, source.getStorageID(), 
           proxySource.getDatanode(), accessToken);
     }
     
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 0956cc5..b382323 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -34,13 +34,13 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.FSOutputSummer;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 2bc8086..55c08e1 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -33,7 +33,7 @@
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.SocketOutputStream;
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 09a87d4..58319ce 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -18,7 +18,32 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 
 import java.io.BufferedOutputStream;
@@ -66,16 +91,17 @@
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
@@ -1947,7 +1973,7 @@
               EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE));
         }
 
-        DataTransferProtocol.Sender.opWriteBlock(out,
+        Sender.opWriteBlock(out,
             b, 0, stage, 0, 0, 0, clientname, srcNode, targets, accessToken);
 
         // send data & checksum
diff --git a/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index 8b5fc08..098f151 100644
--- a/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ b/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -17,8 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.*;
-import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_ACCESS_TOKEN;
+import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS;
 import static org.apache.hadoop.hdfs.server.common.Util.now;
 import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
 
@@ -38,16 +39,18 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
-
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
@@ -55,8 +58,6 @@
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.MD5Hash;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-import org.apache.hadoop.metrics2.lib.MutableRate;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
@@ -69,8 +70,7 @@
 /**
  * Thread for processing incoming/outgoing data stream.
  */
-class DataXceiver extends DataTransferProtocol.Receiver
-    implements Runnable, FSConstants {
+class DataXceiver extends Receiver implements Runnable, FSConstants {
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
@@ -139,7 +139,7 @@
       // This optimistic behaviour allows the other end to reuse connections.
       // Setting keepalive timeout to 0 disable this behavior.
       do {
-        DataTransferProtocol.Op op;
+        Op op;
         try {
           if (opsProcessed != 0) {
             assert socketKeepaliveTimeout > 0;
@@ -206,8 +206,7 @@
     DataOutputStream out = new DataOutputStream(
                  new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
     checkAccess(out, true, block, blockToken,
-        DataTransferProtocol.Op.READ_BLOCK,
-        BlockTokenSecretManager.AccessMode.READ);
+        Op.READ_BLOCK, BlockTokenSecretManager.AccessMode.READ);
   
     // send the block
     BlockSender blockSender = null;
@@ -329,8 +328,7 @@
             NetUtils.getOutputStream(s, datanode.socketWriteTimeout),
             SMALL_BUFFER_SIZE));
     checkAccess(replyOut, isClient, block, blockToken,
-        DataTransferProtocol.Op.WRITE_BLOCK,
-        BlockTokenSecretManager.AccessMode.WRITE);
+        Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE);
 
     DataOutputStream mirrorOut = null;  // stream to next target
     DataInputStream mirrorIn = null;    // reply from next target
@@ -375,7 +373,7 @@
                          SMALL_BUFFER_SIZE));
           mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
 
-          DataTransferProtocol.Sender.opWriteBlock(mirrorOut, originalBlock,
+          Sender.opWriteBlock(mirrorOut, originalBlock,
               pipelineSize, stage, newGs, minBytesRcvd, maxBytesRcvd, clientname,
               srcDataNode, targets, blockToken);
 
@@ -497,10 +495,9 @@
       final DatanodeInfo[] targets,
       final Token<BlockTokenIdentifier> blockToken) throws IOException {
     checkAccess(null, true, blk, blockToken,
-        DataTransferProtocol.Op.TRANSFER_BLOCK,
-        BlockTokenSecretManager.AccessMode.COPY);
+        Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
 
-    updateCurrentThreadName(DataTransferProtocol.Op.TRANSFER_BLOCK + " " + blk);
+    updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + blk);
 
     final DataOutputStream out = new DataOutputStream(
         NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
@@ -521,8 +518,7 @@
     final DataOutputStream out = new DataOutputStream(
         NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
     checkAccess(out, true, block, blockToken,
-        DataTransferProtocol.Op.BLOCK_CHECKSUM,
-        BlockTokenSecretManager.AccessMode.READ);
+        Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);
     updateCurrentThreadName("Reading metadata for block " + block);
     final MetaDataInputStream metadataIn = 
       datanode.data.getMetaDataInputStream(block);
@@ -693,7 +689,7 @@
                      new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
 
       /* send request to the proxy */
-      DataTransferProtocol.Sender.opCopyBlock(proxyOut, block, blockToken);
+      Sender.opCopyBlock(proxyOut, block, blockToken);
 
       // receive the response from the proxy
       proxyReply = new DataInputStream(new BufferedInputStream(
@@ -760,11 +756,6 @@
     return now() - opStartTime;
   }
 
-  private void updateCounter(MutableCounterLong localCounter,
-      MutableCounterLong remoteCounter) {
-    (isLocal? localCounter: remoteCounter).incr();
-  }
-
   /**
    * Utility function for sending a response.
    * @param s socket to write to
@@ -793,7 +784,7 @@
   private void checkAccess(DataOutputStream out, final boolean reply, 
       final ExtendedBlock blk,
       final Token<BlockTokenIdentifier> t,
-      final DataTransferProtocol.Op op,
+      final Op op,
       final BlockTokenSecretManager.AccessMode mode) throws IOException {
     if (datanode.isBlockTokenEnabled) {
       try {
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index d0a2afe..4f1b645 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -17,45 +17,100 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import org.apache.commons.logging.*;
+import static org.apache.hadoop.hdfs.server.common.Util.now;
 
+import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.lang.management.ManagementFactory;
+import java.net.InetAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.management.NotCompliantMBeanException;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.InvalidPathException;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.common.Util;
-import static org.apache.hadoop.hdfs.server.common.Util.now;
-import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.security.token.delegation.DelegationKey;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
-import org.apache.hadoop.util.*;
-import org.apache.hadoop.net.CachedDNSToSwitchMapping;
-import org.apache.hadoop.net.DNSToSwitchMapping;
-import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.net.Node;
-import org.apache.hadoop.net.NodeBase;
-import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.hdfs.server.namenode.DatanodeDescriptor.BlockTargetPair;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
 import org.apache.hadoop.hdfs.server.namenode.UnderReplicatedBlocks.BlockIterator;
+import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -65,51 +120,33 @@
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FsServerDefaults;
-import org.apache.hadoop.fs.InvalidPathException;
-import org.apache.hadoop.fs.ParentNotDirectoryException;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Options.Rename;
-import org.apache.hadoop.fs.permission.*;
-import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MutableCounterInt;
 import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.net.CachedDNSToSwitchMapping;
+import org.apache.hadoop.net.DNSToSwitchMapping;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.net.ScriptBasedMapping;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.HostsFileReader;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.VersionInfo;
 import org.mortbay.util.ajax.JSON;
 
-import java.io.BufferedWriter;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.DataOutputStream;
-import java.io.PrintWriter;
-import java.lang.management.ManagementFactory;
-import java.net.InetAddress;
-import java.net.URI;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.Map.Entry;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import javax.management.NotCompliantMBeanException;
-import javax.management.ObjectName;
-import javax.management.StandardMBean;
-
 /***************************************************
  * FSNamesystem does the actual bookkeeping work for the
  * DataNode.
@@ -270,8 +307,8 @@
   private FsServerDefaults serverDefaults;
   // allow appending to hdfs files
   private boolean supportAppends = true;
-  private DataTransferProtocol.ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure = 
-      DataTransferProtocol.ReplaceDatanodeOnFailure.DEFAULT;
+  private ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure = 
+      ReplaceDatanodeOnFailure.DEFAULT;
 
   private volatile SafeModeInfo safeMode;  // safe mode information
   private Host2NodesMap host2DataNodeMap = new Host2NodesMap();
@@ -552,7 +589,7 @@
         + " min(s), blockTokenLifetime=" + blockTokenLifetime / (60 * 1000)
         + " min(s)");
 
-    this.dtpReplaceDatanodeOnFailure = DataTransferProtocol.ReplaceDatanodeOnFailure.get(conf);
+    this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
   }
 
   /**
diff --git a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
index c3bd393..1b4fea3 100644
--- a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
+++ b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
@@ -32,7 +32,7 @@
 import org.apache.hadoop.hdfs.PipelinesTestUtil.PipelinesTest;
 import org.apache.hadoop.hdfs.PipelinesTestUtil.NodeBytes;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 
diff --git a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj
index 21ff0ae..9d4f189 100644
--- a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj
+++ b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj
@@ -27,8 +27,8 @@
 import org.apache.hadoop.fi.DataTransferTestUtil;
 import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Receiver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
 
 /** Aspect for DataTransferProtocol */
 public aspect DataTransferProtocolAspects {
diff --git a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
index ed9de6d..b7e6277 100644
--- a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
+++ b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
@@ -36,7 +36,7 @@
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.log4j.Level;
 import org.junit.Assert;
diff --git a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
index 8a25b08..dcfdcf9 100644
--- a/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
+++ b/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
@@ -37,7 +37,7 @@
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Test;
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java b/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
index 57fdc43..02bbf5f 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hdfs;
 
+import static org.junit.Assert.assertEquals;
+
 import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
@@ -52,11 +54,11 @@
 import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
@@ -72,8 +74,6 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 
-import static org.junit.Assert.*;
-
 /** Utilities for HDFS tests */
 public class DFSTestUtil {
   
@@ -674,7 +674,7 @@
     final DataInputStream in = new DataInputStream(NetUtils.getInputStream(s));
 
     // send the request
-    DataTransferProtocol.Sender.opTransferBlock(out, b, dfsClient.clientName,
+    Sender.opTransferBlock(out, b, dfsClient.clientName,
         new DatanodeInfo[]{datanodes[1]}, new Token<BlockTokenIdentifier>());
     out.flush();
 
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
index 63d1974..9133002 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
@@ -17,10 +17,6 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.WRITE_BLOCK;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -42,22 +38,23 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.Builder;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
@@ -188,7 +185,7 @@
       String description, Boolean eofExcepted) throws IOException {
     sendBuf.reset();
     recvBuf.reset();
-    DataTransferProtocol.Sender.opWriteBlock(sendOut, block, 0,
+    Sender.opWriteBlock(sendOut, block, 0,
         stage, newGS, block.getNumBytes(), block.getNumBytes(), "cl", null,
         new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
     if (eofExcepted) {
@@ -370,12 +367,12 @@
     // bad ops
     sendBuf.reset();
     sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
-    sendOut.writeByte(WRITE_BLOCK.code - 1);
+    sendOut.writeByte(Op.WRITE_BLOCK.code - 1);
     sendRecvData("Wrong Op Code", true);
     
     /* Test OP_WRITE_BLOCK */
     sendBuf.reset();
-    DataTransferProtocol.Sender.opWriteBlock(sendOut, 
+    Sender.opWriteBlock(sendOut, 
         new ExtendedBlock(poolId, newBlockId), 0,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
         new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
@@ -389,7 +386,7 @@
 
     sendBuf.reset();
     recvBuf.reset();
-    DataTransferProtocol.Sender.opWriteBlock(sendOut,
+    Sender.opWriteBlock(sendOut,
         new ExtendedBlock(poolId, ++newBlockId), 0,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
         new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
@@ -412,7 +409,7 @@
     // test for writing a valid zero size block
     sendBuf.reset();
     recvBuf.reset();
-    DataTransferProtocol.Sender.opWriteBlock(sendOut, 
+    Sender.opWriteBlock(sendOut, 
         new ExtendedBlock(poolId, ++newBlockId), 0,
         BlockConstructionStage.PIPELINE_SETUP_CREATE, 0L, 0L, 0L, "cl", null,
         new DatanodeInfo[1], BlockTokenSecretManager.DUMMY_TOKEN);
@@ -442,21 +439,21 @@
     sendBuf.reset();
     recvBuf.reset();
     blk.setBlockId(blkid-1);
-    DataTransferProtocol.Sender.opReadBlock(sendOut, blk, 0L, fileLen, "cl",
+    Sender.opReadBlock(sendOut, blk, 0L, fileLen, "cl",
           BlockTokenSecretManager.DUMMY_TOKEN);
     sendRecvData("Wrong block ID " + newBlockId + " for read", false); 
 
     // negative block start offset -1L
     sendBuf.reset();
     blk.setBlockId(blkid);
-    DataTransferProtocol.Sender.opReadBlock(sendOut, blk, -1L, fileLen, "cl",
+    Sender.opReadBlock(sendOut, blk, -1L, fileLen, "cl",
           BlockTokenSecretManager.DUMMY_TOKEN);
     sendRecvData("Negative start-offset for read for block " + 
                  firstBlock.getBlockId(), false);
 
     // bad block start offset
     sendBuf.reset();
-    DataTransferProtocol.Sender.opReadBlock(sendOut, blk, fileLen, fileLen, "cl",
+    Sender.opReadBlock(sendOut, blk, fileLen, fileLen, "cl",
           BlockTokenSecretManager.DUMMY_TOKEN);
     sendRecvData("Wrong start-offset for reading block " +
                  firstBlock.getBlockId(), false);
@@ -465,7 +462,7 @@
     recvBuf.reset();
     sendResponse(Status.SUCCESS, null, recvOut);
     sendBuf.reset();
-    DataTransferProtocol.Sender.opReadBlock(sendOut, blk, 0L, 
+    Sender.opReadBlock(sendOut, blk, 0L, 
         -1 - random.nextInt(oneMil), "cl", BlockTokenSecretManager.DUMMY_TOKEN);
     sendRecvData("Negative length for reading block " +
                  firstBlock.getBlockId(), false);
@@ -474,14 +471,14 @@
     recvBuf.reset();
     sendResponse(Status.ERROR, null, recvOut);
     sendBuf.reset();
-    DataTransferProtocol.Sender.opReadBlock(sendOut, blk, 0L, 
+    Sender.opReadBlock(sendOut, blk, 0L, 
         fileLen + 1, "cl", BlockTokenSecretManager.DUMMY_TOKEN);
     sendRecvData("Wrong length for reading block " +
                  firstBlock.getBlockId(), false);
     
     //At the end of all this, read the file to make sure that succeeds finally.
     sendBuf.reset();
-    DataTransferProtocol.Sender.opReadBlock(sendOut, blk, 0L, 
+    Sender.opReadBlock(sendOut, blk, 0L, 
         fileLen, "cl", BlockTokenSecretManager.DUMMY_TOKEN);
     readFile(fileSys, file, fileLen);
     } finally {
@@ -512,14 +509,6 @@
     readBack.readFields(ByteBuffer.wrap(baos.toByteArray()));
     assertEquals(hdr, readBack);
 
-    // Test sanity check for good header
-    PacketHeader goodHeader = new PacketHeader(
-      4,                   // size of packet
-      0,                   // OffsetInBlock
-      100,                 // sequencenumber
-      true,                // lastPacketInBlock
-      0);                  // chunk length
-
     assertTrue(hdr.sanityCheck(99));
     assertFalse(hdr.sanityCheck(100));
   }
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/TestReplaceDatanodeOnFailure.java b/src/test/hdfs/org/apache/hadoop/hdfs/TestReplaceDatanodeOnFailure.java
index a366bf7..2aa33da 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/TestReplaceDatanodeOnFailure.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/TestReplaceDatanodeOnFailure.java
@@ -26,9 +26,10 @@
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.log4j.Level;
 import org.junit.Assert;
@@ -53,8 +54,7 @@
   /** Test DEFAULT ReplaceDatanodeOnFailure policy. */
   @Test
   public void testDefaultPolicy() throws Exception {
-    final DataTransferProtocol.ReplaceDatanodeOnFailure p
-        = DataTransferProtocol.ReplaceDatanodeOnFailure.DEFAULT;
+    final ReplaceDatanodeOnFailure p = ReplaceDatanodeOnFailure.DEFAULT;
 
     final DatanodeInfo[] infos = new DatanodeInfo[5];
     final DatanodeInfo[][] datanodes = new DatanodeInfo[infos.length + 1][];
@@ -113,7 +113,7 @@
     final Configuration conf = new HdfsConfiguration();
     
     //always replace a datanode
-    DataTransferProtocol.ReplaceDatanodeOnFailure.ALWAYS.write(conf);
+    ReplaceDatanodeOnFailure.ALWAYS.write(conf);
 
     final String[] racks = new String[REPLICATION];
     Arrays.fill(racks, RACK0);
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
index 52ffa92..e6349ce 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
@@ -36,21 +36,21 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.net.NetUtils;
 /**
  * This class tests if block replacement request to data nodes work correctly.
@@ -258,7 +258,7 @@
     sock.setKeepAlive(true);
     // sendRequest
     DataOutputStream out = new DataOutputStream(sock.getOutputStream());
-    DataTransferProtocol.Sender.opReplaceBlock(out, block, source
+    Sender.opReplaceBlock(out, block, source
         .getStorageID(), sourceProxy, BlockTokenSecretManager.DUMMY_TOKEN);
     out.flush();
     // receiveResponse
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
index 809481b..ed862b8 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.io.DataOutputStream;
 import java.io.File;
 import java.net.InetSocketAddress;
@@ -26,22 +29,20 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Sender;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-
-import org.junit.Test;
-import org.junit.Before;
 import org.junit.After;
-import static org.junit.Assert.*;
+import org.junit.Before;
+import org.junit.Test;
 
 /**
  * Test that datanodes can correctly handle errors during block read/write.
diff --git a/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java b/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java
index 6bc92a1..07c35e7 100644
--- a/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java
+++ b/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java
@@ -29,10 +29,10 @@
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;