/*
* Copyright 2015 Fluo authors (see AUTHORS)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package io.fluo.webindex.data.spark;

import java.io.File;
import java.io.IOException;

import com.google.common.base.Preconditions;
import io.fluo.api.config.FluoConfiguration;
import io.fluo.api.data.Bytes;
import io.fluo.api.data.RowColumn;
import io.fluo.core.util.AccumuloUtil;
import io.fluo.core.util.SpanUtil;
import io.fluo.webindex.core.DataConfig;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.TableExistsException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
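
/**
 * Holds the shared resources (Spark context, Fluo configuration, Accumulo connector, and HDFS
 * temp directories) used by the webindex data Spark jobs.
 */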
public class IndexEnv {

  private static final Logger log = LoggerFactory.getLogger(IndexEnv.class);

  private JavaSparkContext sparkCtx;
  private DataConfig dataConfig;
  private Connector conn;
  private FluoConfiguration fluoConfig;
  private FileSystem hdfs;
  private Path failuresDir;
  private Path hadoopTempDir;
  private Path accumuloTempDir;
  private Path fluoTempDir;
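
  /**
   * Creates an IndexEnv from the given data and Spark configurations, connecting to Accumulo and
   * HDFS and resolving the HDFS temp directories under the configured hdfsTempDir.
   */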
  public IndexEnv(DataConfig dataConfig, SparkConf sparkConf) throws IOException {
    this.dataConfig = dataConfig;
    Preconditions.checkNotNull(dataConfig.getFluoPropsPath());
    Preconditions.checkArgument(new File(dataConfig.getFluoPropsPath()).exists(),
        "fluoPropsPath must be set in data.yml and exist");
    fluoConfig = new FluoConfiguration(new File(dataConfig.getFluoPropsPath()));
    conn = AccumuloUtil.getConnector(fluoConfig);
    sparkCtx = new JavaSparkContext(sparkConf);
    hdfs = FileSystem.get(sparkCtx.hadoopConfiguration());
    hadoopTempDir = new Path(dataConfig.hdfsTempDir + "/hadoop");
    fluoTempDir = new Path(dataConfig.hdfsTempDir + "/fluo");
    failuresDir = new Path(dataConfig.hdfsTempDir + "/failures");
    accumuloTempDir = new Path(dataConfig.hdfsTempDir + "/accumulo");
  }
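
  /**
   * Creates the Accumulo index table, dropping any existing table with the same name first.
   */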
  public void initAccumuloIndexTable() throws AccumuloSecurityException, AccumuloException,
      TableNotFoundException, TableExistsException {
    if (conn.tableOperations().exists(dataConfig.accumuloIndexTable)) {
      conn.tableOperations().delete(dataConfig.accumuloIndexTable);
    }
    conn.tableOperations().create(dataConfig.accumuloIndexTable);
  }
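
  /**
   * Creates the HDFS temp and failures directories if they do not already exist.
   */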
  public void makeHdfsTempDirs() throws IOException {
    Path tempDir = new Path(dataConfig.hdfsTempDir);
    if (!hdfs.exists(tempDir)) {
      hdfs.mkdirs(tempDir);
    }
    if (!hdfs.exists(failuresDir)) {
      hdfs.mkdirs(failuresDir);
    }
  }
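
  /**
   * Writes the given key/value data as Accumulo files to the Fluo temp directory and bulk imports
   * them into the Fluo application's Accumulo table.
   */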
  public void saveToFluo(JavaPairRDD<Key, Value> data) throws Exception {
    Job job = Job.getInstance(sparkCtx.hadoopConfiguration());
    if (hdfs.exists(fluoTempDir)) {
      hdfs.delete(fluoTempDir, true);
    }
    AccumuloFileOutputFormat.setOutputPath(job, fluoTempDir);
    // must use new API here as saveAsHadoopFile throws exception
    data.saveAsNewAPIHadoopFile(fluoTempDir.toString(), Key.class, Value.class,
        AccumuloFileOutputFormat.class, job.getConfiguration());
    conn.tableOperations().importDirectory(fluoConfig.getAccumuloTable(), fluoTempDir.toString(),
        failuresDir.toString(), false);
    log.info("Imported data at {} into Fluo app {}", fluoTempDir, fluoConfig.getApplicationName());
  }
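
  /**
   * Converts RowColumn/Bytes pairs to Accumulo Key/Value pairs and bulk imports them into the
   * Accumulo index table.
   */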
  public void saveRowColBytesToAccumulo(JavaPairRDD<RowColumn, Bytes> rowColBytes)
      throws Exception {
    JavaPairRDD<Key, Value> kvData =
        rowColBytes.mapToPair(t -> new Tuple2<Key, Value>(SpanUtil.toKey(t._1()), new Value(t._2()
            .toArray())));
    saveKeyValueToAccumulo(kvData);
  }
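
  /**
   * Writes the given key/value data as Accumulo files to the Accumulo temp directory and bulk
   * imports them into the Accumulo index table.
   */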
  public void saveKeyValueToAccumulo(JavaPairRDD<Key, Value> data) throws Exception {
    Job accJob = Job.getInstance(sparkCtx.hadoopConfiguration());
    if (hdfs.exists(accumuloTempDir)) {
      hdfs.delete(accumuloTempDir, true);
    }
    AccumuloFileOutputFormat.setOutputPath(accJob, accumuloTempDir);
    // must use new API here as saveAsHadoopFile throws exception
    data.saveAsNewAPIHadoopFile(accumuloTempDir.toString(), Key.class, Value.class,
        AccumuloFileOutputFormat.class, accJob.getConfiguration());
    conn.tableOperations().importDirectory(dataConfig.accumuloIndexTable,
        accumuloTempDir.toString(), failuresDir.toString(), false);
  }

  public FileSystem getHdfs() {
    return hdfs;
  }

  public Path getFluoTempDir() {
    return fluoTempDir;
  }

  public Path getAccumuloTempDir() {
    return accumuloTempDir;
  }

  public Path getHadoopTempDir() {
    return hadoopTempDir;
  }

  public Path getFailuresDir() {
    return failuresDir;
  }

  public JavaSparkContext getSparkCtx() {
    return sparkCtx;
  }

  public Connector getAccumuloConnector() {
    return conn;
  }

  public FluoConfiguration getFluoConfig() {
    return fluoConfig;
  }
}