blob: 08ddae03b8b6983c359d8911cddb38ef6c94e862 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.mapreduce.FluoEntryInputFormat;
import org.apache.fluo.mapreduce.FluoKeyValue;
import org.apache.fluo.mapreduce.FluoKeyValueGenerator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.Partitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
* Helper methods for using Fluo with Spark
* @since 1.0.0
public class FluoSparkHelper {
private static final Logger log = LoggerFactory.getLogger(FluoSparkHelper.class);

// Static counter appended to the temp base dir to form candidate temp dir names.
private static final AtomicInteger tempDirCounter = new AtomicInteger(0);

private final FluoConfiguration fluoConfig;
private final Configuration hadoopConfig;
private final Path tempBaseDir;
private final FileSystem hdfs;

// @formatter:off
/**
 * Creates a helper bound to the given Fluo and Hadoop configuration.
 *
 * @param fluoConfig Fluo configuration; supplies the Accumulo connection properties and the
 *        Accumulo table used by the Fluo-targeted methods
 * @param hadoopConfig Hadoop configuration used to obtain the file system client and to create
 *        jobs
 * @param tempBaseDir Base directory under which temp dirs are generated when no temp dir is set
 *        on the {@link BulkImportOptions} passed to an import method
 * @throws IllegalStateException if the file system client cannot be obtained from
 *         {@code hadoopConfig}
 */
public FluoSparkHelper(FluoConfiguration fluoConfig, Configuration hadoopConfig,
    Path tempBaseDir) {
  // @formatter:on
  this.fluoConfig = fluoConfig;
  this.hadoopConfig = hadoopConfig;
  this.tempBaseDir = tempBaseDir;
  try {
    hdfs = FileSystem.get(hadoopConfig);
  } catch (IOException e) {
    throw new IllegalStateException("Unable to get HDFS client from hadoop config", e);
  }
}
* Converts RowColumnValue RDD to RowColumn/Bytes PairRDD
* @param rcvRDD RowColumnValue RDD to convert
* @return RowColumn/Bytes PairRDD
public static JavaPairRDD<RowColumn, Bytes> toPairRDD(JavaRDD<RowColumnValue> rcvRDD) {
return rcvRDD.mapToPair(rcv -> new Tuple2<>(rcv.getRowColumn(), rcv.getValue()));
* Converts RowColumn/Bytes PairRDD to RowColumnValue RDD
* @param pairRDD RowColumn/Bytes PairRDD
* @return RowColumnValue RDD
public static JavaRDD<RowColumnValue> toRcvRDD(JavaPairRDD<RowColumn, Bytes> pairRDD) {
return -> new RowColumnValue(t._1().getRow(), t._1().getColumn(), t._2()));
private static AccumuloClient getAccumuloClient(FluoConfiguration config) {
return Accumulo.newClient().to(config.getAccumuloInstance(), config.getAccumuloZookeepers())
.as(config.getAccumuloUser(), config.getAccumuloPassword()).build();
* Reads all data from a snapshot in Fluo and returns it as a RowColumn/Value RDD.
* @param ctx Java Spark context
* @return RowColumn/Value RDD containing all data in Fluo
public JavaPairRDD<RowColumn, Bytes> readFromFluo(JavaSparkContext ctx) {
Job job;
try {
job = Job.getInstance(hadoopConfig);
} catch (IOException e) {
throw new IllegalStateException(e);
FluoEntryInputFormat.configure(job, fluoConfig);
return ctx.newAPIHadoopRDD(job.getConfiguration(), FluoEntryInputFormat.class, RowColumn.class,
* Bulk import RowColumn/Value data into Fluo table (obtained from Fluo configuration). This
* method will repartition RDD using the current split points of the Fluo table, creating one
* partition per tablet in the table. This is done so that one RFile is created per tablet for
* bulk import.
* @param data RowColumn/Value data to import
* @param opts Bulk import options
public void bulkImportRcvToFluo(JavaPairRDD<RowColumn, Bytes> data, BulkImportOptions opts) {
data = partitionForAccumulo(data, fluoConfig.getAccumuloTable(), opts);
JavaPairRDD<Key, Value> kvData = data.flatMapToPair(tuple -> {
List<Tuple2<Key, Value>> output = new LinkedList<>();
RowColumn rc = tuple._1();
FluoKeyValueGenerator fkvg = new FluoKeyValueGenerator();
for (FluoKeyValue kv : fkvg.getKeyValues()) {
output.add(new Tuple2<>(kv.getKey(), kv.getValue()));
return output.iterator();
bulkImportKvToAccumulo(kvData, fluoConfig.getAccumuloTable(), opts);
* Bulk import Key/Value data into into Fluo table (obtained from Fluo configuration). This method
* does not repartition data. One RFile will be created for each partition in the passed in RDD.
* Ensure the RDD is reasonably partitioned before calling this method.
* @param data Key/Value data to import
* @param opts Bulk import options
public void bulkImportKvToFluo(JavaPairRDD<Key, Value> data, BulkImportOptions opts) {
bulkImportKvToAccumulo(data, fluoConfig.getAccumuloTable(), opts);
* Bulk import RowColumn/Value data into specified Accumulo table. This method will repartition
* RDD using the current split points of the specified table, creating one partition per tablet in
* the table. This is done so that one RFile is created per tablet for bulk import.
* @param data RowColumn/Value data to import
* @param accumuloTable Accumulo table used for import
* @param opts Bulk import options
public void bulkImportRcvToAccumulo(JavaPairRDD<RowColumn, Bytes> data, String accumuloTable,
BulkImportOptions opts) {
data = partitionForAccumulo(data, accumuloTable, opts);
JavaPairRDD<Key, Value> kvData = data.mapToPair(tuple -> {
RowColumn rc = tuple._1();
byte[] row = rc.getRow().toArray();
byte[] cf = rc.getColumn().getFamily().toArray();
byte[] cq = rc.getColumn().getQualifier().toArray();
byte[] val = tuple._2().toArray();
return new Tuple2<>(new Key(new Text(row), new Text(cf), new Text(cq), 0), new Value(val));
bulkImportKvToAccumulo(kvData, accumuloTable, opts);
* Bulk import Key/Value data into specified Accumulo table. This method does not repartition
* data. One RFile will be created for each partition in the passed in RDD. Ensure the RDD is
* reasonably partitioned before calling this method.
* @param data Key/value data to import
* @param accumuloTable Accumulo table used for import
* @param opts Bulk import options
public void bulkImportKvToAccumulo(JavaPairRDD<Key, Value> data, String accumuloTable,
BulkImportOptions opts) {
Path tempDir = getTempDir(opts);
try (AccumuloClient client = getAccumuloClient(fluoConfig)) {
if (hdfs.exists(tempDir)) {
throw new IllegalArgumentException("HDFS temp dir already exists: " + tempDir.toString());
Path dataDir = new Path(tempDir.toString() + "/data");
Path failDir = new Path(tempDir.toString() + "/fail");
// save data to HDFS
Job job = Job.getInstance(hadoopConfig);
AccumuloFileOutputFormat.setOutputPath(job, dataDir);
// must use new API here as saveAsHadoopFile throws exception
data.saveAsNewAPIHadoopFile(dataDir.toString(), Key.class, Value.class,
AccumuloFileOutputFormat.class, job.getConfiguration());
// bulk import data to Accumulo"Wrote data for bulk import to HDFS temp directory: {}", dataDir);
Connector conn = chooseConnector(client, opts);
conn.tableOperations().importDirectory(accumuloTable, dataDir.toString(), failDir.toString(),
// throw exception if failures directory contains files
if (hdfs.listFiles(failDir, true).hasNext()) {
throw new IllegalStateException("Bulk import failed! Found files that failed to import "
+ "in failures directory: " + failDir);
}"Successfully bulk imported data in {} to '{}' Accumulo table", dataDir,
// delete data directory
hdfs.delete(tempDir, true);"Deleted HDFS temp directory created for bulk import: {}", tempDir);
// @formatter:off
} catch (IOException | TableNotFoundException | AccumuloException
| AccumuloSecurityException e) {
// @formatter:on
throw new IllegalStateException(e);
* Optional settings for Bulk Imports
* @since 1.0.0
public static class BulkImportOptions {
public static BulkImportOptions DEFAULT = new BulkImportOptions();
Connector conn = null;
Path tempDir = null;
public BulkImportOptions() {}
* If this methods is not called, then a Connector will be created using properties in the
* FluoConfiguration supplied to
* {@link FluoSparkHelper#FluoSparkHelper(FluoConfiguration, Configuration, Path)}
* @param conn Use this connector to bulk import files into Accumulo.
* @return this
* @deprecated use {@link #setAccumuloClient(AccumuloClient)}
@Deprecated(since = "1.3.0", forRemoval = true)
public BulkImportOptions setAccumuloConnector(Connector conn) {
this.conn = conn;
return this;
* If this methods is not called, then a Client will be created using properties in the
* FluoConfiguration supplied to
* {@link FluoSparkHelper#FluoSparkHelper(FluoConfiguration, Configuration, Path)}
* @param conn Use this connector to bulk import files into Accumulo.
* @return this
* @since 1.3.0
public BulkImportOptions setAccumuloClient(AccumuloClient client) {
try {
this.conn = Connector.from(client);
} catch (AccumuloSecurityException | AccumuloException e) {
throw new RuntimeException(e);
return this;
* If this method is not called, then a temp dir will be created based on the path passed
* supplied to {@link FluoSparkHelper#FluoSparkHelper(FluoConfiguration, Configuration, Path)}
* @param tempDir Use this directory to store RFiles generated for bulk import.
* @return this
public BulkImportOptions setTempDir(Path tempDir) {
this.tempDir = tempDir;
return this;
private Path getPossibleTempDir() {
return new Path(tempBaseDir.toString() + "/" + tempDirCounter.getAndIncrement());
private Path getTempDir(BulkImportOptions opts) {
Path tempDir;
if (opts.tempDir == null) {
try {
tempDir = getPossibleTempDir();
while (hdfs.exists(tempDir)) {
tempDir = getPossibleTempDir();
} catch (IOException e) {
throw new IllegalStateException(e);
} else {
tempDir = opts.tempDir;
return tempDir;
private JavaPairRDD<RowColumn, Bytes> partitionForAccumulo(JavaPairRDD<RowColumn, Bytes> data,
String accumuloTable, BulkImportOptions opts) {
// partition and sort data so that one file is created per an accumulo tablet
Partitioner accumuloPartitioner;
try (AccumuloClient client = getAccumuloClient(fluoConfig)) {
accumuloPartitioner = new AccumuloRangePartitioner(
chooseConnector(client, opts).tableOperations().listSplits(accumuloTable));
} catch (TableNotFoundException | AccumuloSecurityException | AccumuloException e) {
throw new IllegalStateException(e);
return data.repartitionAndSortWithinPartitions(accumuloPartitioner);
private Connector chooseConnector(AccumuloClient client, BulkImportOptions opts) {
try {
return opts.conn == null ? Connector.from(client) : opts.conn;
} catch (AccumuloSecurityException | AccumuloException e) {
throw new RuntimeException(e);