HDDS-5777. Provide an option to dump table scan data to file. (#2675)

diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmLDBCli.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmLDBCli.java
index ea39a1f..4027455 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmLDBCli.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmLDBCli.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.ozone.om;
 
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.utils.db.DBStore;
@@ -33,7 +34,11 @@
 import org.junit.Assert;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.time.LocalDateTime;
 import java.util.List;
 import java.util.ArrayList;
 
@@ -98,15 +103,72 @@
     Assert.assertTrue(getKeyNames(dbScanner).contains("key1"));
     Assert.assertTrue(getKeyNames(dbScanner).contains("key5"));
     Assert.assertFalse(getKeyNames(dbScanner).contains("key6"));
+
     DBScanner.setLimit(1);
     Assert.assertEquals(1, getKeyNames(dbScanner).size());
-    DBScanner.setLimit(-1);
+
+    DBScanner.setLimit(0);
     try {
       getKeyNames(dbScanner);
       Assert.fail("IllegalArgumentException is expected");
     }catch (IllegalArgumentException e){
       //ignore
     }
+
+    // If set with -1, check if it dumps entire table data.
+    DBScanner.setLimit(-1);
+    Assert.assertEquals(5, getKeyNames(dbScanner).size());
+
+    // Test dump to file.
+    File tempFile = folder.newFolder();
+    String outFile = tempFile.getAbsolutePath() + "keyTable"
+        + LocalDateTime.now();
+    BufferedReader bufferedReader = null;
+    try {
+      DBScanner.setLimit(-1);
+      DBScanner.setFileName(outFile);
+      keyNames = getKeyNames(dbScanner);
+      Assert.assertEquals(5, keyNames.size());
+      Assert.assertTrue(new File(outFile).exists());
+
+      bufferedReader = new BufferedReader(
+          new InputStreamReader(new FileInputStream(outFile), UTF_8));
+
+      String readLine;
+      int count = 0;
+
+      while ((readLine = bufferedReader.readLine()) != null) {
+        for (String keyName : keyNames) {
+          if (readLine.contains(keyName)) {
+            count++;
+            break;
+          }
+        }
+      }
+
+      // As keyName will be in the file twice for each key.
+      // Once in keyName and second time in fileName.
+
+      // Sample key data.
+      // {
+      // ..
+      // ..
+      // "keyName": "key5",
+      // "fileName": "key5",
+      // ..
+      // ..
+      // }
+
+      Assert.assertEquals("File does not have all keys",
+          keyNames.size() * 2, count);
+    } finally {
+      if (bufferedReader != null) {
+        bufferedReader.close();
+      }
+      if (new File(outFile).exists()) {
+        FileUtils.deleteQuietly(new File(outFile));
+      }
+    }
   }
 
   private List<String> getKeyNames(DBScanner scanner)
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java
index b8a9002..028d483 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/debug/DBScanner.java
@@ -18,7 +18,11 @@
 
 package org.apache.hadoop.ozone.debug;
 
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.Writer;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Paths;
 import java.util.ArrayList;
@@ -63,9 +67,14 @@
   private static boolean withKey;
 
   @CommandLine.Option(names = {"--length", "-l"},
-          description = "Maximum number of items to list")
+          description = "Maximum number of items to list. " +
+              "If -1 dumps the entire table data")
   private static int limit = 100;
 
+  @CommandLine.Option(names = {"--out", "-o"},
+      description = "File to dump table scan data")
+  private static String fileName;
+
   @CommandLine.ParentCommand
   private RDBParser parent;
 
@@ -77,23 +86,47 @@
       DBColumnFamilyDefinition dbColumnFamilyDefinition) throws IOException {
     List<Object> outputs = new ArrayList<>();
     iterator.seekToFirst();
-    while (iterator.isValid() && limit > 0){
-      StringBuilder result = new StringBuilder();
-      if (withKey) {
-        Object key = dbColumnFamilyDefinition.getKeyCodec()
-            .fromPersistedFormat(iterator.key());
-        Gson gson = new GsonBuilder().setPrettyPrinting().create();
-        result.append(gson.toJson(key));
-        result.append(" -> ");
+
+    Writer fileWriter = null;
+    PrintWriter printWriter = null;
+    try {
+      if (fileName != null) {
+        fileWriter = new OutputStreamWriter(
+            new FileOutputStream(fileName), StandardCharsets.UTF_8);
+        printWriter = new PrintWriter(fileWriter);
       }
-      Object o = dbColumnFamilyDefinition.getValueCodec()
-              .fromPersistedFormat(iterator.value());
-      outputs.add(o);
-      Gson gson = new GsonBuilder().setPrettyPrinting().create();
-      result.append(gson.toJson(o));
-      System.out.println(result.toString());
-      limit--;
-      iterator.next();
+      while (iterator.isValid()) {
+        StringBuilder result = new StringBuilder();
+        if (withKey) {
+          Object key = dbColumnFamilyDefinition.getKeyCodec()
+              .fromPersistedFormat(iterator.key());
+          Gson gson = new GsonBuilder().setPrettyPrinting().create();
+          result.append(gson.toJson(key));
+          result.append(" -> ");
+        }
+        Object o = dbColumnFamilyDefinition.getValueCodec()
+            .fromPersistedFormat(iterator.value());
+        outputs.add(o);
+        Gson gson = new GsonBuilder().setPrettyPrinting().create();
+        result.append(gson.toJson(o));
+        if (fileName != null) {
+          printWriter.println(result);
+        } else {
+          System.out.println(result.toString());
+        }
+        limit--;
+        iterator.next();
+        if (limit == 0) {
+          break;
+        }
+      }
+    } finally {
+      if (printWriter != null) {
+        printWriter.close();
+      }
+      if (fileWriter != null) {
+        fileWriter.close();
+      }
     }
     return outputs;
   }
@@ -118,6 +151,10 @@
     return scannedObjects;
   }
 
+  public static void setFileName(String name) {
+    DBScanner.fileName = name;
+  }
+
   private static ColumnFamilyHandle getColumnFamilyHandle(
             byte[] name, List<ColumnFamilyHandle> columnFamilyHandles) {
     return columnFamilyHandles
@@ -166,9 +203,10 @@
   private void printAppropriateTable(
           List<ColumnFamilyHandle> columnFamilyHandleList,
           RocksDB rocksDB, String dbPath) throws IOException {
-    if (limit < 1) {
+    if (limit < 1 && limit != -1) {
       throw new IllegalArgumentException(
-              "List length should be a positive number");
+              "List length should be a positive number. Only allowed negative" +
+                  " number is -1 which is to dump entire table");
     }
     dbPath = removeTrailingSlashIfNeeded(dbPath);
     this.constructColumnFamilyMap(DBDefinitionFactory.