Update FSEditLogOp to include LOG_SEGMENT ops after merge with trunk and HDFS-2003.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/hdfs/branches/HDFS-1073@1134161 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index 355a056..a3429a3 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -152,8 +152,7 @@
           recentOpcodeOffsets[numEdits % recentOpcodeOffsets.length] =
               tracker.getPos();
           if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
-            // Read the txid
-            long thisTxId = in.readLong();
+            long thisTxId = op.txid;
             if (thisTxId != txId + 1) {
               throw new IOException("Expected transaction ID " +
                   (txId + 1) + " but got " + thisTxId);
diff --git a/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
index db99d11..cc1bcca 100644
--- a/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
+++ b/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
@@ -55,6 +55,8 @@
 @InterfaceStability.Unstable
 public abstract class FSEditLogOp {
   final FSEditLogOpCodes opCode;
+  long txid;
+
 
   /**
    * Constructor for an EditLog Op. EditLog ops cannot be constructed
@@ -62,6 +64,11 @@
    */
   private FSEditLogOp(FSEditLogOpCodes opCode) {
     this.opCode = opCode;
+    this.txid = 0;
+  }
+
+  public void setTransactionId(long txid) {
+    this.txid = txid;
   }
 
   public abstract void readFields(DataInputStream in, int logVersion)
@@ -577,6 +584,19 @@
     }
   }
   
+  static class LogSegmentOp extends FSEditLogOp {
+    private LogSegmentOp(FSEditLogOpCodes code) {
+      super(code);
+      assert code == OP_START_LOG_SEGMENT ||
+             code == OP_END_LOG_SEGMENT : "Bad op: " + code;
+    }
+
+    public void readFields(DataInputStream in, int logVersion)
+        throws IOException {
+      // no data stored in these ops yet
+    }
+  }
+
   static private short readShort(DataInputStream in) throws IOException {
     return Short.parseShort(FSImageSerialization.readString(in));
   }
@@ -677,6 +697,10 @@
       opInstances.put(OP_CANCEL_DELEGATION_TOKEN,
                       new CancelDelegationTokenOp());
       opInstances.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp());
+      opInstances.put(OP_START_LOG_SEGMENT,
+                      new LogSegmentOp(OP_START_LOG_SEGMENT));
+      opInstances.put(OP_END_LOG_SEGMENT,
+                      new LogSegmentOp(OP_END_LOG_SEGMENT));
     }
 
     /**
@@ -713,9 +737,15 @@
       if (op == null) {
         throw new IOException("Read invalid opcode " + opCode);
       }
+
+      if (LayoutVersion.supports(Feature.STORED_TXIDS, logVersion)) {
+        // Read the txid
+        op.setTransactionId(in.readLong());
+      }
+
       op.readFields(in, logVersion);
 
-      validateChecksum(in, checksum);
+      validateChecksum(in, checksum, op.txid);
       return op;
     }
 
@@ -723,7 +753,8 @@
      * Validate a transaction's checksum
      */
     private void validateChecksum(DataInputStream in,
-                                  Checksum checksum)
+                                  Checksum checksum,
+                                  long txid)
         throws IOException {
       if (checksum != null) {
         int calculatedChecksum = (int)checksum.getValue();
@@ -731,7 +762,7 @@
         if (readChecksum != calculatedChecksum) {
           throw new ChecksumException(
               "Transaction is corrupt. Calculated checksum is " +
-              calculatedChecksum + " but read checksum " + readChecksum, -1);
+              calculatedChecksum + " but read checksum " + readChecksum, txid);
         }
       }
     }