HDFS-14989. Add a 'swapBlockList' operation to Namenode. (#1819)

* HDFS-14989. Add a 'swapBlockList' operation to Namenode.

* HDFS-14989. Add a 'swapBlockList' operation to Namenode. (Fix checkstyle issues)

* HDFS-14989. Address review comments.

* HDFS-14989. Swap storage policy ID, check destination file genStamp.

* HDFS-14989. Remove unused import.

* HDFS-14989. Address review comment.

* HDFS-14989. Use FSDirectory.resolveLastINode.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 79e4da4..cd3ec88 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -112,6 +112,7 @@
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.server.common.ECTopologyVerifier;
+import org.apache.hadoop.hdfs.server.namenode.SwapBlockListOp.SwapBlockListResult;
 import org.apache.hadoop.hdfs.server.namenode.metrics.ReplicatedBlocksMBean;
 import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
 import org.apache.hadoop.ipc.ObserverRetryOnActiveException;
@@ -8498,5 +8499,35 @@
       throw new UnsupportedActionException(operationName + " not supported.");
     }
   }
+
+  /**
+   * Namesystem API to swap block list between source and destination files.
+   * @param src source file.
+   * @param dst destination file.
+   * @param genTimestamp expected generation stamp of dst's last block.
+   * @throws IOException on Error.
+   */
+  boolean swapBlockList(final String src, final String dst, long genTimestamp)
+      throws IOException {
+    final String operationName = "swapBlockList";
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = getPermissionChecker();
+    SwapBlockListResult res = null;
+    try {
+      writeLock();
+      try {
+        checkOperation(OperationCategory.WRITE); // re-check under the lock
+        checkNameNodeSafeMode("Cannot swap block list " + src + ", " + dst);
+        res = SwapBlockListOp.swapBlocks(dir, pc, src, dst, genTimestamp);
+      } finally {
+        writeUnlock(operationName);
+      }
+    } catch (AccessControlException e) {
+      logAuditEvent(false, operationName, src, dst, null);
+      throw e;
+    }
+    logAuditEvent(true, operationName, src, dst, res.getDstFileAuditStat());
+    return res.isSuccess(); // NOTE(review): nothing is written to the edit log
+  }
 }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index 67c86b3..43a98d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@ -163,6 +163,10 @@
       return (byte)STORAGE_POLICY_ID.BITS.retrieve(header);
     }
 
+    // NOTE(review): byte holds 8 bits but BLOCK_TYPE_MASK is bit 11 - verify.
+    static byte getBlockLayoutPolicy(long header) {
+      return (byte)BLOCK_LAYOUT_AND_REDUNDANCY.BITS.retrieve(header);
+    }
     // Union of all the block type masks. Currently there is only
     // BLOCK_TYPE_MASK_STRIPED
     static final long BLOCK_TYPE_MASK = 1 << 11;
@@ -728,6 +732,17 @@
     this.blocks = BlockInfo.EMPTY_ARRAY;
   }
 
+  /**
+   * Replaces this file's blocks with a shallow copy of the supplied array.
+   * @param newBlocks blocks this file references afterwards; each gets this id.
+   */
+  void replaceBlocks(BlockInfo[] newBlocks) {
+    this.blocks = newBlocks.clone();
+    for (int i = 0; i < blocks.length; i++) {
+      blocks[i].setBlockCollectionId(getId());
+    }
+  }
+
   private void updateRemovedUnderConstructionFiles(
       ReclaimContext reclaimContext) {
     if (isUnderConstruction() && reclaimContext.removedUCFiles != null) {
@@ -1257,4 +1272,17 @@
     return snapshotBlocks != null &&
         Arrays.asList(snapshotBlocks).contains(block);
   }
+
+  /**
+   * Rebuilds the header, carrying over only the current preferred block size.
+   * @param newBlockLayoutPolicy new block layout and redundancy value.
+   * @param newStoragePolicy new storage policy ID.
+   */
+  void updateHeaderWithNewPolicy(byte newBlockLayoutPolicy,
+                                 byte newStoragePolicy) {
+    final long preferredBlockSize =
+        HeaderFormat.getPreferredBlockSize(header);
+    this.header = HeaderFormat.toLong(preferredBlockSize,
+        newBlockLayoutPolicy, newStoragePolicy);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index e8dace9..7e480c5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -2667,4 +2667,14 @@
     }
     return namesystem.getBlockManager().getSPSManager().getNextPathId();
   }
