Created Accumulo/Spark example (#39)


diff --git a/README.md b/README.md
index 77c91bc..0be1400 100644
--- a/README.md
+++ b/README.md
@@ -83,6 +83,7 @@
 | [rowhash] | Using MapReduce to read a table and write to a new column in the same table. |
 | [sample] | Building and using sample data in Accumulo. |
 | [shard] | Using the intersecting iterator with a term index partitioned by document. |
+| [spark] | Using Accumulo as input and output for Apache Spark jobs |
 | [tabletofile] | Using MapReduce to read a table and write one of its columns to a file in HDFS. |
 | [terasort] | Generating random data and sorting it using Accumulo. |
 | [uniquecols] | Use MapReduce to count unique columns in Accumulo |
@@ -120,6 +121,7 @@
 [rowhash]: docs/rowhash.md
 [sample]: docs/sample.md
 [shard]: docs/shard.md
+[spark]: spark/README.md
 [tabletofile]: docs/tabletofile.md
 [terasort]: docs/terasort.md
 [uniquecols]: docs/uniquecols.md
diff --git a/spark/.gitignore b/spark/.gitignore
new file mode 100644
index 0000000..f534230
--- /dev/null
+++ b/spark/.gitignore
@@ -0,0 +1,6 @@
+/.classpath
+/.project
+/.settings/
+/target/
+/*.iml
+/.idea
diff --git a/spark/README.md b/spark/README.md
new file mode 100644
index 0000000..af19029
--- /dev/null
+++ b/spark/README.md
@@ -0,0 +1,45 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+# Apache Accumulo Spark Example
+
+## Requirements
+
+* Accumulo 2.0+
+* Hadoop YARN installed & `HADOOP_CONF_DIR` set in environment
+* Spark installed & `SPARK_HOME` set in environment
+
+## Spark example
+
+The [CopyPlus5K] example will create an Accumulo table called `spark_example_input`
+and write 100 key/value entries into Accumulo with the values `0..99`. It then launches
+a Spark application that does following:
+
+* Read data from `spark_example_input` table using `AccumuloInputFormat`
+* Add 5000 to each value
+* Write the data to a new Accumulo table (called `spark_example_output`) using one of
+  two methods.
+  1. **Bulk import** - Write data to an RFile in HDFS using `AccumuloFileOutputFormat` and
+     bulk import to Accumulo table
+  2. **Batchwriter** - Creates a `BatchWriter` in Spark code to write to the table. 
+
+This application can be run using the command:
+
+    ./run.sh batch /path/to/accumulo-client.properties
+
+Change `batch` to `bulk` to use Bulk import method.
+
+[CopyPlus5K]: src/main/java/org/apache/accumulo/spark/CopyPlus5K.java
diff --git a/spark/pom.xml b/spark/pom.xml
new file mode 100644
index 0000000..67f5de2
--- /dev/null
+++ b/spark/pom.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache</groupId>
+    <artifactId>apache</artifactId>
+    <version>21</version>
+  </parent>
+  <groupId>org.apache.accumulo</groupId>
+  <artifactId>accumulo-spark</artifactId>
+  <version>2.0.0-SNAPSHOT</version>
+  <name>Apache Accumulo Spark Example</name>
+  <description>Example Spark Application for Apache Accumulo</description>
+  <properties>
+    <accumulo.version>2.0.0-SNAPSHOT</accumulo.version>
+    <hadoop.version>3.2.0</hadoop.version>
+    <maven.compiler.source>1.8</maven.compiler.source>
+    <maven.compiler.target>1.8</maven.compiler.target>
+    <zookeeper.version>3.4.13</zookeeper.version>
+  </properties>
+  <dependencyManagement>
+    <dependencies>
+      <dependency>
+        <groupId>org.apache.zookeeper</groupId>
+        <artifactId>zookeeper</artifactId>
+        <version>${zookeeper.version}</version>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-core</artifactId>
+      <version>${accumulo.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.accumulo</groupId>
+      <artifactId>accumulo-hadoop-mapreduce</artifactId>
+      <version>${accumulo.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client-api</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_2.11</artifactId>
+      <version>2.4.0</version>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+  <profiles>
+    <profile>
+      <id>create-shade-jar</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-shade-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>spark-shade-jar</id>
+                <goals>
+                  <goal>shade</goal>
+                </goals>
+                <phase>package</phase>
+                <configuration>
+                  <finalName>${project.artifactId}-shaded</finalName>
+                  <shadedArtifactAttached>true</shadedArtifactAttached>
+                  <shadedClassifierName>shaded</shadedClassifierName>
+                  <artifactSet>
+                    <excludes>
+                      <exclude>org.apache.accumulo:accumulo-native</exclude>
+                      <exclude>org.apache.hadoop:*</exclude>
+                      <exclude>org.apache.spark:*</exclude>
+                    </excludes>
+                  </artifactSet>
+                  <relocations>
+                    <relocation>
+                      <!-- Required as Accumulo uses a different version than Hadoop --> 
+                      <pattern>com.google.common</pattern>
+                      <shadedPattern>shaded.com.google.common</shadedPattern>
+                    </relocation>
+                  </relocations>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+</project>
diff --git a/spark/run.sh b/spark/run.sh
new file mode 100755
index 0000000..e1ab9c0
--- /dev/null
+++ b/spark/run.sh
@@ -0,0 +1,28 @@
+#!/bin/bash
+
+if [[ -z "$1" || -z "$2" ]]; then
+  echo "Usage: ./run.sh [bulk|batch] /path/to/accumulo-client.properties"
+  exit 1
+fi
+
+JAR=./target/accumulo-spark-shaded.jar
+if [[ ! -f $JAR ]]; then
+  mvn clean package -P create-shade-jar
+fi
+
+if [[ -z "$SPARK_HOME" ]]; then
+  echo "SPARK_HOME must be set!"
+  exit 1
+fi
+
+if [[ -z "$HADOOP_CONF_DIR" ]]; then
+  echo "HADOOP_CONF_DIR must be set!"
+  exit 1
+fi
+
+"$SPARK_HOME"/bin/spark-submit \
+  --class org.apache.accumulo.spark.CopyPlus5K \
+  --master yarn \
+  --deploy-mode client \
+  $JAR \
+  $1 $2
diff --git a/spark/src/main/java/org/apache/accumulo/spark/CopyPlus5K.java b/spark/src/main/java/org/apache/accumulo/spark/CopyPlus5K.java
new file mode 100644
index 0000000..4443a70
--- /dev/null
+++ b/spark/src/main/java/org/apache/accumulo/spark/CopyPlus5K.java
@@ -0,0 +1,157 @@
+package org.apache.accumulo.spark;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloFileOutputFormat;
+import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.spark.Partitioner;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class CopyPlus5K {
+
+  public static class AccumuloRangePartitioner extends Partitioner {
+
+    private static final long serialVersionUID = 1L;
+    private List<String> splits;
+
+    AccumuloRangePartitioner(String... listSplits) {
+      this.splits = Arrays.asList(listSplits);
+    }
+
+    @Override
+    public int getPartition(Object o) {
+      int index = Collections.binarySearch(splits, ((Key)o).getRow().toString());
+      index = index < 0 ? (index + 1) * -1 : index;
+      return index;
+    }
+
+    @Override
+    public int numPartitions() {
+      return splits.size() + 1;
+    }
+  }
+
+  private static void cleanupAndCreateTables(Properties props) throws Exception {
+    FileSystem hdfs = FileSystem.get(new Configuration());
+    if (hdfs.exists(rootPath)) {
+      hdfs.delete(rootPath, true);
+    }
+    try (AccumuloClient client = Accumulo.newClient().from(props).build()) {
+      if (client.tableOperations().exists(inputTable)) {
+        client.tableOperations().delete(inputTable);
+      }
+      if (client.tableOperations().exists(outputTable)) {
+        client.tableOperations().delete(outputTable);
+      }
+      // Create tables
+      client.tableOperations().create(inputTable);
+      client.tableOperations().create(outputTable);
+
+      // Write data to input table
+      try (BatchWriter bw = client.createBatchWriter(inputTable)) {
+        for (int i = 0; i < 100; i++) {
+          Mutation m = new Mutation(String.format("%03d", i));
+          m.at().family("cf1").qualifier("cq1").put("" + i);
+          bw.addMutation(m);
+        }
+      }
+    }
+  }
+
+  private static final String inputTable = "spark_example_input";
+  private static final String outputTable = "spark_example_output";
+  private static final Path rootPath = new Path("/spark_example/");
+
+  public static void main(String[] args) throws Exception {
+
+    if ((!args[0].equals("batch") && !args[0].equals("bulk")) || args[1].isEmpty()) {
+        System.out.println("Usage: ./run.sh [batch|bulk] /path/to/accumulo-client.properties");
+        System.exit(1);
+    }
+
+    // Read client properties from file
+    final Properties props = Accumulo.newClientProperties().from(args[1]).build();
+
+    cleanupAndCreateTables(props);
+
+    SparkConf conf = new SparkConf();
+    conf.setAppName("CopyPlus5K");
+    // KryoSerializer is needed for serializing Accumulo Key when partitioning data for bulk import
+    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+    conf.registerKryoClasses(new Class[]{Key.class, Value.class, Properties.class});
+
+    JavaSparkContext sc = new JavaSparkContext(conf);
+
+    Job job = Job.getInstance();
+
+    // Read input from Accumulo
+    AccumuloInputFormat.configure().clientProperties(props).table(inputTable).store(job);
+    JavaPairRDD<Key,Value> data = sc.newAPIHadoopRDD(job.getConfiguration(),
+        AccumuloInputFormat.class, Key.class, Value.class);
+
+    // Add 5K to all values
+    JavaPairRDD<Key, Value> dataPlus5K = data.mapValues(v ->
+        new Value("" + (Integer.parseInt(v.toString()) + 5_000)));
+
+    if (args[0].equals("batch")) {
+      // Write output using batch writer
+      dataPlus5K.foreachPartition(iter -> {
+        // Intentionally created an Accumulo client for each partition to avoid attempting to
+        // serialize it and send it to each remote process.
+        try (AccumuloClient client = Accumulo.newClient().from(props).build();
+             BatchWriter bw = client.createBatchWriter(outputTable)) {
+          iter.forEachRemaining(kv -> {
+            Key key = kv._1;
+            Value val = kv._2;
+            Mutation m = new Mutation(key.getRow());
+            m.at().family(key.getColumnFamily()).qualifier(key.getColumnQualifier())
+                .visibility(key.getColumnVisibility()).timestamp(key.getTimestamp()).put(val);
+            try {
+              bw.addMutation(m);
+            } catch (MutationsRejectedException e) {
+              e.printStackTrace();
+            }
+          });
+        }
+      });
+    } else if (args[0].equals("bulk")) {
+      // Write output using bulk import
+
+      // Create HDFS directory for bulk import
+      FileSystem hdfs = FileSystem.get(new Configuration());
+      hdfs.mkdirs(rootPath);
+      Path outputDir = new Path(rootPath.toString() + "/output");
+
+      // Write Spark output to HDFS
+      AccumuloFileOutputFormat.configure().outputPath(outputDir).store(job);
+      Partitioner partitioner = new AccumuloRangePartitioner("3", "7");
+      JavaPairRDD<Key, Value> partData = dataPlus5K.repartitionAndSortWithinPartitions(partitioner);
+      partData.saveAsNewAPIHadoopFile(outputDir.toString(), Key.class, Value.class,
+          AccumuloFileOutputFormat.class);
+
+      // Bulk import into Accumulo
+      try (AccumuloClient client = Accumulo.newClient().from(props).build()) {
+        client.tableOperations().importDirectory(outputDir.toString()).to(outputTable).load();
+      }
+    } else {
+      System.out.println("Unknown method to write output: " + args[0]);
+      System.exit(1);
+    }
+  }
+}