Merge pull request #63 from jmark99/bingest

Update the bulkIngest example to work correctly with Accumulo 2.0.x, with minor updates to the documentation. The classes were refactored to accommodate the 2.0 API changes, and generation of the test data was moved from SetupTable to BulkIngestExample (as in the 1.10.x version). Prior to this change, the test data was not being written to HDFS correctly, so BulkIngestExample could not find the input it needed for the ingest. Closes #63.
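
For context, a minimal, illustrative sketch (not part of this change) of how one generated line of test data relates to the key/value pair the mapper emits and VerifyIngest later checks. The class name and parsing code below are hypothetical; the exact MapClass body is elided from this diff, which only states that input lines are tab-separated key/value pairs.

    // Illustration only: the row/value format written by generateTestData()
    // and the tab-separated parsing described in BulkIngestExample's javadoc.
    public class BulkRowFormatSketch {
      public static void main(String[] args) {
        int i = 42;
        // generateTestData() writes one tab-separated pair per line:
        String line = String.format("row_%010d\tvalue_%010d", i, i);
        // Split the line at the tab into an output key and value:
        int tab = line.indexOf('\t');
        String row = line.substring(0, tab);     // "row_0000000042"
        String value = line.substring(tab + 1);  // "value_0000000042"
        System.out.println(row + " -> " + value);
      }
    }
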
diff --git a/docs/bulkIngest.md b/docs/bulkIngest.md
index d581b41..6edee37 100644
--- a/docs/bulkIngest.md
+++ b/docs/bulkIngest.md
@@ -16,18 +16,14 @@
 -->
 # Apache Accumulo Bulk Ingest Example
 
-This is an example of how to bulk ingest data into Accumulo using map reduce.
+This is an example of how to bulk ingest data into Accumulo using MapReduce.
 
 This tutorial uses the following Java classes.
 
- * [SetupTable.java] - creates the table and some data to ingest
- * [BulkIngestExample.java] - ingest the data using map reduce
+ * [SetupTable.java] - creates the table, 'test_bulk', and sets two split points.
+ * [BulkIngestExample.java] - creates some data to ingest and then ingests it using MapReduce
  * [VerifyIngest.java] - checks that the data was ingested
  
-Remember to copy the accumulo-examples\*.jar to Accumulo's 'lib/ext' directory.
-
-    $ cp target/accumulo-examples*.jar /path/accumulo/lib/ext
-
 The following commands show how to run this example. This example creates a
 table called test_bulk which has two initial split points. Then 1000 rows of
 test data are created in HDFS. After that the 1000 rows are ingested into
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java
index 835b3ab..0791265 100644
--- a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/BulkIngestExample.java
@@ -42,13 +42,21 @@
  * Example map reduce job that bulk ingest data into an accumulo table. The expected input is text
  * files containing tab separated key value pairs on each line.
  */
-public class BulkIngestExample {
-  static String workDir = "tmp/bulkWork";
-  static String inputDir = "bulk";
+public final class BulkIngestExample {
+  static final String workDir = "tmp/bulkWork";
+  static final String inputDir = "bulk";
+  static final String outputFile = "bulk/test_1.txt";
+  static final int numRows = 1000;
+
+  static final String SLASH_FILES = "/files";
+  static final String FAILURES = "failures";
+  static final String SPLITS_TXT = "/splits.txt";
+
+  private BulkIngestExample() {}
 
   public static class MapClass extends Mapper<LongWritable,Text,Text,Text> {
-    private Text outputKey = new Text();
-    private Text outputValue = new Text();
+    private final Text outputKey = new Text();
+    private final Text outputValue = new Text();
 
     @Override
     public void map(LongWritable key, Text value, Context output)
@@ -94,10 +102,25 @@
     }
   }
 
-  public static void main(String[] args) throws Exception {
+  public static void main(String[] args) throws Exception {
     ClientOpts opts = new ClientOpts();
     opts.parseArgs(BulkIngestExample.class.getName(), args);
+    FileSystem fs = FileSystem.get(opts.getHadoopConfig());
 
+    generateTestData(fs);
+    System.exit(ingestTestData(fs, opts));
+  }
+
+  private static void generateTestData(FileSystem fs) throws IOException {
+    try (PrintStream out = new PrintStream(
+        new BufferedOutputStream(fs.create(new Path(outputFile))))) {
+      for (int i = 0; i < numRows; i++) {
+        out.printf("row_%010d\tvalue_%010d%n", i, i);
+      }
+    }
+  }
+
+  private static int ingestTestData(FileSystem fs, ClientOpts opts) throws Exception {
     Job job = Job.getInstance(opts.getHadoopConfig());
     job.setJobName(BulkIngestExample.class.getSimpleName());
     job.setJarByClass(BulkIngestExample.class);
@@ -112,13 +135,12 @@
     job.setOutputFormatClass(AccumuloFileOutputFormat.class);
 
     TextInputFormat.setInputPaths(job, new Path(inputDir));
-    AccumuloFileOutputFormat.configure().outputPath(new Path(workDir + "/files")).store(job);
+    AccumuloFileOutputFormat.configure().outputPath(new Path(workDir + SLASH_FILES)).store(job);
 
-    FileSystem fs = FileSystem.get(opts.getHadoopConfig());
     try (AccumuloClient client = opts.createAccumuloClient()) {
 
       try (PrintStream out = new PrintStream(
-          new BufferedOutputStream(fs.create(new Path(workDir + "/splits.txt"))))) {
+          new BufferedOutputStream(fs.create(new Path(workDir + SPLITS_TXT))))) {
         Collection<Text> splits = client.tableOperations().listSplits(SetupTable.tableName, 100);
         for (Text split : splits)
           out.println(Base64.getEncoder().encodeToString(split.copyBytes()));
@@ -126,17 +148,18 @@
       }
 
       job.setPartitionerClass(RangePartitioner.class);
-      RangePartitioner.setSplitFile(job, workDir + "/splits.txt");
+      RangePartitioner.setSplitFile(job, workDir + SPLITS_TXT);
 
       job.waitForCompletion(true);
-      Path failures = new Path(workDir, "failures");
+      Path failures = new Path(workDir, FAILURES);
       fs.delete(failures, true);
-      fs.mkdirs(new Path(workDir, "failures"));
+      fs.mkdirs(new Path(workDir, FAILURES));
       // With HDFS permissions on, we need to make sure the Accumulo user can read/move the rfiles
       FsShell fsShell = new FsShell(opts.getHadoopConfig());
       fsShell.run(new String[] {"-chmod", "-R", "777", workDir});
-      client.tableOperations().importDirectory(workDir + "/files").to(SetupTable.tableName).load();
+      client.tableOperations().importDirectory(workDir + SLASH_FILES).to(SetupTable.tableName)
+          .load();
     }
-    System.exit(job.isSuccessful() ? 0 : 1);
+    return job.isSuccessful() ? 0 : 1;
   }
 }
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java
index a917783..ef4edb3 100644
--- a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/SetupTable.java
@@ -16,25 +16,20 @@
  */
 package org.apache.accumulo.examples.mapreduce.bulk;
 
-import java.io.BufferedOutputStream;
-import java.io.PrintStream;
 import java.util.TreeSet;
 
 import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.TableExistsException;
 import org.apache.accumulo.examples.cli.ClientOpts;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 
-public class SetupTable {
+public final class SetupTable {
 
-  static String[] splits = {"row_00000333", "row_00000666"};
-  static String tableName = "test_bulk";
-  static int numRows = 1000;
-  static String outputFile = "bulk/test_1.txt";
+  static final String[] splits = {"row_00000333", "row_00000666"};
+  static final String tableName = "test_bulk";
+
+  private SetupTable() {}
 
   public static void main(String[] args) throws Exception {
     ClientOpts opts = new ClientOpts();
@@ -48,20 +43,11 @@
       }
 
       // create a table with initial partitions
-      TreeSet<Text> intialPartitions = new TreeSet<>();
+      TreeSet<Text> initialPartitions = new TreeSet<>();
       for (String split : splits) {
-        intialPartitions.add(new Text(split));
+        initialPartitions.add(new Text(split));
       }
-      client.tableOperations().addSplits(tableName, intialPartitions);
-
-      FileSystem fs = FileSystem.get(new Configuration());
-      try (PrintStream out = new PrintStream(
-          new BufferedOutputStream(fs.create(new Path(outputFile))))) {
-        // create some data in outputFile
-        for (int i = 0; i < numRows; i++) {
-          out.println(String.format("row_%010d\tvalue_%010d", i, i));
-        }
-      }
+      client.tableOperations().addSplits(tableName, initialPartitions);
     }
   }
 }
diff --git a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java
index 91a3468..dc354b2 100644
--- a/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java
+++ b/src/main/java/org/apache/accumulo/examples/mapreduce/bulk/VerifyIngest.java
@@ -31,9 +31,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class VerifyIngest {
+public final class VerifyIngest {
 
   private static final Logger log = LoggerFactory.getLogger(VerifyIngest.class);
+  private static final String ROW_FORMAT = "row_%010d";
+  private static final String VALUE_FORMAT = "value_%010d";
+
+  private VerifyIngest() {}
 
   public static void main(String[] args) throws TableNotFoundException {
 
@@ -43,31 +47,31 @@
     try (AccumuloClient client = Accumulo.newClient().from(opts.getClientPropsPath()).build();
         Scanner scanner = client.createScanner(SetupTable.tableName, Authorizations.EMPTY)) {
 
-      scanner.setRange(new Range(String.format("row_%010d", 0), null));
+      scanner.setRange(new Range(String.format(ROW_FORMAT, 0), null));
 
       Iterator<Entry<Key,Value>> si = scanner.iterator();
 
       boolean ok = true;
 
-      for (int i = 0; i < SetupTable.numRows; i++) {
+      for (int i = 0; i < BulkIngestExample.numRows; i++) {
 
         if (si.hasNext()) {
           Entry<Key,Value> entry = si.next();
 
-          if (!entry.getKey().getRow().toString().equals(String.format("row_%010d", i))) {
-            log.error("unexpected row key " + entry.getKey().getRow().toString() + " expected "
-                + String.format("row_%010d", i));
+          if (!entry.getKey().getRow().toString().equals(String.format(ROW_FORMAT, i))) {
+            log.error("unexpected row key {}; expected {}", entry.getKey().getRow(),
+                String.format(ROW_FORMAT, i));
             ok = false;
           }
 
-          if (!entry.getValue().toString().equals(String.format("value_%010d", i))) {
-            log.error("unexpected value " + entry.getValue().toString() + " expected "
-                + String.format("value_%010d", i));
+          if (!entry.getValue().toString().equals(String.format(VALUE_FORMAT, i))) {
+            log.error("unexpected value {}; expected {}", entry.getValue(),
+                String.format(VALUE_FORMAT, i));
             ok = false;
           }
 
         } else {
-          log.error("no more rows, expected " + String.format("row_%010d", i));
+          log.error("no more rows, expected {}", String.format(ROW_FORMAT, i));
           ok = false;
           break;
         }
@@ -75,9 +79,10 @@
       }
 
       if (ok) {
-        System.out.println("OK");
+        System.out.println("Data verification succeeded!");
         System.exit(0);
       } else {
+        System.out.println("Data verification failed!");
         System.exit(1);
       }
     }
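
As an aside, a minimal, hypothetical sketch of the exit-status pattern BulkIngestExample uses above: the JVM entry point must be a void main, so the int result of the work method is passed to System.exit rather than returned. The class and method names here are illustrative only.

    // Illustration only: report success or failure through the process exit status.
    public class ExitStatusSketch {

      // Stand-in for ingestTestData(): returns 0 on success, 1 on failure,
      // much like job.isSuccessful() ? 0 : 1 in the example above.
      private static int doWork() {
        boolean successful = true;
        return successful ? 0 : 1;
      }

      public static void main(String[] args) {
        // A nonzero exit status lets shell scripts detect a failed run.
        System.exit(doWork());
      }
    }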