HDFS-1966.  Encapsulate individual DataTransferProtocol op headers.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/trunk@1130367 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 90f1882..891afb9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -468,6 +468,9 @@
     HDFS-1636. If dfs.name.dir points to an empty dir, namenode format
     shouldn't require confirmation. (Harsh J Chouraria via todd)
 
+    HDFS-1966.  Encapsulate individual DataTransferProtocol op headers.
+    (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
diff --git a/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java b/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
index 091b7a8..9923c1f 100644
--- a/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
+++ b/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
@@ -51,10 +51,10 @@
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 24:
-   *    Remove deprecated fields.
+   * Version 25:
+   *    Encapsulate individual operation headers.
    */
-  public static final int DATA_TRANSFER_VERSION = 24;
+  public static final int DATA_TRANSFER_VERSION = 25;
 
   /** Operation */
   public enum Op {
@@ -89,7 +89,332 @@
     public void write(DataOutput out) throws IOException {
       out.write(code);
     }
-  };
+
+    /** Base class for all headers. */
+    private static abstract class BaseHeader implements Writable {
+      private ExtendedBlock block;
+      private Token<BlockTokenIdentifier> blockToken;
+      
+      private BaseHeader() {}
+      
+      private BaseHeader(
+          final ExtendedBlock block,
+          final Token<BlockTokenIdentifier> blockToken) {
+        this.block = block;
+        this.blockToken = blockToken;
+      }
+
+      /** @return the extended block. */
+      public final ExtendedBlock getBlock() {
+        return block;
+      }
+
+      /** @return the block token. */
+      public final Token<BlockTokenIdentifier> getBlockToken() {
+        return blockToken;
+      }
+
+      @Override
+      public void write(DataOutput out) throws IOException {
+        block.writeId(out);
+        blockToken.write(out);
+      }
+
+      @Override
+      public void readFields(DataInput in) throws IOException {
+        block = new ExtendedBlock();
+        block.readId(in);
+
+        blockToken = new Token<BlockTokenIdentifier>();
+        blockToken.readFields(in);
+      }
+    }
+
+    /** Base header for all client operation. */
+    private static abstract class ClientOperationHeader extends BaseHeader {
+      private String clientName;
+      
+      private ClientOperationHeader() {}
+      
+      private ClientOperationHeader(
+          final ExtendedBlock block,
+          final Token<BlockTokenIdentifier> blockToken,
+          final String clientName) {
+        super(block, blockToken);
+        this.clientName = clientName;
+      }
+
+      /** @return client name. */
+      public final String getClientName() {
+        return clientName;
+      }
+
+      @Override
+      public void write(DataOutput out) throws IOException {
+        super.write(out);
+        Text.writeString(out, clientName);
+      }
+
+      @Override
+      public void readFields(DataInput in) throws IOException {
+        super.readFields(in);
+        clientName = Text.readString(in);
+      }
+    }
+
+    /** {@link Op#READ_BLOCK} header. */
+    public static class ReadBlockHeader extends ClientOperationHeader {
+      private long offset;
+      private long length;
+
+      /** Default constructor */
+      public ReadBlockHeader() {}
+
+      /** Constructor with all parameters */
+      public ReadBlockHeader(
+          final ExtendedBlock blk,
+          final Token<BlockTokenIdentifier> blockToken,
+          final String clientName,
+          final long offset,
+          final long length) {
+        super(blk, blockToken, clientName);
+        this.offset = offset;
+        this.length = length;
+      }
+
+      /** @return the offset */
+      public long getOffset() {
+        return offset;
+      }
+
+      /** @return the length */
+      public long getLength() {
+        return length;
+      }
+
+      @Override
+      public void write(DataOutput out) throws IOException {
+        super.write(out);
+        out.writeLong(offset);
+        out.writeLong(length);
+      }
+
+      @Override
+      public void readFields(DataInput in) throws IOException {
+        super.readFields(in);
+        offset = in.readLong();
+        length = in.readLong();
+      }
+    }
+
+    /** {@link Op#WRITE_BLOCK} header. */
+    public static class WriteBlockHeader extends ClientOperationHeader {
+      private DatanodeInfo[] targets;
+
+      private DatanodeInfo source;
+      private BlockConstructionStage stage;
+      private int pipelineSize;
+      private long minBytesRcvd;
+      private long maxBytesRcvd;
+      private long latestGenerationStamp;
+      
+      /** Default constructor */
+      public WriteBlockHeader() {}
+
+      /** Constructor with all parameters */
+      public WriteBlockHeader(
+          final ExtendedBlock blk,
+          final Token<BlockTokenIdentifier> blockToken,
+          final String clientName,
+          final DatanodeInfo[] targets,
+          final DatanodeInfo source,
+          final BlockConstructionStage stage,
+          final int pipelineSize,
+          final long minBytesRcvd,
+          final long maxBytesRcvd,
+          final long latestGenerationStamp
+          ) throws IOException {
+        super(blk, blockToken, clientName);
+        this.targets = targets;
+        this.source = source;
+        this.stage = stage;
+        this.pipelineSize = pipelineSize;
+        this.minBytesRcvd = minBytesRcvd;
+        this.maxBytesRcvd = maxBytesRcvd;
+        this.latestGenerationStamp = latestGenerationStamp;
+      }
+
+      /** @return targets. */
+      public DatanodeInfo[] getTargets() {
+        return targets;
+      }
+
+      /** @return the source */
+      public DatanodeInfo getSource() {
+        return source;
+      }
+
+      /** @return the stage */
+      public BlockConstructionStage getStage() {
+        return stage;
+      }
+
+      /** @return the pipeline size */
+      public int getPipelineSize() {
+        return pipelineSize;
+      }
+
+      /** @return the minimum bytes received. */
+      public long getMinBytesRcvd() {
+        return minBytesRcvd;
+      }
+
+      /** @return the maximum bytes received. */
+      public long getMaxBytesRcvd() {
+        return maxBytesRcvd;
+      }
+
+      /** @return the latest generation stamp */
+      public long getLatestGenerationStamp() {
+        return latestGenerationStamp;
+      }
+
+      @Override
+      public void write(DataOutput out) throws IOException {
+        super.write(out);
+        Sender.write(out, 1, targets);
+
+        out.writeBoolean(source != null);
+        if (source != null) {
+          source.write(out);
+        }
+
+        stage.write(out);
+        out.writeInt(pipelineSize);
+        WritableUtils.writeVLong(out, minBytesRcvd);
+        WritableUtils.writeVLong(out, maxBytesRcvd);
+        WritableUtils.writeVLong(out, latestGenerationStamp);
+      }
+
+      @Override
+      public void readFields(DataInput in) throws IOException {
+        super.readFields(in);
+        targets = Receiver.readDatanodeInfos(in);
+
+        source = in.readBoolean()? DatanodeInfo.read(in): null;
+        stage = BlockConstructionStage.readFields(in);
+        pipelineSize = in.readInt(); // num of datanodes in entire pipeline
+        minBytesRcvd = WritableUtils.readVLong(in);
+        maxBytesRcvd = WritableUtils.readVLong(in);
+        latestGenerationStamp = WritableUtils.readVLong(in);
+      }
+    }
+
+    /** {@link Op#TRANSFER_BLOCK} header. */
+    public static class TransferBlockHeader extends ClientOperationHeader {
+      private DatanodeInfo[] targets;
+
+      /** Default constructor */
+      public TransferBlockHeader() {}
+
+      /** Constructor with all parameters */
+      public TransferBlockHeader(
+          final ExtendedBlock blk,
+          final Token<BlockTokenIdentifier> blockToken,
+          final String clientName,
+          final DatanodeInfo[] targets) throws IOException {
+        super(blk, blockToken, clientName);
+        this.targets = targets;
+      }
+
+      /** @return targets. */
+      public DatanodeInfo[] getTargets() {
+        return targets;
+      }
+
+      @Override
+      public void write(DataOutput out) throws IOException {
+        super.write(out);
+        Sender.write(out, 0, targets);
+      }
+
+      @Override
+      public void readFields(DataInput in) throws IOException {
+        super.readFields(in);
+        targets = Receiver.readDatanodeInfos(in);
+      }
+    }
+
+    /** {@link Op#REPLACE_BLOCK} header. */
+    public static class ReplaceBlockHeader extends BaseHeader {
+      private String delHint;
+      private DatanodeInfo source;
+
+      /** Default constructor */
+      public ReplaceBlockHeader() {}
+
+      /** Constructor with all parameters */
+      public ReplaceBlockHeader(final ExtendedBlock blk,
+          final Token<BlockTokenIdentifier> blockToken,
+          final String storageId,
+          final DatanodeInfo src) throws IOException {
+        super(blk, blockToken);
+        this.delHint = storageId;
+        this.source = src;
+      }
+
+      /** @return delete-hint. */
+      public String getDelHint() {
+        return delHint;
+      }
+
+      /** @return source datanode. */
+      public DatanodeInfo getSource() {
+        return source;
+      }
+
+      @Override
+      public void write(DataOutput out) throws IOException {
+        super.write(out);
+        Text.writeString(out, delHint);
+        source.write(out);
+      }
+
+      @Override
+      public void readFields(DataInput in) throws IOException {
+        super.readFields(in);
+        delHint = Text.readString(in);
+        source = DatanodeInfo.read(in);
+      }
+    }
+
+    /** {@link Op#COPY_BLOCK} header. */
+    public static class CopyBlockHeader extends BaseHeader {
+      /** Default constructor */
+      public CopyBlockHeader() {}
+
+      /** Constructor with all parameters */
+      public CopyBlockHeader(
+          final ExtendedBlock block,
+          final Token<BlockTokenIdentifier> blockToken) {
+        super(block, blockToken);
+      }
+    }
+
+    /** {@link Op#BLOCK_CHECKSUM} header. */
+    public static class BlockChecksumHeader extends BaseHeader {
+      /** Default constructor */
+      public BlockChecksumHeader() {}
+
+      /** Constructor with all parameters */
+      public BlockChecksumHeader(
+          final ExtendedBlock block,
+          final Token<BlockTokenIdentifier> blockToken) {
+        super(block, blockToken);
+      }
+    }
+  }
+
 
   /** Status */
   public enum Status {
@@ -189,24 +514,27 @@
   @InterfaceStability.Evolving
   public static class Sender {
     /** Initialize a operation. */
-    public static void op(DataOutputStream out, Op op) throws IOException {
+    private static void op(final DataOutput out, final Op op
+        ) throws IOException {
       out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
       op.write(out);
     }
 
+    /** Send an operation request. */
+    private static void send(final DataOutputStream out, final Op opcode,
+        final Op.BaseHeader parameters) throws IOException {
+      op(out, opcode);
+      parameters.write(out);
+      out.flush();
+    }
+
     /** Send OP_READ_BLOCK */
     public static void opReadBlock(DataOutputStream out, ExtendedBlock blk,
         long blockOffset, long blockLen, String clientName,
         Token<BlockTokenIdentifier> blockToken)
         throws IOException {
-      op(out, Op.READ_BLOCK);
-
-      blk.writeId(out);
-      out.writeLong(blockOffset);
-      out.writeLong(blockLen);
-      Text.writeString(out, clientName);
-      blockToken.write(out);
-      out.flush();
+      send(out, Op.READ_BLOCK, new Op.ReadBlockHeader(blk, blockToken,
+          clientName, blockOffset, blockLen));
     }
     
     /** Send OP_WRITE_BLOCK */
@@ -215,74 +543,43 @@
         long minBytesRcvd, long maxBytesRcvd, String client, DatanodeInfo src,
         DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
         throws IOException {
-      op(out, Op.WRITE_BLOCK);
-
-      blk.writeId(out);
-      out.writeInt(pipelineSize);
-      stage.write(out);
-      WritableUtils.writeVLong(out, newGs);
-      WritableUtils.writeVLong(out, minBytesRcvd);
-      WritableUtils.writeVLong(out, maxBytesRcvd);
-      Text.writeString(out, client);
-
-      out.writeBoolean(src != null);
-      if (src != null) {
-        src.write(out);
-      }
-      write(out, 1, targets);
-      blockToken.write(out);
+      send(out, Op.WRITE_BLOCK, new Op.WriteBlockHeader(blk, blockToken,
+          client, targets, src, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
+          newGs));
     }
 
     /** Send {@link Op#TRANSFER_BLOCK} */
     public static void opTransferBlock(DataOutputStream out, ExtendedBlock blk,
         String client, DatanodeInfo[] targets,
         Token<BlockTokenIdentifier> blockToken) throws IOException {
-      op(out, Op.TRANSFER_BLOCK);
-
-      blk.writeId(out);
-      Text.writeString(out, client);
-      write(out, 0, targets);
-      blockToken.write(out);
-      out.flush();
+      send(out, Op.TRANSFER_BLOCK, new Op.TransferBlockHeader(blk, blockToken,
+          client, targets));
     }
 
     /** Send OP_REPLACE_BLOCK */
     public static void opReplaceBlock(DataOutputStream out,
-        ExtendedBlock blk, String storageId, DatanodeInfo src,
+        ExtendedBlock blk, String delHint, DatanodeInfo src,
         Token<BlockTokenIdentifier> blockToken) throws IOException {
-      op(out, Op.REPLACE_BLOCK);
-
-      blk.writeId(out);
-      Text.writeString(out, storageId);
-      src.write(out);
-      blockToken.write(out);
-      out.flush();
+      send(out, Op.REPLACE_BLOCK, new Op.ReplaceBlockHeader(blk, blockToken,
+          delHint, src));
     }
 
     /** Send OP_COPY_BLOCK */
     public static void opCopyBlock(DataOutputStream out, ExtendedBlock blk,
         Token<BlockTokenIdentifier> blockToken)
         throws IOException {
-      op(out, Op.COPY_BLOCK);
-
-      blk.writeId(out);
-      blockToken.write(out);
-      out.flush();
+      send(out, Op.COPY_BLOCK, new Op.CopyBlockHeader(blk, blockToken));
     }
 
     /** Send OP_BLOCK_CHECKSUM */
     public static void opBlockChecksum(DataOutputStream out, ExtendedBlock blk,
         Token<BlockTokenIdentifier> blockToken)
         throws IOException {
-      op(out, Op.BLOCK_CHECKSUM);
-      
-      blk.writeId(out);
-      blockToken.write(out);
-      out.flush();
+      send(out, Op.BLOCK_CHECKSUM, new Op.BlockChecksumHeader(blk, blockToken));
     }
 
     /** Write an array of {@link DatanodeInfo} */
-    private static void write(final DataOutputStream out,
+    private static void write(final DataOutput out,
         final int start, 
         final DatanodeInfo[] datanodeinfos) throws IOException {
       out.writeInt(datanodeinfos.length - start);
@@ -334,14 +631,10 @@
 
     /** Receive OP_READ_BLOCK */
     private void opReadBlock(DataInputStream in) throws IOException {
-      final ExtendedBlock blk = new ExtendedBlock();
-      blk.readId(in);
-      final long offset = in.readLong();
-      final long length = in.readLong();
-      final String client = Text.readString(in);
-      final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
-
-      opReadBlock(in, blk, offset, length, client, blockToken);
+      final Op.ReadBlockHeader h = new Op.ReadBlockHeader();
+      h.readFields(in);
+      opReadBlock(in, h.getBlock(), h.getOffset(), h.getLength(),
+          h.getClientName(), h.getBlockToken());
     }
 
     /**
@@ -353,22 +646,12 @@
     
     /** Receive OP_WRITE_BLOCK */
     private void opWriteBlock(DataInputStream in) throws IOException {
-      final ExtendedBlock blk = new ExtendedBlock();
-      blk.readId(in);
-      final int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
-      final BlockConstructionStage stage = 
-        BlockConstructionStage.readFields(in);
-      final long newGs = WritableUtils.readVLong(in);
-      final long minBytesRcvd = WritableUtils.readVLong(in);
-      final long maxBytesRcvd = WritableUtils.readVLong(in);
-      final String client = Text.readString(in); // working on behalf of this client
-      final DatanodeInfo src = in.readBoolean()? DatanodeInfo.read(in): null;
-
-      final DatanodeInfo targets[] = readDatanodeInfos(in);
-      final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
-
-      opWriteBlock(in, blk, pipelineSize, stage,
-          newGs, minBytesRcvd, maxBytesRcvd, client, src, targets, blockToken);
+      final Op.WriteBlockHeader h = new Op.WriteBlockHeader();
+      h.readFields(in);
+      opWriteBlock(in, h.getBlock(), h.getPipelineSize(), h.getStage(),
+          h.getLatestGenerationStamp(),
+          h.getMinBytesRcvd(), h.getMaxBytesRcvd(),
+          h.getClientName(), h.getSource(), h.getTargets(), h.getBlockToken());
     }
 
     /**
@@ -383,13 +666,10 @@
 
     /** Receive {@link Op#TRANSFER_BLOCK} */
     private void opTransferBlock(DataInputStream in) throws IOException {
-      final ExtendedBlock blk = new ExtendedBlock();
-      blk.readId(in);
-      final String client = Text.readString(in);
-      final DatanodeInfo targets[] = readDatanodeInfos(in);
-      final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
-
-      opTransferBlock(in, blk, client, targets, blockToken);
+      final Op.TransferBlockHeader h = new Op.TransferBlockHeader();
+      h.readFields(in);
+      opTransferBlock(in, h.getBlock(), h.getClientName(), h.getTargets(),
+          h.getBlockToken());
     }
 
     /**
@@ -404,13 +684,10 @@
 
     /** Receive OP_REPLACE_BLOCK */
     private void opReplaceBlock(DataInputStream in) throws IOException {
-      final ExtendedBlock blk = new ExtendedBlock();
-      blk.readId(in);
-      final String sourceId = Text.readString(in); // read del hint
-      final DatanodeInfo src = DatanodeInfo.read(in); // read proxy source
-      final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
-
-      opReplaceBlock(in, blk, sourceId, src, blockToken);
+      final Op.ReplaceBlockHeader h = new Op.ReplaceBlockHeader();
+      h.readFields(in);
+      opReplaceBlock(in, h.getBlock(), h.getDelHint(), h.getSource(),
+          h.getBlockToken());
     }
 
     /**
@@ -418,16 +695,14 @@
      * It is used for balancing purpose; send to a destination
      */
     protected abstract void opReplaceBlock(DataInputStream in,
-        ExtendedBlock blk, String sourceId, DatanodeInfo src,
+        ExtendedBlock blk, String delHint, DatanodeInfo src,
         Token<BlockTokenIdentifier> blockToken) throws IOException;
 
     /** Receive OP_COPY_BLOCK */
     private void opCopyBlock(DataInputStream in) throws IOException {
-      final ExtendedBlock blk = new ExtendedBlock();
-      blk.readId(in);
-      final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
-
-      opCopyBlock(in, blk, blockToken);
+      final Op.CopyBlockHeader h = new Op.CopyBlockHeader();
+      h.readFields(in);
+      opCopyBlock(in, h.getBlock(), h.getBlockToken());
     }
 
     /**
@@ -440,11 +715,9 @@
 
     /** Receive OP_BLOCK_CHECKSUM */
     private void opBlockChecksum(DataInputStream in) throws IOException {
-      final ExtendedBlock blk = new ExtendedBlock();
-      blk.readId(in);
-      final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
-
-      opBlockChecksum(in, blk, blockToken);
+      final Op.BlockChecksumHeader h = new Op.BlockChecksumHeader();
+      h.readFields(in);
+      opBlockChecksum(in, h.getBlock(), h.getBlockToken());
     }
 
     /**
@@ -456,7 +729,7 @@
         throws IOException;
 
     /** Read an array of {@link DatanodeInfo} */
-    private static DatanodeInfo[] readDatanodeInfos(final DataInputStream in
+    private static DatanodeInfo[] readDatanodeInfos(final DataInput in
         ) throws IOException {
       final int n = in.readInt();
       if (n < 0) {
@@ -469,14 +742,6 @@
       }
       return datanodeinfos;
     }
-
-    /** Read an AccessToken */
-    static private Token<BlockTokenIdentifier> readBlockToken(DataInputStream in
-        ) throws IOException {
-      final Token<BlockTokenIdentifier> t = new Token<BlockTokenIdentifier>();
-      t.readFields(in);
-      return t; 
-    }
   }
   
   /** reply **/