/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.accumulo.spark;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.hadoop.mapreduce.AccumuloFileOutputFormat;
import org.apache.accumulo.hadoop.mapreduce.AccumuloInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CopyPlus5K {
private static final Logger log = LoggerFactory.getLogger(CopyPlus5K.class);
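
// Spark Partitioner that assigns each Accumulo Key to a partition by binary-searching its row
// against a fixed list of split points, so every partition holds a sorted, non-overlapping
// range of rows (as required when writing RFiles for bulk import)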
public static class AccumuloRangePartitioner extends Partitioner {
private static final long serialVersionUID = 1L;
private List<String> splits;
AccumuloRangePartitioner(String... listSplits) {
this.splits = Arrays.asList(listSplits);
}
@Override
public int getPartition(Object o) {
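// Collections.binarySearch returns -(insertionPoint) - 1 when the row is not an exact
// split point; convert that to the insertion point, which is the partition index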
int index = Collections.binarySearch(splits, ((Key) o).getRow().toString());
index = index < 0 ? (index + 1) * -1 : index;
return index;
}
@Override
public int numPartitions() {
return splits.size() + 1;
}
}
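
// Delete output left by a previous run, recreate the input and output tables, and seed the
// input table with 100 rows (000 through 099), each storing its row number as the value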
private static void cleanupAndCreateTables(Properties props) throws Exception {
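// Remove the HDFS working directory from any previous run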
FileSystem hdfs = FileSystem.get(new Configuration());
if (hdfs.exists(rootPath)) {
hdfs.delete(rootPath, true);
}
try (AccumuloClient client = Accumulo.newClient().from(props).build()) {
if (client.tableOperations().exists(inputTable)) {
client.tableOperations().delete(inputTable);
}
if (client.tableOperations().exists(outputTable)) {
client.tableOperations().delete(outputTable);
}
// Create tables
try {
client.tableOperations().create(inputTable);
} catch (TableExistsException e) {
log.error("Something went wrong. Table '{}' should have been deleted prior to creation "
+ "attempt!", inputTable);
return;
}
try {
client.tableOperations().create(outputTable);
} catch (TableExistsException e) {
log.error("Something went wrong. Table '{}' should have been deleted prior to creation "
+ "attempt!", inputTable);
return;
}
// Write data to input table
try (BatchWriter bw = client.createBatchWriter(inputTable)) {
for (int i = 0; i < 100; i++) {
Mutation m = new Mutation(String.format("%03d", i));
m.at().family("cf1").qualifier("cq1").put("" + i);
bw.addMutation(m);
}
}
}
}
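
// Table names and the HDFS directory used as the bulk-import working area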
private static final String inputTable = "spark_example_input";
private static final String outputTable = "spark_example_output";
private static final Path rootPath = new Path("/spark_example/");
public static void main(String[] args) throws Exception {
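// Expect two arguments: the write method ("batch" or "bulk") and the path to an
// accumulo-client.properties file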
if ((!args[0].equals("batch") && !args[0].equals("bulk")) || args[1].isEmpty()) {
System.out.println("Usage: ./run.sh [batch|bulk] /path/to/accumulo-client.properties");
System.exit(1);
}
// Read client properties from file
final Properties props = Accumulo.newClientProperties().from(args[1]).build();
cleanupAndCreateTables(props);
SparkConf conf = new SparkConf();
conf.setAppName("CopyPlus5K");
// KryoSerializer is needed for serializing Accumulo Key when partitioning data for bulk import
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
conf.registerKryoClasses(new Class[] {Key.class, Value.class, Properties.class});
try (JavaSparkContext sc = new JavaSparkContext(conf)) {
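// The Hadoop Job only carries the input/output format configuration; it is never submitted
// as a MapReduce job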
Job job = Job.getInstance();
// Read input from Accumulo
AccumuloInputFormat.configure().clientProperties(props).table(inputTable).store(job);
JavaPairRDD<Key,Value> data = sc.newAPIHadoopRDD(job.getConfiguration(),
AccumuloInputFormat.class, Key.class, Value.class);
// Add 5K to all values
JavaPairRDD<Key,Value> dataPlus5K = data
.mapValues(v -> new Value("" + (Integer.parseInt(v.toString()) + 5_000)));
if (args[0].equals("batch")) {
// Write output using batch writer
dataPlus5K.foreachPartition(iter -> {
// Intentionally create an Accumulo client for each partition so that the client does not
// have to be serialized and sent to each remote process.
try (AccumuloClient client = Accumulo.newClient().from(props).build();
BatchWriter bw = client.createBatchWriter(outputTable)) {
iter.forEachRemaining(kv -> {
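// Copy each entry's row, family, qualifier, visibility, and timestamp into a new
// mutation that carries the updated value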
Key key = kv._1;
Value val = kv._2;
Mutation m = new Mutation(key.getRow());
m.at().family(key.getColumnFamily()).qualifier(key.getColumnQualifier())
.visibility(key.getColumnVisibility()).timestamp(key.getTimestamp()).put(val);
try {
bw.addMutation(m);
} catch (MutationsRejectedException e) {
e.printStackTrace();
}
});
}
});
} else if (args[0].equals("bulk")) {
// Write output using bulk import
// Create HDFS directory for bulk import
FileSystem hdfs = FileSystem.get(new Configuration());
hdfs.mkdirs(rootPath);
Path outputDir = new Path(rootPath.toString() + "/output");
// Write Spark output to HDFS
AccumuloFileOutputFormat.configure().outputPath(outputDir).store(job);
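// Repartition and sort so each Spark partition writes one RFile covering a sorted,
// non-overlapping range of keys, as required for Accumulo bulk import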
Partitioner partitioner = new AccumuloRangePartitioner("3", "7");
JavaPairRDD<Key,Value> partData = dataPlus5K
.repartitionAndSortWithinPartitions(partitioner);
partData.saveAsNewAPIHadoopFile(outputDir.toString(), Key.class, Value.class,
AccumuloFileOutputFormat.class, job.getConfiguration());
// Bulk import into Accumulo
try (AccumuloClient client = Accumulo.newClient().from(props).build()) {
client.tableOperations().importDirectory(outputDir.toString()).to(outputTable).load();
}
} else {
System.out.println("Unknown method to write output: " + args[0]);
System.exit(1);
}
}
}
}