/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.fluo.recipes.spark;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.RowColumn;
import org.apache.fluo.api.data.RowColumnValue;
import org.apache.fluo.mapreduce.FluoEntryInputFormat;
import org.apache.fluo.mapreduce.FluoKeyValue;
import org.apache.fluo.mapreduce.FluoKeyValueGenerator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
/**
* Helper methods for using Fluo with Spark
*
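* <p>
* A minimal usage sketch (a sketch only; the Spark context {@code sc}, the properties file, and
* the temp directory path below are hypothetical placeholders):
*
* <pre>{@code
* FluoConfiguration fluoConfig = new FluoConfiguration(new File("fluo.properties"));
* FluoSparkHelper fsh =
*     new FluoSparkHelper(fluoConfig, new Configuration(), new Path("/tmp/fluo-spark"));
* JavaPairRDD<RowColumn, Bytes> snapshot = fsh.readFromFluo(sc);
* fsh.bulkImportRcvToFluo(snapshot, BulkImportOptions.DEFAULT);
* }</pre>
*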
* @since 1.0.0
*/
public class FluoSparkHelper {
private static final Logger log = LoggerFactory.getLogger(FluoSparkHelper.class);
private static AtomicInteger tempDirCounter = new AtomicInteger(0);
private FluoConfiguration fluoConfig;
private Configuration hadoopConfig;
private Path tempBaseDir;
private FileSystem hdfs;
// @formatter:off
public FluoSparkHelper(FluoConfiguration fluoConfig, Configuration hadoopConfig,
Path tempBaseDir) {
// @formatter:on
this.fluoConfig = fluoConfig;
this.hadoopConfig = hadoopConfig;
this.tempBaseDir = tempBaseDir;
try {
hdfs = FileSystem.get(hadoopConfig);
} catch (IOException e) {
throw new IllegalStateException("Unable to get HDFS client from hadoop config", e);
}
}
/**
* Converts RowColumnValue RDD to RowColumn/Bytes PairRDD
*
* @param rcvRDD RowColumnValue RDD to convert
* @return RowColumn/Bytes PairRDD
*/
public static JavaPairRDD<RowColumn, Bytes> toPairRDD(JavaRDD<RowColumnValue> rcvRDD) {
return rcvRDD.mapToPair(rcv -> new Tuple2<>(rcv.getRowColumn(), rcv.getValue()));
}
/**
* Converts RowColumn/Bytes PairRDD to RowColumnValue RDD
*
* @param pairRDD RowColumn/Bytes PairRDD
* @return RowColumnValue RDD
*/
public static JavaRDD<RowColumnValue> toRcvRDD(JavaPairRDD<RowColumn, Bytes> pairRDD) {
return pairRDD.map(t -> new RowColumnValue(t._1().getRow(), t._1().getColumn(), t._2()));
}
private static AccumuloClient getAccumuloClient(FluoConfiguration config) {
return Accumulo.newClient().to(config.getAccumuloInstance(), config.getAccumuloZookeepers())
.as(config.getAccumuloUser(), config.getAccumuloPassword()).build();
}
/**
* Reads all data from a snapshot in Fluo and returns it as a RowColumn/Value RDD.
*
* @param ctx Java Spark context
* @return RowColumn/Value RDD containing all data in Fluo
*/
public JavaPairRDD<RowColumn, Bytes> readFromFluo(JavaSparkContext ctx) {
Job job;
try {
job = Job.getInstance(hadoopConfig);
} catch (IOException e) {
throw new IllegalStateException(e);
}
FluoEntryInputFormat.configure(job, fluoConfig);
return ctx.newAPIHadoopRDD(job.getConfiguration(), FluoEntryInputFormat.class, RowColumn.class,
Bytes.class);
}
/**
* Bulk import RowColumn/Value data into the Fluo table (obtained from the Fluo configuration). This
* method will repartition the RDD using the current split points of the Fluo table, creating one
* partition per tablet in the table. This is done so that one RFile is created per tablet for
* bulk import.
*
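* <p>
* A minimal sketch ({@code fsh} is an existing helper instance and {@code rcvData} stands in for
* data produced elsewhere):
*
* <pre>{@code
* JavaRDD<RowColumnValue> rcvData = ...; // produced by earlier Spark processing
* fsh.bulkImportRcvToFluo(FluoSparkHelper.toPairRDD(rcvData), BulkImportOptions.DEFAULT);
* }</pre>
*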
* @param data RowColumn/Value data to import
* @param opts Bulk import options
*/
public void bulkImportRcvToFluo(JavaPairRDD<RowColumn, Bytes> data, BulkImportOptions opts) {
data = partitionForAccumulo(data, fluoConfig.getAccumuloTable(), opts);
JavaPairRDD<Key, Value> kvData = data.flatMapToPair(tuple -> {
List<Tuple2<Key, Value>> output = new LinkedList<>();
RowColumn rc = tuple._1();
FluoKeyValueGenerator fkvg = new FluoKeyValueGenerator();
fkvg.setRow(rc.getRow()).setColumn(rc.getColumn()).setValue(tuple._2().toArray());
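// each Fluo row/column/value may expand to more than one Accumulo key/value, so emit them all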
for (FluoKeyValue kv : fkvg.getKeyValues()) {
output.add(new Tuple2<>(kv.getKey(), kv.getValue()));
}
return output.iterator();
});
bulkImportKvToAccumulo(kvData, fluoConfig.getAccumuloTable(), opts);
}
/**
* Bulk import Key/Value data into the Fluo table (obtained from the Fluo configuration). This method
* does not repartition data. One RFile will be created for each partition in the passed-in RDD.
* Ensure the RDD is reasonably partitioned before calling this method.
*
* @param data Key/Value data to import
* @param opts Bulk import options
*/
public void bulkImportKvToFluo(JavaPairRDD<Key, Value> data, BulkImportOptions opts) {
bulkImportKvToAccumulo(data, fluoConfig.getAccumuloTable(), opts);
}
/**
* Bulk import RowColumn/Value data into the specified Accumulo table. This method will repartition
* the RDD using the current split points of the specified table, creating one partition per tablet in
* the table. This is done so that one RFile is created per tablet for bulk import.
*
* @param data RowColumn/Value data to import
* @param accumuloTable Accumulo table used for import
* @param opts Bulk import options
*/
public void bulkImportRcvToAccumulo(JavaPairRDD<RowColumn, Bytes> data, String accumuloTable,
BulkImportOptions opts) {
data = partitionForAccumulo(data, accumuloTable, opts);
JavaPairRDD<Key, Value> kvData = data.mapToPair(tuple -> {
RowColumn rc = tuple._1();
byte[] row = rc.getRow().toArray();
byte[] cf = rc.getColumn().getFamily().toArray();
byte[] cq = rc.getColumn().getQualifier().toArray();
byte[] val = tuple._2().toArray();
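// build a bare Accumulo key (empty column visibility, fixed timestamp of zero) for the RFile entry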
return new Tuple2<>(new Key(new Text(row), new Text(cf), new Text(cq), 0), new Value(val));
});
bulkImportKvToAccumulo(kvData, accumuloTable, opts);
}
/**
* Bulk import Key/Value data into the specified Accumulo table. This method does not repartition
* data. One RFile will be created for each partition in the passed-in RDD. Ensure the RDD is
* reasonably partitioned before calling this method.
*
* @param data Key/value data to import
* @param accumuloTable Accumulo table used for import
* @param opts Bulk import options
*/
public void bulkImportKvToAccumulo(JavaPairRDD<Key, Value> data, String accumuloTable,
BulkImportOptions opts) {
Path tempDir = getTempDir(opts);
try (AccumuloClient client = getAccumuloClient(fluoConfig)) {
if (hdfs.exists(tempDir)) {
throw new IllegalArgumentException("HDFS temp dir already exists: " + tempDir.toString());
}
hdfs.mkdirs(tempDir);
Path dataDir = new Path(tempDir.toString() + "/data");
Path failDir = new Path(tempDir.toString() + "/fail");
hdfs.mkdirs(failDir);
// save data to HDFS
Job job = Job.getInstance(hadoopConfig);
AccumuloFileOutputFormat.setOutputPath(job, dataDir);
// must use new API here as saveAsHadoopFile throws exception
data.saveAsNewAPIHadoopFile(dataDir.toString(), Key.class, Value.class,
AccumuloFileOutputFormat.class, job.getConfiguration());
// bulk import data to Accumulo
log.info("Wrote data for bulk import to HDFS temp directory: {}", dataDir);
Connector conn = chooseConnector(client, opts);
conn.tableOperations().importDirectory(accumuloTable, dataDir.toString(), failDir.toString(),
false);
// throw exception if failures directory contains files
if (hdfs.listFiles(failDir, true).hasNext()) {
throw new IllegalStateException("Bulk import failed! Found files that failed to import "
+ "in failures directory: " + failDir);
}
log.info("Successfully bulk imported data in {} to '{}' Accumulo table", dataDir,
accumuloTable);
// delete data directory
hdfs.delete(tempDir, true);
log.info("Deleted HDFS temp directory created for bulk import: {}", tempDir);
// @formatter:off
} catch (IOException | TableNotFoundException | AccumuloException
| AccumuloSecurityException e) {
// @formatter:on
throw new IllegalStateException(e);
}
}
/**
* Optional settings for Bulk Imports
*
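* <p>
* A minimal sketch, assuming an existing {@link AccumuloClient} named {@code client}:
*
* <pre>{@code
* BulkImportOptions opts = new BulkImportOptions()
*     .setAccumuloClient(client)
*     .setTempDir(new Path("/tmp/bulk-import-work"));
* }</pre>
*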
* @since 1.0.0
*/
public static class BulkImportOptions {
public static BulkImportOptions DEFAULT = new BulkImportOptions();
Connector conn = null;
Path tempDir = null;
public BulkImportOptions() {}
/**
* If this method is not called, then a Connector will be created using properties in the
* FluoConfiguration supplied to
* {@link FluoSparkHelper#FluoSparkHelper(FluoConfiguration, Configuration, Path)}
*
* @param conn Use this connector to bulk import files into Accumulo.
* @return this
* @deprecated use {@link #setAccumuloClient(AccumuloClient)}
*/
@Deprecated(since = "1.3.0", forRemoval = true)
public BulkImportOptions setAccumuloConnector(Connector conn) {
Objects.requireNonNull(conn);
this.conn = conn;
return this;
}
/**
* If this method is not called, then an AccumuloClient will be created using properties in the
* FluoConfiguration supplied to
* {@link FluoSparkHelper#FluoSparkHelper(FluoConfiguration, Configuration, Path)}
*
* @param client Use this client to bulk import files into Accumulo.
* @return this
*
* @since 1.3.0
*/
public BulkImportOptions setAccumuloClient(AccumuloClient client) {
Objects.requireNonNull(client);
try {
this.conn = Connector.from(client);
} catch (AccumuloSecurityException | AccumuloException e) {
throw new RuntimeException(e);
}
return this;
}
/**
* If this method is not called, then a temp dir will be created based on the path
* supplied to {@link FluoSparkHelper#FluoSparkHelper(FluoConfiguration, Configuration, Path)}
*
* @param tempDir Use this directory to store RFiles generated for bulk import.
* @return this
*/
public BulkImportOptions setTempDir(Path tempDir) {
Objects.requireNonNull(tempDir);
this.tempDir = tempDir;
return this;
}
}
private Path getPossibleTempDir() {
return new Path(tempBaseDir.toString() + "/" + tempDirCounter.getAndIncrement());
}
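// use the temp dir from the options if one was set; otherwise generate a dir under tempBaseDir
// that does not already exist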
private Path getTempDir(BulkImportOptions opts) {
Path tempDir;
if (opts.tempDir == null) {
try {
tempDir = getPossibleTempDir();
while (hdfs.exists(tempDir)) {
tempDir = getPossibleTempDir();
}
} catch (IOException e) {
throw new IllegalStateException(e);
}
} else {
tempDir = opts.tempDir;
}
return tempDir;
}
private JavaPairRDD<RowColumn, Bytes> partitionForAccumulo(JavaPairRDD<RowColumn, Bytes> data,
String accumuloTable, BulkImportOptions opts) {
// partition and sort data so that one file is created per Accumulo tablet
Partitioner accumuloPartitioner;
try (AccumuloClient client = getAccumuloClient(fluoConfig)) {
accumuloPartitioner = new AccumuloRangePartitioner(
chooseConnector(client, opts).tableOperations().listSplits(accumuloTable));
} catch (TableNotFoundException | AccumuloSecurityException | AccumuloException e) {
throw new IllegalStateException(e);
}
return data.repartitionAndSortWithinPartitions(accumuloPartitioner);
}
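// prefer a Connector explicitly set in the options; otherwise wrap the supplied AccumuloClient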
private Connector chooseConnector(AccumuloClient client, BulkImportOptions opts) {
try {
return opts.conn == null ? Connector.from(client) : opts.conn;
} catch (AccumuloSecurityException | AccumuloException e) {
throw new RuntimeException(e);
}
}
}