Fix for NUTCH-1863: Add JSON format dump output to readdb command (#490)


diff --git a/src/java/org/apache/nutch/crawl/CrawlDbReader.java b/src/java/org/apache/nutch/crawl/CrawlDbReader.java
index f59f895..b9200e7 100644
--- a/src/java/org/apache/nutch/crawl/CrawlDbReader.java
+++ b/src/java/org/apache/nutch/crawl/CrawlDbReader.java
@@ -27,6 +27,7 @@
 import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -75,6 +76,12 @@
 import org.apache.nutch.util.TimingUtil;
 import org.apache.commons.jexl2.Expression;
 
+import com.fasterxml.jackson.core.JsonGenerationException;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+
 /**
  * Read utility for the CrawlDB.
  * 
@@ -96,7 +103,8 @@
       throws IOException {
     Path crawlDbPath = new Path(crawlDb, CrawlDb.CURRENT_NAME);
 
-    FileStatus stat = crawlDbPath.getFileSystem(config).getFileStatus(crawlDbPath);
+    FileStatus stat = crawlDbPath.getFileSystem(config)
+        .getFileStatus(crawlDbPath);
     long lastModified = stat.getModificationTime();
 
     synchronized (this) {
@@ -128,16 +136,33 @@
     readers = null;
   }
 
-  public static class CrawlDatumCsvOutputFormat extends
-      FileOutputFormat<Text, CrawlDatum> {
-    protected static class LineRecordWriter extends
-        RecordWriter<Text, CrawlDatum> {
+  @SuppressWarnings("serial")
+  public static class JsonIndenter extends MinimalPrettyPrinter {
+
+    @Override
+    public void writeObjectFieldValueSeparator(JsonGenerator jg)
+        throws IOException, JsonGenerationException {
+      jg.writeRaw(": ");
+    }
+
+    @Override
+    public void writeObjectEntrySeparator(JsonGenerator jg)
+        throws IOException, JsonGenerationException {
+      jg.writeRaw(", ");
+    }
+  }
+
+  public static class CrawlDatumCsvOutputFormat
+      extends FileOutputFormat<Text, CrawlDatum> {
+    protected static class LineRecordWriter
+        extends RecordWriter<Text, CrawlDatum> {
       private DataOutputStream out;
 
       public LineRecordWriter(DataOutputStream out) {
         this.out = out;
         try {
-          out.writeBytes("Url,Status code,Status name,Fetch Time,Modified Time,Retries since fetch,Retry interval seconds,Retry interval days,Score,Signature,Metadata\n");
+          out.writeBytes(
+              "Url,Status code,Status name,Fetch Time,Modified Time,Retries since fetch,Retry interval seconds,Retry interval days,Score,Signature,Metadata\n");
         } catch (IOException e) {
         }
       }
@@ -162,13 +187,15 @@
         out.writeByte(',');
         out.writeBytes(Float.toString(value.getFetchInterval()));
         out.writeByte(',');
-        out.writeBytes(Float.toString((value.getFetchInterval() / FetchSchedule.SECONDS_PER_DAY)));
+        out.writeBytes(Float.toString(
+            (value.getFetchInterval() / FetchSchedule.SECONDS_PER_DAY)));
         out.writeByte(',');
         out.writeBytes(Float.toString(value.getScore()));
         out.writeByte(',');
         out.writeByte('"');
-        out.writeBytes(value.getSignature() != null ? StringUtil
-            .toHexString(value.getSignature()) : "null");
+        out.writeBytes(value.getSignature() != null
+            ? StringUtil.toHexString(value.getSignature())
+            : "null");
         out.writeByte('"');
         out.writeByte(',');
         out.writeByte('"');
@@ -185,13 +212,14 @@
         out.writeByte('\n');
       }
 
-      public synchronized void close(TaskAttemptContext context) throws IOException {
+      public synchronized void close(TaskAttemptContext context)
+          throws IOException {
         out.close();
       }
     }
 
-    public RecordWriter<Text, CrawlDatum> getRecordWriter(TaskAttemptContext
-        context) throws IOException {
+    public RecordWriter<Text, CrawlDatum> getRecordWriter(
+        TaskAttemptContext context) throws IOException {
       String name = getUniqueFile(context, "part", "");
       Path dir = FileOutputFormat.getOutputPath(context);
       FileSystem fs = dir.getFileSystem(context.getConfiguration());
@@ -200,13 +228,77 @@
     }
   }
 
-  public static class CrawlDbStatMapper extends
-      Mapper<Text, CrawlDatum, Text, NutchWritable> {
+  public static class CrawlDatumJsonOutputFormat
+      extends FileOutputFormat<Text, CrawlDatum> {
+    protected static class LineRecordWriter
+        extends RecordWriter<Text, CrawlDatum> {
+      private DataOutputStream out;
+      private ObjectMapper jsonMapper = new ObjectMapper();
+      private ObjectWriter jsonWriter;
+
+      public LineRecordWriter(DataOutputStream out) {
+        this.out = out;
+        jsonMapper.getFactory()
+            .configure(JsonGenerator.Feature.ESCAPE_NON_ASCII, true);
+        jsonWriter = jsonMapper.writer(new JsonIndenter());
+      }
+
+      public synchronized void write(Text key, CrawlDatum value)
+          throws IOException {
+        Map<String, Object> data = new LinkedHashMap<String, Object>();
+        data.put("url", key.toString());
+        data.put("statusCode", value.getStatus());
+        data.put("statusName", CrawlDatum.getStatusName(value.getStatus()));
+        data.put("fetchTime", new Date(value.getFetchTime()).toString());
+        data.put("modifiedTime", new Date(value.getModifiedTime()).toString());
+        data.put("retriesSinceFetch", value.getRetriesSinceFetch());
+        data.put("retryIntervalSeconds", value.getFetchInterval());
+        data.put("retryIntervalDays", (value.getFetchInterval() / FetchSchedule.SECONDS_PER_DAY));
+        data.put("score", value.getScore());
+        data.put("signature",
+            (value.getSignature() != null
+                ? StringUtil.toHexString(value.getSignature())
+                : "null"));
+        Map<String, Object> metaData = null;
+        if (value.getMetaData() != null) {
+          metaData = new LinkedHashMap<String, Object>();
+          for (Entry<Writable, Writable> e : value.getMetaData().entrySet()) {
+            metaData.put(e.getKey().toString(), e.getValue());
+          }
+        }
+        if (metaData != null) {
+          data.put("metadata", metaData);
+        } else {
+          data.put("metadata", "");
+        }
+        out.write(jsonWriter.writeValueAsBytes(data));
+        out.writeByte('\n');
+      }
+
+      public synchronized void close(TaskAttemptContext context)
+          throws IOException {
+        out.close();
+      }
+    }
+
+    public RecordWriter<Text, CrawlDatum> getRecordWriter(
+        TaskAttemptContext context) throws IOException {
+      String name = getUniqueFile(context, "part", "");
+      Path dir = FileOutputFormat.getOutputPath(context);
+      FileSystem fs = dir.getFileSystem(context.getConfiguration());
+      DataOutputStream fileOut = fs.create(new Path(dir, name), context);
+      return new LineRecordWriter(fileOut);
+    }
+  }
+
+  public static class CrawlDbStatMapper
+      extends Mapper<Text, CrawlDatum, Text, NutchWritable> {
     NutchWritable COUNT_1 = new NutchWritable(new LongWritable(1));
     private boolean sort = false;
 
     @Override
-    public void setup(Mapper<Text, CrawlDatum, Text, NutchWritable>.Context context) {
+    public void setup(
+        Mapper<Text, CrawlDatum, Text, NutchWritable>.Context context) {
       Configuration conf = context.getConfiguration();
       sort = conf.getBoolean("db.reader.stats.sort", false);
     }
@@ -216,8 +308,7 @@
         throws IOException, InterruptedException {
       context.write(new Text("T"), COUNT_1);
       context.write(new Text("status " + value.getStatus()), COUNT_1);
-      context.write(new Text("retry " + value.getRetriesSinceFetch()), 
-          COUNT_1);
+      context.write(new Text("retry " + value.getRetriesSinceFetch()), COUNT_1);
 
       if (Float.isNaN(value.getScore())) {
         context.write(new Text("scNaN"), COUNT_1);
@@ -236,7 +327,8 @@
       context.write(new Text("ftt"), fetchTime);
 
       // fetch interval (in seconds)
-      NutchWritable fetchInterval = new NutchWritable(new LongWritable(value.getFetchInterval()));
+      NutchWritable fetchInterval = new NutchWritable(
+          new LongWritable(value.getFetchInterval()));
       context.write(new Text("fi"), fetchInterval);
       context.write(new Text("fit"), fetchInterval);
 
@@ -249,15 +341,15 @@
     }
   }
 
-  public static class CrawlDbStatReducer extends
-      Reducer<Text, NutchWritable, Text, NutchWritable> {
-    public void setup(Reducer<Text, NutchWritable, Text, NutchWritable>.Context context) {
+  public static class CrawlDbStatReducer
+      extends Reducer<Text, NutchWritable, Text, NutchWritable> {
+    public void setup(
+        Reducer<Text, NutchWritable, Text, NutchWritable>.Context context) {
     }
 
     @Override
     public void reduce(Text key, Iterable<NutchWritable> values,
-        Context context)
-        throws IOException, InterruptedException {
+        Context context) throws IOException, InterruptedException {
       String k = key.toString();
       if (k.equals("T") || k.startsWith("status") || k.startsWith("retry")
           || k.equals("ftt") || k.equals("fit")) {
@@ -334,20 +426,20 @@
     }
   }
 
-  public static class CrawlDbTopNMapper extends
-      Mapper<Text, CrawlDatum, FloatWritable, Text> {
+  public static class CrawlDbTopNMapper
+      extends Mapper<Text, CrawlDatum, FloatWritable, Text> {
     private static final FloatWritable fw = new FloatWritable();
     private float min = 0.0f;
 
     @Override
-    public void setup(Mapper<Text, CrawlDatum, FloatWritable, Text>.Context context) {
+    public void setup(
+        Mapper<Text, CrawlDatum, FloatWritable, Text>.Context context) {
       Configuration conf = context.getConfiguration();
       min = conf.getFloat("db.reader.topn.min", 0.0f);
     }
 
     @Override
-    public void map(Text key, CrawlDatum value,
-        Context context)
+    public void map(Text key, CrawlDatum value, Context context)
         throws IOException, InterruptedException {
       if (value.getScore() < min)
         return; // don't collect low-scoring records
@@ -356,15 +448,14 @@
     }
   }
 
-  public static class CrawlDbTopNReducer extends
-      Reducer<FloatWritable, Text, FloatWritable, Text> {
+  public static class CrawlDbTopNReducer
+      extends Reducer<FloatWritable, Text, FloatWritable, Text> {
     private long topN;
     private long count = 0L;
 
     @Override
     public void reduce(FloatWritable key, Iterable<Text> values,
-        Context context)
-        throws IOException, InterruptedException {
+        Context context) throws IOException, InterruptedException {
       for (Text value : values) {
         if (count < topN) {
           key.set(-key.get());
@@ -375,9 +466,11 @@
     }
 
     @Override
-    public void setup(Reducer<FloatWritable, Text, FloatWritable, Text>.Context context) {
+    public void setup(
+        Reducer<FloatWritable, Text, FloatWritable, Text>.Context context) {
       Configuration conf = context.getConfiguration();
-      topN = conf.getLong("db.reader.topn", 100) / Integer.parseInt(conf.get("mapreduce.job.reduces"));
+      topN = conf.getLong("db.reader.topn", 100)
+          / Integer.parseInt(conf.get("mapreduce.job.reduces"));
     }
   }
 
@@ -385,30 +478,32 @@
     closeReaders();
   }
 
-  private TreeMap<String, Writable> processStatJobHelper(String crawlDb, Configuration config, boolean sort) 
-          throws IOException, InterruptedException, ClassNotFoundException{
-	  Path tmpFolder = new Path(crawlDb, "stat_tmp" + System.currentTimeMillis());
+  private TreeMap<String, Writable> processStatJobHelper(String crawlDb,
+      Configuration config, boolean sort)
+      throws IOException, InterruptedException, ClassNotFoundException {
+    Path tmpFolder = new Path(crawlDb, "stat_tmp" + System.currentTimeMillis());
 
-	  Job job = NutchJob.getInstance(config);
-	  config = job.getConfiguration();
-	  job.setJobName("stats " + crawlDb);
-	  config.setBoolean("db.reader.stats.sort", sort);
+    Job job = NutchJob.getInstance(config);
+    config = job.getConfiguration();
+    job.setJobName("stats " + crawlDb);
+    config.setBoolean("db.reader.stats.sort", sort);
 
-	  FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
-	  job.setInputFormatClass(SequenceFileInputFormat.class);
+    FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
+    job.setInputFormatClass(SequenceFileInputFormat.class);
 
-	  job.setJarByClass(CrawlDbReader.class);
-	  job.setMapperClass(CrawlDbStatMapper.class);
-	  job.setCombinerClass(CrawlDbStatReducer.class);
-	  job.setReducerClass(CrawlDbStatReducer.class);
+    job.setJarByClass(CrawlDbReader.class);
+    job.setMapperClass(CrawlDbStatMapper.class);
+    job.setCombinerClass(CrawlDbStatReducer.class);
+    job.setReducerClass(CrawlDbStatReducer.class);
 
-	  FileOutputFormat.setOutputPath(job, tmpFolder);
-	  job.setOutputFormatClass(SequenceFileOutputFormat.class);
-	  job.setOutputKeyClass(Text.class);
-	  job.setOutputValueClass(NutchWritable.class);
+    FileOutputFormat.setOutputPath(job, tmpFolder);
+    job.setOutputFormatClass(SequenceFileOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(NutchWritable.class);
 
-	  // https://issues.apache.org/jira/browse/NUTCH-1029
-	  config.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
+    // https://issues.apache.org/jira/browse/NUTCH-1029
+    config.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs",
+        false);
     FileSystem fileSystem = tmpFolder.getFileSystem(config);
     try {
       boolean success = job.waitForCompletion(true);
@@ -427,38 +522,39 @@
     }
 
     // reading the result
-    SequenceFile.Reader[] readers = SegmentReaderUtil.getReaders(tmpFolder, config);
+    SequenceFile.Reader[] readers = SegmentReaderUtil.getReaders(tmpFolder,
+        config);
 
-	  Text key = new Text();
-	  NutchWritable value = new NutchWritable();
+    Text key = new Text();
+    NutchWritable value = new NutchWritable();
 
-	  TreeMap<String, Writable> stats = new TreeMap<>();
-	  for (int i = 0; i < readers.length; i++) {
-		  SequenceFile.Reader reader = readers[i];
-		  while (reader.next(key, value)) {
-			  String k = key.toString();
-			  Writable val = stats.get(k);
-			  if (val == null) {
-			    stats.put(k, value.get());
-			    continue;
-			  }
-			  if (k.equals("sc")) {
-			    float min = Float.MAX_VALUE;
+    TreeMap<String, Writable> stats = new TreeMap<>();
+    for (int i = 0; i < readers.length; i++) {
+      SequenceFile.Reader reader = readers[i];
+      while (reader.next(key, value)) {
+        String k = key.toString();
+        Writable val = stats.get(k);
+        if (val == null) {
+          stats.put(k, value.get());
+          continue;
+        }
+        if (k.equals("sc")) {
+          float min = Float.MAX_VALUE;
           float max = Float.MIN_VALUE;
-			    if (stats.containsKey("scn")) {
-			      min = ((FloatWritable) stats.get("scn")).get();
-			    } else {
-			      min = ((FloatWritable) stats.get("sc")).get();
-			    }
+          if (stats.containsKey("scn")) {
+            min = ((FloatWritable) stats.get("scn")).get();
+          } else {
+            min = ((FloatWritable) stats.get("sc")).get();
+          }
           if (stats.containsKey("scx")) {
             max = ((FloatWritable) stats.get("scx")).get();
           } else {
             max = ((FloatWritable) stats.get("sc")).get();
           }
-			    float fvalue = ((FloatWritable) value.get()).get();
-			    if (min > fvalue) {
-			      min = fvalue;
-			    }
+          float fvalue = ((FloatWritable) value.get()).get();
+          if (min > fvalue) {
+            min = fvalue;
+          }
           if (max < fvalue) {
             max = fvalue;
           }
@@ -488,17 +584,16 @@
           }
           stats.put(k + "n", new LongWritable(min));
           stats.put(k + "x", new LongWritable(max));
-			  } else if (k.equals("sct")) {
+        } else if (k.equals("sct")) {
           FloatWritable fvalue = (FloatWritable) value.get();
-          ((FloatWritable) val)
-              .set(((FloatWritable) val).get() + fvalue.get());
+          ((FloatWritable) val).set(((FloatWritable) val).get() + fvalue.get());
         } else if (k.equals("scd")) {
           MergingDigest tdigest = null;
           MergingDigest tdig = MergingDigest.fromBytes(
               ByteBuffer.wrap(((BytesWritable) value.get()).getBytes()));
           if (val instanceof BytesWritable) {
-            tdigest = MergingDigest.fromBytes(
-                ByteBuffer.wrap(((BytesWritable) val).getBytes()));
+            tdigest = MergingDigest
+                .fromBytes(ByteBuffer.wrap(((BytesWritable) val).getBytes()));
             tdigest.add(tdig);
           } else {
             tdigest = tdig;
@@ -509,22 +604,21 @@
           stats.put(k, new BytesWritable(tdigestBytes.array()));
         } else {
           LongWritable lvalue = (LongWritable) value.get();
-          ((LongWritable) val)
-              .set(((LongWritable) val).get() + lvalue.get());
-			  }
-		  }
-		  reader.close();
-	  }
+          ((LongWritable) val).set(((LongWritable) val).get() + lvalue.get());
+        }
+      }
+      reader.close();
+    }
     // remove score, fetch interval, and fetch time
     // (used for min/max calculation)
     stats.remove("sc");
     stats.remove("fi");
     stats.remove("ft");
-	  // removing the tmp folder
-	  fileSystem.delete(tmpFolder, true);
-	  return stats;
+    // removing the tmp folder
+    fileSystem.delete(tmpFolder, true);
+    return stats;
   }
-  
+
   public void processStatJob(String crawlDb, Configuration config, boolean sort)
       throws IOException, InterruptedException, ClassNotFoundException {
 
@@ -559,7 +653,8 @@
     if (LOG.isInfoEnabled()) {
       LOG.info("CrawlDb statistics start: " + crawlDb);
     }
-    TreeMap<String, Writable> stats = processStatJobHelper(crawlDb, config, sort);
+    TreeMap<String, Writable> stats = processStatJobHelper(crawlDb, config,
+        sort);
 
     if (LOG.isInfoEnabled()) {
       LOG.info("Statistics for CrawlDb: " + crawlDb);
@@ -649,8 +744,8 @@
     return 0;
   }
 
-  public void readUrl(String crawlDb, String url, Configuration config, StringBuilder output)
-      throws IOException {
+  public void readUrl(String crawlDb, String url, Configuration config,
+      StringBuilder output) throws IOException {
     CrawlDatum res = get(crawlDb, url, config);
     output.append("URL: " + url + "\n");
     if (res != null) {
@@ -663,7 +758,8 @@
 
   public void processDumpJob(String crawlDb, String output,
       Configuration config, String format, String regex, String status,
-      Integer retry, String expr, Float sample) throws IOException, ClassNotFoundException, InterruptedException {
+      Integer retry, String expr, Float sample)
+      throws IOException, ClassNotFoundException, InterruptedException {
     if (LOG.isInfoEnabled()) {
       LOG.info("CrawlDb dump: starting");
       LOG.info("CrawlDb db: " + crawlDb);
@@ -683,6 +779,8 @@
       job.setOutputFormatClass(CrawlDatumCsvOutputFormat.class);
     } else if (format.equals("crawldb")) {
       job.setOutputFormatClass(MapFileOutputFormat.class);
+    } else if (format.equals("json")) {
+      job.setOutputFormatClass(CrawlDatumJsonOutputFormat.class);
     } else {
       job.setOutputFormatClass(TextOutputFormat.class);
     }
@@ -724,8 +822,8 @@
     }
   }
 
-  public static class CrawlDbDumpMapper extends
-      Mapper<Text, CrawlDatum, Text, CrawlDatum> {
+  public static class CrawlDbDumpMapper
+      extends Mapper<Text, CrawlDatum, Text, CrawlDatum> {
     Pattern pattern = null;
     Matcher matcher = null;
     String status = null;
@@ -734,14 +832,15 @@
     float sample;
 
     @Override
-    public void setup(Mapper<Text, CrawlDatum, Text, CrawlDatum>.Context context) {
+    public void setup(
+        Mapper<Text, CrawlDatum, Text, CrawlDatum>.Context context) {
       Configuration config = context.getConfiguration();
       if (config.get("regex", null) != null) {
         pattern = Pattern.compile(config.get("regex"));
       }
       status = config.get("status", null);
       retry = config.getInt("retry", -1);
-      
+
       if (config.get("expr", null) != null) {
         expr = JexlUtil.parseExpression(config.get("expr", null));
       }
@@ -749,8 +848,7 @@
     }
 
     @Override
-    public void map(Text key, CrawlDatum value,
-        Context context)
+    public void map(Text key, CrawlDatum value, Context context)
         throws IOException, InterruptedException {
 
       // check sample
@@ -765,9 +863,8 @@
       }
 
       // check status
-      if (status != null
-          && !status.equalsIgnoreCase(CrawlDatum.getStatusName(value
-              .getStatus())))
+      if (status != null && !status
+          .equalsIgnoreCase(CrawlDatum.getStatusName(value.getStatus())))
         return;
 
       // check regex
@@ -777,7 +874,7 @@
           return;
         }
       }
-      
+
       // check expr
       if (expr != null) {
         if (!value.evaluate(expr, key.toString())) {
@@ -790,8 +887,8 @@
   }
 
   public void processTopNJob(String crawlDb, long topN, float min,
-      String output, Configuration config) throws IOException, 
-      ClassNotFoundException, InterruptedException {
+      String output, Configuration config)
+      throws IOException, ClassNotFoundException, InterruptedException {
 
     if (LOG.isInfoEnabled()) {
       LOG.info("CrawlDb topN: starting (topN=" + topN + ", min=" + min + ")");
@@ -799,9 +896,9 @@
     }
 
     Path outFolder = new Path(output);
-    Path tempDir = new Path(config.get("mapreduce.cluster.temp.dir", ".")
-        + "/readdb-topN-temp-"
-        + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
+    Path tempDir = new Path(
+        config.get("mapreduce.cluster.temp.dir", ".") + "/readdb-topN-temp-"
+            + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));
 
     Job job = NutchJob.getInstance(config);
     job.setJobName("topN prepare " + crawlDb);
@@ -818,9 +915,9 @@
     job.setOutputValueClass(Text.class);
 
     job.getConfiguration().setFloat("db.reader.topn.min", min);
-   
-    FileSystem fs = tempDir.getFileSystem(config); 
-    try{
+
+    FileSystem fs = tempDir.getFileSystem(config);
+    try {
       boolean success = job.waitForCompletion(true);
       if (!success) {
         String message = "CrawlDbReader job did not succeed, job status:"
@@ -856,7 +953,7 @@
 
     job.setNumReduceTasks(1); // create a single file.
 
-    try{
+    try {
       boolean success = job.waitForCompletion(true);
       if (!success) {
         String message = "CrawlDbReader job did not succeed, job status:"
@@ -879,35 +976,38 @@
 
   }
 
-
-  public int run(String[] args) throws IOException, InterruptedException, ClassNotFoundException, Exception {
+  public int run(String[] args) throws IOException, InterruptedException,
+      ClassNotFoundException, Exception {
     @SuppressWarnings("resource")
     CrawlDbReader dbr = new CrawlDbReader();
 
     if (args.length < 2) {
-      System.err
-          .println("Usage: CrawlDbReader <crawldb> (-stats | -dump <out_dir> | -topN <nnnn> <out_dir> [<min>] | -url <url>)");
+      System.err.println(
+          "Usage: CrawlDbReader <crawldb> (-stats | -dump <out_dir> | -topN <nnnn> <out_dir> [<min>] | -url <url>)");
       System.err
           .println("\t<crawldb>\tdirectory name where crawldb is located");
       System.err
           .println("\t-stats [-sort] \tprint overall statistics to System.out");
       System.err.println("\t\t[-sort]\tlist status sorted by host");
-      System.err
-          .println("\t-dump <out_dir> [-format normal|csv|crawldb]\tdump the whole db to a text file in <out_dir>");
+      System.err.println(
+          "\t-dump <out_dir> [-format normal|csv|crawldb|json]\tdump the whole db to a text file in <out_dir>");
       System.err.println("\t\t[-format csv]\tdump in Csv format");
-      System.err
-          .println("\t\t[-format normal]\tdump in standard format (default option)");
+      System.err.println(
+          "\t\t[-format normal]\tdump in standard format (default option)");
       System.err.println("\t\t[-format crawldb]\tdump as CrawlDB");
+      System.err.println("\t\t[-format json]\tdump in JSON Lines format");
       System.err.println("\t\t[-regex <expr>]\tfilter records with expression");
       System.err.println("\t\t[-retry <num>]\tminimum retry count");
-      System.err
-          .println("\t\t[-status <status>]\tfilter records by CrawlDatum status");
-      System.err.println("\t\t[-expr <expr>]\tJexl expression to evaluate for this record");
-      System.err.println("\t\t[-sample <fraction>]\tOnly process a random sample with this ratio");
+      System.err.println(
+          "\t\t[-status <status>]\tfilter records by CrawlDatum status");
+      System.err.println(
+          "\t\t[-expr <expr>]\tJexl expression to evaluate for this record");
+      System.err.println(
+          "\t\t[-sample <fraction>]\tOnly process a random sample with this ratio");
       System.err
           .println("\t-url <url>\tprint information on <url> to System.out");
-      System.err
-          .println("\t-topN <nnnn> <out_dir> [<min>]\tdump top <nnnn> urls sorted by score to <out_dir>");
+      System.err.println(
+          "\t-topN <nnnn> <out_dir> [<min>]\tdump top <nnnn> urls sorted by score to <out_dir>");
       System.err
           .println("\t\t[<min>]\tskip records with scores below this value.");
       System.err.println("\t\t\tThis can significantly improve performance.");
@@ -954,14 +1054,15 @@
           }
           if (args[j].equals("-expr")) {
             expr = args[++j];
-            i=i+2;
+            i = i + 2;
           }
           if (args[j].equals("-sample")) {
             sample = Float.parseFloat(args[++j]);
             i = i + 2;
           }
         }
-        dbr.processDumpJob(crawlDb, param, config, format, regex, status, retry, expr, sample);
+        dbr.processDumpJob(crawlDb, param, config, format, regex, status, retry,
+            expr, sample);
       } else if (args[i].equals("-url")) {
         param = args[++i];
         StringBuilder output = new StringBuilder();
@@ -990,25 +1091,27 @@
     }
     return 0;
   }
-  
+
   public static void main(String[] args) throws Exception {
     int result = ToolRunner.run(NutchConfiguration.create(),
         new CrawlDbReader(), args);
     System.exit(result);
   }
 
-  public Object query(Map<String, String> args, Configuration conf, String type, String crawlId) throws Exception {
+  public Object query(Map<String, String> args, Configuration conf, String type,
+      String crawlId) throws Exception {
 
     Map<String, Object> results = new HashMap<>();
     String crawlDb = crawlId + "/crawldb";
 
-    if(type.equalsIgnoreCase("stats")){
+    if (type.equalsIgnoreCase("stats")) {
       boolean sort = false;
-      if(args.containsKey("sort")){
-        if(args.get("sort").equalsIgnoreCase("true"))
+      if (args.containsKey("sort")) {
+        if (args.get("sort").equalsIgnoreCase("true"))
           sort = true;
       }
-      TreeMap<String , Writable> stats = processStatJobHelper(crawlDb, NutchConfiguration.create(), sort);
+      TreeMap<String, Writable> stats = processStatJobHelper(crawlDb,
+          NutchConfiguration.create(), sort);
       LongWritable totalCnt = (LongWritable) stats.get("T");
       stats.remove("T");
       results.put("totalUrls", String.valueOf(totalCnt.get()));
@@ -1034,14 +1137,15 @@
         } else if (k.startsWith("status")) {
           String[] st = k.split(" ");
           int code = Integer.parseInt(st[1]);
-          if (st.length > 2){
+          if (st.length > 2) {
             @SuppressWarnings("unchecked")
-            Map<String, Object> individualStatusInfo = (Map<String, Object>) statusMap.get(String.valueOf(code));
+            Map<String, Object> individualStatusInfo = (Map<String, Object>) statusMap
+                .get(String.valueOf(code));
             Map<String, String> hostValues;
-            if(individualStatusInfo.containsKey("hostValues")){
-              hostValues= (Map<String, String>) individualStatusInfo.get("hostValues");
-            }
-            else{
+            if (individualStatusInfo.containsKey("hostValues")) {
+              hostValues = (Map<String, String>) individualStatusInfo
+                  .get("hostValues");
+            } else {
               hostValues = new HashMap<>();
               individualStatusInfo.put("hostValues", hostValues);
             }
@@ -1049,7 +1153,8 @@
           } else {
             Map<String, Object> individualStatusInfo = new HashMap<>();
 
-            individualStatusInfo.put("statusValue", CrawlDatum.getStatusName((byte) code));
+            individualStatusInfo.put("statusValue",
+                CrawlDatum.getStatusName((byte) code));
             individualStatusInfo.put("count", String.valueOf(val));
 
             statusMap.put(String.valueOf(code), individualStatusInfo);
@@ -1061,7 +1166,7 @@
       results.put("status", statusMap);
       return results;
     }
-    if(type.equalsIgnoreCase("dump")){
+    if (type.equalsIgnoreCase("dump")) {
       String output = args.get("out_dir");
       String format = "normal";
       String regex = null;
@@ -1085,25 +1190,26 @@
         expr = args.get("expr");
       }
       if (args.containsKey("sample")) {
-    	  sample = Float.parseFloat(args.get("sample"));
-        }
-      processDumpJob(crawlDb, output, conf, format, regex, status, retry, expr, sample);
-      File dumpFile = new File(output+"/part-00000");
-      return dumpFile;		  
+        sample = Float.parseFloat(args.get("sample"));
+      }
+      processDumpJob(crawlDb, output, conf, format, regex, status, retry, expr,
+          sample);
+      File dumpFile = new File(output + "/part-00000");
+      return dumpFile;
     }
     if (type.equalsIgnoreCase("topN")) {
       String output = args.get("out_dir");
       long topN = Long.parseLong(args.get("nnn"));
       float min = 0.0f;
-      if(args.containsKey("min")){
+      if (args.containsKey("min")) {
         min = Float.parseFloat(args.get("min"));
       }
       processTopNJob(crawlDb, topN, min, output, conf);
-      File dumpFile = new File(output+"/part-00000");
+      File dumpFile = new File(output + "/part-00000");
       return dumpFile;
     }
 
-    if(type.equalsIgnoreCase("url")){
+    if (type.equalsIgnoreCase("url")) {
       String url = args.get("url");
       CrawlDatum res = get(crawlDb, url, conf);
       results.put("status", res.getStatus());
@@ -1114,9 +1220,10 @@
       results.put("score", res.getScore());
       results.put("signature", StringUtil.toHexString(res.getSignature()));
       Map<String, String> metadata = new HashMap<>();
-      if(res.getMetaData()!=null){
+      if (res.getMetaData() != null) {
         for (Entry<Writable, Writable> e : res.getMetaData().entrySet()) {
-          metadata.put(String.valueOf(e.getKey()), String.valueOf(e.getValue()));
+          metadata.put(String.valueOf(e.getKey()),
+              String.valueOf(e.getValue()));
         }
       }
       results.put("metadata", metadata);