+
+  /** Checks NameNode startup, then forwards the swap to the namesystem. */
+  public boolean swapBlockList(String src, String dst, long maxTimestamp)
+      throws IOException {
+    checkNNStartup();
+    // Parameterized logging formats lazily, so no isDebugEnabled() guard.
+    stateChangeLog.debug("*DIR* NameNode.swapBlockList: {} and {}", src, dst);
+    return namesystem.swapBlockList(src, dst, maxTimestamp);
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SwapBlockListOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SwapBlockListOp.java
new file mode 100644
index 0000000..7c02fbb
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SwapBlockListOp.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory.DirOp;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile.HeaderFormat;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
+import org.apache.hadoop.util.Time;
+
+/**
+ * Class to carry out the operation of swapping blocks from one file to another.
+ * Along with swapping blocks, we can also optionally swap the block layout
+ * of a file header, which is useful for client operations like converting
+ * replicated to EC file.
+ */
+public final class SwapBlockListOp {
+  /** Private: this class only offers static helpers and a nested result. */
+  private SwapBlockListOp() {
+  }
+
+  /**
+   * Resolves both paths, checks ancestor access, and swaps under the fsd lock.
+   */
+  static SwapBlockListResult swapBlocks(FSDirectory fsd, FSPermissionChecker pc,
+                          String src, String dst, long genTimestamp)
+      throws IOException {
+    final INodesInPath srcIIP = fsd.resolvePath(pc, src, DirOp.WRITE);
+    final INodesInPath dstIIP = fsd.resolvePath(pc, dst, DirOp.WRITE);
+    if (fsd.isPermissionEnabled()) {
+      fsd.checkAncestorAccess(pc, srcIIP, FsAction.WRITE);
+      fsd.checkAncestorAccess(pc, dstIIP, FsAction.WRITE);
+    }
+    if (NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug("DIR* FSDirectory.swapBlockList: "
+          + srcIIP.getPath() + " and " + dstIIP.getPath());
+    }
+    fsd.writeLock();
+    try {
+      return swapBlockList(fsd, srcIIP, dstIIP, genTimestamp);
+    } finally {
+      fsd.writeUnlock();
+    }
+  }
+
+  /**
+   * Validates both inodes, then exchanges their block arrays, block layout
+   * and storage policy header fields, and stamps both with one mtime.
+   * NOTE(review): the layout value travels as a byte here; confirm no
+   * header bits above bit 7 (e.g. a block-type flag) are lost in the swap.
+   */
+  private static SwapBlockListResult swapBlockList(FSDirectory fsd,
+                                    final INodesInPath srcIIP,
+                                    final INodesInPath dstIIP,
+                                    long genTimestamp)
+      throws IOException {
+    assert fsd.hasWriteLock();
+    validateInode(srcIIP);
+    validateInode(dstIIP);
+    fsd.ezManager.checkMoveValidity(srcIIP, dstIIP);
+
+    final String srcPath = srcIIP.getPath();
+    final String dstPath = dstIIP.getPath();
+    if (dstPath.equals(srcPath)) {
+      throw new FileAlreadyExistsException("The source " + srcPath +
+          " and destination " + dstPath + " are the same");
+    }
+
+    final INodeFile srcFile = srcIIP.getLastINode().asFile();
+    final INodeFile dstFile = dstIIP.getLastINode().asFile();
+
+    // The caller's stamp must match dst's current last block, if it has one.
+    final BlockInfo dstLastBlock = dstFile.getLastBlock();
+    if (dstLastBlock != null
+        && dstLastBlock.getGenerationStamp() != genTimestamp) {
+      final String error = "Swap Block List destination file "
+          + dstIIP.getPath() + " has last block with different gen timestamp.";
+      NameNode.stateChangeLog.warn("DIR* FSDirectory.swapBlockList: " + error);
+      throw new IOException(error);
+    }
+
+    final long mtime = Time.now();
+    final BlockInfo[] dstBlocksBefore = dstFile.getBlocks();
+    dstFile.replaceBlocks(srcFile.getBlocks());
+    srcFile.replaceBlocks(dstBlocksBefore);
+
+    // Capture both headers before either one is rewritten.
+    final long srcHeader = srcFile.getHeaderLong();
+    final long dstHeader = dstFile.getHeaderLong();
+
+    dstFile.updateHeaderWithNewPolicy(
+        HeaderFormat.getBlockLayoutPolicy(srcHeader),
+        HeaderFormat.getStoragePolicyID(srcHeader));
+    dstFile.setModificationTime(mtime);
+
+    srcFile.updateHeaderWithNewPolicy(
+        HeaderFormat.getBlockLayoutPolicy(dstHeader),
+        HeaderFormat.getStoragePolicyID(dstHeader));
+    srcFile.setModificationTime(mtime);
+
+    return new SwapBlockListResult(true,
+        fsd.getAuditFileInfo(srcIIP),
+        fsd.getAuditFileInfo(dstIIP));
+  }
+
+  /**
+   * Rejects a swap operand unless it is a completed file whose path has no
+   * latest snapshot.
+   * @param iip resolved path of the source or the destination.
+   * @throws IOException if the last inode is not a file, is under
+   *                     construction, or the path has a latest snapshot id.
+   */
+  private static void validateInode(INodesInPath iip)
+      throws IOException {
+    final INode lastInode = FSDirectory.resolveLastINode(iip);
+
+    // Checks run in a fixed order; the first one that fails names the reason.
+    final String reason;
+    if (!lastInode.isFile()) {
+      reason = " is not a file.";
+    } else if (((INodeFile) iip.getLastINode()).isUnderConstruction()) {
+      reason = " is under construction.";
+    } else if (iip.getLatestSnapshotId() != Snapshot.CURRENT_STATE_ID) {
+      reason = " is in a snapshot directory.";
+    } else {
+      // Nothing to object to.
+      return;
+    }
+
+    // The same text goes to the state-change log and to the caller.
+    final String error =
+        "Swap Block List input " + iip.getPath() + reason;
+    NameNode.stateChangeLog.warn("DIR* FSDirectory.swapBlockList: " + error);
+    throw new IOException(error);
+  }
+
+  /** Holder for a swap's success flag and the audit status of both files. */
+  static class SwapBlockListResult {
+    private final boolean success;
+    private final FileStatus srcFileAuditStat;
+    private final FileStatus dstFileAuditStat;
+    SwapBlockListResult(boolean success,
+                        FileStatus srcFileAuditStat,
+                        FileStatus dstFileAuditStat) {
+      this.success = success;
+      this.srcFileAuditStat = srcFileAuditStat;
+      this.dstFileAuditStat = dstFileAuditStat;
+    }
+
+    public boolean isSuccess() {
+      return success;
+    }
+
+    public FileStatus getSrcFileAuditStat() {
+      return srcFileAuditStat;
+    }
+
+    public FileStatus getDstFileAuditStat() {
+      return dstFileAuditStat;
+    }
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSwapBlockList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSwapBlockList.java
new file mode 100644
index 0000000..d29fdee
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestSwapBlockList.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile.HeaderFormat;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.SnapshotTestHelper;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests the behavior of SwapBlockListOp.
+ */
+public class TestSwapBlockList {
+
+  private static final short REPLICATION = 3;
+
+  private static final long SEED = 0;
+  private final Path rootDir = new Path("/" + getClass().getSimpleName());
+
+  private final Path subDir1 = new Path(rootDir, "dir1");
+  private final Path file1 = new Path(subDir1, "file1");
+  private final Path file2 = new Path(subDir1, "file2");
+
+  private final Path subDir11 = new Path(subDir1, "dir11");
+  private final Path file3 = new Path(subDir11, "file3");
+
+  private final Path subDir2 = new Path(rootDir, "dir2");
+  private final Path file4 = new Path(subDir2, "file4");
+
+  private Configuration conf;
+  private MiniDFSCluster cluster;
+  private FSNamesystem fsn;
+  private FSDirectory fsdir;
+
+  private DistributedFileSystem hdfs;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+    // NOTE(review): xattr limit looks unrelated to swapBlockList - confirm.
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_XATTRS_PER_INODE_KEY, 2);
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(REPLICATION)
+        .build();
+    cluster.waitActive();
+
+    fsn = cluster.getNamesystem();
+    fsdir = fsn.getFSDirectory();
+    hdfs = cluster.getFileSystem();
+
+    // dir2 is created explicitly; file4 is placed in it below.
+    hdfs.mkdirs(subDir2);
+    // Four files created with identical arguments, spread over three dirs.
+    DFSTestUtil.createFile(hdfs, file1, 1024, REPLICATION, SEED);
+    DFSTestUtil.createFile(hdfs, file2, 1024, REPLICATION, SEED);
+    DFSTestUtil.createFile(hdfs, file3, 1024, REPLICATION, SEED);
+    DFSTestUtil.createFile(hdfs, file4, 1024, REPLICATION, SEED);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) { // still null when setUp failed before build()
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testInputValidation() throws Exception {
+    // Every case expects swapBlockList to throw with the given message text.
+    // Source file does not exist.
+    LambdaTestUtils.intercept(FileNotFoundException.class,
+        "/TestSwapBlockList/dir1/fileXYZ", () -> fsn.swapBlockList(
+            "/TestSwapBlockList/dir1/fileXYZ", "/TestSwapBlockList/dir1/dir11" +
+                "/file3", 0L));
+
+    // Destination file does not exist.
+    LambdaTestUtils.intercept(FileNotFoundException.class,
+        "/TestSwapBlockList/dir1/dir11/fileXYZ",
+        () -> fsn.swapBlockList("/TestSwapBlockList/dir1/file1",
+            "/TestSwapBlockList/dir1/dir11/fileXYZ", 0L));
+
+    // Source is a directory, not a file.
+    LambdaTestUtils.intercept(IOException.class,
+        "/TestSwapBlockList/dir1 is not a file.",
+        () -> fsn.swapBlockList("/TestSwapBlockList/dir1",
+            "/TestSwapBlockList/dir1/dir11/file3", 0L));
+
+    String sourceFile = "/TestSwapBlockList/dir1/file1";
+    String dstFile1 = "/TestSwapBlockList/dir1/dir11/file3";
+
+    // Destination is flipped to under-construction directly on its inode.
+    INodeFile dstInodeFile = fsdir.resolvePath(fsdir.getPermissionChecker(),
+        dstFile1, FSDirectory.DirOp.WRITE).getLastINode().asFile();
+    dstInodeFile.toUnderConstruction("TestClient", "TestClientMachine");
+    LambdaTestUtils.intercept(IOException.class,
+        dstFile1 + " is under construction.",
+        () -> fsn.swapBlockList(sourceFile, dstFile1, 0L));
+
+    // Destination's parent directory (dir2) gets snapshot s0 first.
+    SnapshotTestHelper.createSnapshot(hdfs, subDir2, "s0");
+    String dstFile2 = "/TestSwapBlockList/dir2/file4";
+    LambdaTestUtils.intercept(IOException.class,
+        dstFile2 + " is in a snapshot directory",
+        () -> fsn.swapBlockList(sourceFile, dstFile2, 0L));
+
+    // Bump dst's last-block gen stamp so the stamp passed in is stale.
+    String dstFile3 = "/TestSwapBlockList/dir1/file2";
+    dstInodeFile = (INodeFile) fsdir.resolvePath(fsdir.getPermissionChecker(),
+        dstFile3, FSDirectory.DirOp.WRITE).getLastINode();
+    long genStamp = dstInodeFile.getLastBlock().getGenerationStamp();
+    dstInodeFile.getLastBlock().setGenerationStamp(genStamp + 1);
+    LambdaTestUtils.intercept(IOException.class,
+        dstFile3 + " has last block with different gen timestamp.",
+        () -> fsn.swapBlockList(sourceFile, dstFile3, genStamp));
+  }
+
+  @Test
+  public void testSwapBlockListOp() throws Exception {
+    String sourceFile = "/TestSwapBlockList/dir1/file1";
+    String dstFile = "/TestSwapBlockList/dir1/dir11/file3";
+
+    INodeFile srcInodeFile =
+        (INodeFile) fsdir.resolvePath(fsdir.getPermissionChecker(),
+            sourceFile, FSDirectory.DirOp.WRITE).getLastINode();
+    INodeFile dstInodeFile =
+        (INodeFile) fsdir.resolvePath(fsdir.getPermissionChecker(),
+            dstFile, FSDirectory.DirOp.WRITE).getLastINode();
+    // Capture block arrays and headers as they are before the swap.
+    BlockInfo[] srcBlockLocationsBeforeSwap = srcInodeFile.getBlocks();
+    long srcHeader = srcInodeFile.getHeaderLong();
+
+    BlockInfo[] dstBlockLocationsBeforeSwap = dstInodeFile.getBlocks();
+    long dstHeader = dstInodeFile.getHeaderLong();
+    // Swap, presenting dst's current last-block generation stamp.
+    fsn.swapBlockList(sourceFile, dstFile,
+        dstInodeFile.getLastBlock().getGenerationStamp());
+    assertBlockListEquality(dstBlockLocationsBeforeSwap,
+        srcInodeFile.getBlocks(), srcInodeFile.getId());
+    assertBlockListEquality(srcBlockLocationsBeforeSwap,
+        dstInodeFile.getBlocks(), dstInodeFile.getId());
+
+    // Block layout values must have crossed over between the two headers.
+    assertEquals(HeaderFormat.getBlockLayoutPolicy(srcHeader),
+        HeaderFormat.getBlockLayoutPolicy(dstInodeFile.getHeaderLong()));
+    assertEquals(HeaderFormat.getBlockLayoutPolicy(dstHeader),
+        HeaderFormat.getBlockLayoutPolicy(srcInodeFile.getHeaderLong()));
+
+    // Storage policy IDs must have crossed over as well.
+    assertEquals(HeaderFormat.getStoragePolicyID(srcHeader),
+        HeaderFormat.getStoragePolicyID(dstInodeFile.getHeaderLong()));
+    assertEquals(HeaderFormat.getStoragePolicyID(dstHeader),
+        HeaderFormat.getStoragePolicyID(srcInodeFile.getHeaderLong()));
+  }
+
+  @Test
+  public void testSwapBlockListOpRollback() throws Exception {
+    // Invoke swap twice and make sure the blocks are back on their original
+    // file.
+    String sourceFile = "/TestSwapBlockList/dir1/file1";
+    String dstFile = "/TestSwapBlockList/dir1/dir11/file3";
+
+    INodeFile srcInodeFile =
+        (INodeFile) fsdir.resolvePath(fsdir.getPermissionChecker(),
+            sourceFile, FSDirectory.DirOp.WRITE).getLastINode();
+    INodeFile dstInodeFile =
+        (INodeFile) fsdir.resolvePath(fsdir.getPermissionChecker(),
+            dstFile, FSDirectory.DirOp.WRITE).getLastINode();
+
+    BlockInfo[] srcBlockLocationsBeforeSwap = srcInodeFile.getBlocks();
+    long srcHeader = srcInodeFile.getHeaderLong();
+
+    BlockInfo[] dstBlockLocationsBeforeSwap = dstInodeFile.getBlocks();
+    long dstHeader = dstInodeFile.getHeaderLong();
+    // Reuses the forward test on the same two files, once per direction.
+    testSwapBlockListOp();
+    testSwapBlockListOp();
+
+    assertBlockListEquality(dstBlockLocationsBeforeSwap,
+        dstInodeFile.getBlocks(), dstInodeFile.getId());
+    assertBlockListEquality(srcBlockLocationsBeforeSwap,
+        srcInodeFile.getBlocks(), srcInodeFile.getId());
+    // Block layout values are back on their original files.
+    // NOTE(review): storage policy IDs are not re-asserted after rollback.
+    assertEquals(HeaderFormat.getBlockLayoutPolicy(srcHeader),
+        HeaderFormat.getBlockLayoutPolicy(srcInodeFile.getHeaderLong()));
+    assertEquals(HeaderFormat.getBlockLayoutPolicy(dstHeader),
+        HeaderFormat.getBlockLayoutPolicy(dstInodeFile.getHeaderLong()));
+  }
+  /** Asserts equal block ids by index, each owned by {@code expectedId}. */
+  private void assertBlockListEquality(BlockInfo[] expected,
+                                       BlockInfo[] actual,
+                                       long expectedId) {
+    assertEquals(expected.length, actual.length);
+    for (int i = 0; i < expected.length; i++) {
+      assertEquals(expected[i].getBlockId(), actual[i].getBlockId());
+      assertEquals(expectedId, actual[i].getBlockCollectionId());
+    }
+  }
+}