/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.blur.mapreduce.lib.update;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import org.apache.blur.log.Log;
import org.apache.blur.log.LogFactory;
import org.apache.blur.mapreduce.lib.BlurInputFormat;
import org.apache.blur.mapreduce.lib.BlurOutputFormat;
import org.apache.blur.thirdparty.thrift_0_9_0.TException;
import org.apache.blur.thrift.BlurClient;
import org.apache.blur.thrift.generated.Blur.Iface;
import org.apache.blur.thrift.generated.BlurException;
import org.apache.blur.thrift.generated.TableDescriptor;
import org.apache.blur.thrift.generated.TableStats;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
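
/**
 * Driver for the "fast" incremental MapReduce update of a Blur table. New
 * data staged under the working path is merged with the existing index using
 * one of three strategies: streaming through the whole table (MR_ONLY),
 * looking up row ids directly in the index (MR_WITH_LOOKUP), or letting
 * split pruning pick between the two per shard (AUTOMATIC).
 */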
public class FasterDriver extends Configured implements Tool {
public static final String BLUR_UPDATE_ID = "blur.update.id";
private static final String BLUR_EXEC_TYPE = "blur.exec.type";
public static final String TMP = "tmp";
public enum EXEC {
MR_ONLY, MR_WITH_LOOKUP, AUTOMATIC
}
public static final String MRUPDATE_SNAPSHOT = "mrupdate-snapshot";
public static final String CACHE = "cache";
public static final String COMPLETE = "complete";
public static final String INPROGRESS = "inprogress";
public static final String NEW = "new";
private static final Log LOG = LogFactory.getLog(FasterDriver.class);
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new FasterDriver(), args);
System.exit(res);
}
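
/**
 * Output of the partition job: the location of the partitioned input data
 * plus per-shard row id counts collected from the reduce task counters.
 */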
static class PartitionedInputResult {
final Path _partitionedInputData;
final Counters _counters;
final long[] _rowIdsFromNewData;
final long[] _rowIdsToUpdateFromNewData;
final long[] _rowIdsFromIndex;
PartitionedInputResult(Path partitionedInputData, Counters counters, int shards, TaskReport[] taskReports) {
_partitionedInputData = partitionedInputData;
_counters = counters;
_rowIdsFromNewData = new long[shards];
_rowIdsToUpdateFromNewData = new long[shards];
_rowIdsFromIndex = new long[shards];
for (TaskReport tr : taskReports) {
int id = tr.getTaskID().getId();
Counters taskCounters = tr.getTaskCounters();
Counter total = taskCounters.findCounter(BlurIndexCounter.ROW_IDS_FROM_NEW_DATA);
_rowIdsFromNewData[id] = total.getValue();
Counter update = taskCounters.findCounter(BlurIndexCounter.ROW_IDS_TO_UPDATE_FROM_NEW_DATA);
_rowIdsToUpdateFromNewData[id] = update.getValue();
Counter index = taskCounters.findCounter(BlurIndexCounter.ROW_IDS_FROM_INDEX);
_rowIdsFromIndex[id] = index.getValue();
}
}
}
@Override
public int run(String[] args) throws Exception {
int c = 0;
if (args.length < 5) {
System.err.println("Usage: FasterDriver <table> <mr inc working path> <output path> <zk connection> "
+ "<reducer multiplier> <extra config files...>");
return 1;
}
String table = args[c++];
String mrIncWorkingPathStr = args[c++];
String outputPathStr = args[c++];
String blurZkConnection = args[c++];
int reducerMultiplier = Integer.parseInt(args[c++]);
for (; c < args.length; c++) {
String externalConfigFileToAdd = args[c];
getConf().addResource(new Path(externalConfigFileToAdd));
}
Path outputPath = new Path(outputPathStr);
Path mrIncWorkingPath = new Path(mrIncWorkingPathStr);
FileSystem fileSystem = mrIncWorkingPath.getFileSystem(getConf());
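// Layout under the MR working path: new data arrives under "new", moves to
// "inprogress" while a job runs, and to "complete" once the load succeeds;
// "cache" holds row id lookup caches and "tmp" holds per-run scratch space.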
Path newData = new Path(mrIncWorkingPath, NEW);
Path inprogressData = new Path(mrIncWorkingPath, INPROGRESS);
Path completeData = new Path(mrIncWorkingPath, COMPLETE);
Path fileCache = new Path(mrIncWorkingPath, CACHE);
Path tmpPathDontDelete = new Path(mrIncWorkingPath, TMP);
Path tmpPath = new Path(tmpPathDontDelete, UUID.randomUUID().toString());
fileSystem.mkdirs(newData);
fileSystem.mkdirs(inprogressData);
fileSystem.mkdirs(completeData);
fileSystem.mkdirs(fileCache);
List<Path> srcPathList = new ArrayList<Path>();
for (FileStatus fileStatus : fileSystem.listStatus(newData)) {
srcPathList.add(fileStatus.getPath());
}
if (srcPathList.isEmpty()) {
return 0;
}
List<Path> inprogressPathList = new ArrayList<Path>();
boolean success = false;
Iface client = null;
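// Execution strategy, configurable via blur.exec.type; defaults to AUTOMATIC.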
EXEC exec = EXEC.valueOf(getConf().get(BLUR_EXEC_TYPE, EXEC.AUTOMATIC.name()).toUpperCase());
String uuid = UUID.randomUUID().toString();
try {
client = BlurClient.getClientFromZooKeeperConnectionStr(blurZkConnection);
TableDescriptor descriptor = client.describe(table);
Map<String, String> tableProperties = descriptor.getTableProperties();
// The fast MR update requires the table's fast dir feature to be disabled
// (blur.table.disable.fast.dir=true).
String fastDir = tableProperties.get("blur.table.disable.fast.dir");
if (fastDir == null || !fastDir.equals("true")) {
LOG.error("Table [{0}] does not have blur.table.disable.fast.dir set to true, not supported in fast MR update.", table);
return 1;
}
waitForOtherSnapshotsToBeRemoved(client, table, MRUPDATE_SNAPSHOT);
client.createSnapshot(table, MRUPDATE_SNAPSHOT);
TableStats tableStats = client.tableStats(table);
inprogressPathList = movePathList(fileSystem, inprogressData, srcPathList);
switch (exec) {
case MR_ONLY:
success = runMrOnly(descriptor, inprogressPathList, table, fileCache, outputPath, reducerMultiplier);
break;
case MR_WITH_LOOKUP:
success = runMrWithLookup(uuid, descriptor, inprogressPathList, table, fileCache, outputPath, reducerMultiplier,
tmpPath, tableStats, MRUPDATE_SNAPSHOT);
break;
case AUTOMATIC:
success = runAutomatic(uuid, descriptor, inprogressPathList, table, fileCache, outputPath, reducerMultiplier,
tmpPath, tableStats, MRUPDATE_SNAPSHOT);
break;
default:
throw new RuntimeException("Exec type [" + exec + "] not supported.");
}
} finally {
if (success) {
LOG.info("Associate lookup cache with new data!");
associateLookupCache(uuid, fileCache, outputPath);
LOG.info("Indexing job succeeded!");
client.loadData(table, outputPathStr);
LOG.info("Load data called");
movePathList(fileSystem, completeData, inprogressPathList);
LOG.info("Input data moved to complete");
ClusterDriver.waitForDataToLoad(client, table);
LOG.info("Data loaded");
} else {
LOG.error("Indexing job failed!");
movePathList(fileSystem, newData, inprogressPathList);
}
fileSystem.delete(tmpPath, true);
if (client != null) {
client.removeSnapshot(table, MRUPDATE_SNAPSHOT);
}
}
if (success) {
return 0;
} else {
return 1;
}
}
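
/**
 * Associates the lookup cache files written under this update's uuid with
 * the newly written index segments in the output path.
 */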
private void associateLookupCache(String uuid, Path fileCache, Path outputPath) throws IOException {
FileSystem fileSystem = fileCache.getFileSystem(getConf());
cleanupExtraFileFromSpecX(fileSystem, uuid, fileCache);
associateLookupCache(fileSystem, uuid, fileSystem.getFileStatus(fileCache), outputPath);
}
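
// Removes duplicate cache files for this uuid, presumably left behind by
// speculative task execution ("SpecX"), keeping only the largest one.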
private void cleanupExtraFileFromSpecX(FileSystem fileSystem, String uuid, Path fileCache) throws IOException {
FileStatus[] listStatus = fileSystem.listStatus(fileCache);
List<FileStatus> uuidPaths = new ArrayList<FileStatus>();
for (FileStatus fs : listStatus) {
Path path = fs.getPath();
if (fs.isDirectory()) {
cleanupExtraFileFromSpecX(fileSystem, uuid, path);
} else if (path.getName().startsWith(uuid)) {
uuidPaths.add(fs);
}
}
if (uuidPaths.size() > 1) {
deleteIncomplete(fileSystem, uuidPaths);
}
}
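
// Keeps the largest of the duplicate cache files (assumed to be the most
// complete) and deletes the rest.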
private void deleteIncomplete(FileSystem fileSystem, List<FileStatus> uuidPaths) throws IOException {
long max = 0;
FileStatus keeper = null;
for (FileStatus fs : uuidPaths) {
long len = fs.getLen();
if (len > max) {
keeper = fs;
max = len;
}
}
for (FileStatus fs : uuidPaths) {
if (fs != keeper) {
LOG.info("Deleteing incomplete cache file [{0}]", fs.getPath());
fileSystem.delete(fs.getPath(), false);
}
}
}
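
// Recursively walks the cache directory; every file named after the update
// uuid is committed as "<segment id>.seq" so it can be matched to the
// single-segment index written for that shard.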
private void associateLookupCache(FileSystem fileSystem, String uuid, FileStatus fileCache, Path outputPath)
throws IOException {
Path path = fileCache.getPath();
if (fileCache.isDirectory()) {
FileStatus[] listStatus = fileSystem.listStatus(path);
for (FileStatus fs : listStatus) {
associateLookupCache(fileSystem, uuid, fs, outputPath);
}
} else if (path.getName().startsWith(uuid)) {
Path parent = path.getParent();
String shardName = parent.getName();
Path indexPath = findOutputDirPath(outputPath, shardName);
LOG.info("Path found for shard [{0}] outputPath [{1}]", shardName, outputPath);
String id = MergeSortRowIdMatcher.getIdForSingleSegmentIndex(getConf(), indexPath);
Path file = new Path(path.getParent(), id + ".seq");
MergeSortRowIdMatcher.commitWriter(getConf(), file, path);
}
}
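
// Locates the single ".commit" directory for the given shard under the
// output path.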
private Path findOutputDirPath(Path outputPath, String shardName) throws IOException {
FileSystem fileSystem = outputPath.getFileSystem(getConf());
Path shardPath = new Path(outputPath, shardName);
if (!fileSystem.exists(shardPath)) {
throw new IOException("Shard path [" + shardPath + "]");
}
FileStatus[] listStatus = fileSystem.listStatus(shardPath, new PathFilter() {
@Override
public boolean accept(Path path) {
return path.getName().endsWith(".commit");
}
});
if (listStatus.length == 1) {
FileStatus fs = listStatus[0];
return fs.getPath();
} else {
throw new IOException("More than one sub dir [" + shardPath + "]");
}
}
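
/**
 * AUTOMATIC mode: partitions the new data, publishes the per-shard row id
 * counts, and lets PrunedBlurInputFormat / PrunedSequenceFileInputFormat
 * decide per shard whether to stream the existing index or use the row id
 * lookup.
 */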
private boolean runAutomatic(String uuid, TableDescriptor descriptor, List<Path> inprogressPathList, String table,
Path fileCache, Path outputPath, int reducerMultiplier, Path tmpPath, TableStats tableStats, String snapshot)
throws ClassNotFoundException, IOException, InterruptedException {
PartitionedInputResult result = buildPartitionedInputData(uuid, tmpPath, descriptor, inprogressPathList, snapshot,
fileCache);
Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
InputSplitPruneUtil.setBlurLookupRowIdFromNewDataCounts(job, table, result._rowIdsFromNewData);
InputSplitPruneUtil.setBlurLookupRowIdUpdateFromNewDataCounts(job, table, result._rowIdsToUpdateFromNewData);
InputSplitPruneUtil.setBlurLookupRowIdFromIndexCounts(job, table, result._rowIdsFromIndex);
InputSplitPruneUtil.setTable(job, table);
BlurInputFormat.setLocalCachePath(job, fileCache);
// Existing data - this input copies the existing index data by opening the
// files and streaming through all documents.
{
Path tablePath = new Path(descriptor.getTableUri());
BlurInputFormat.addTable(job, descriptor, MRUPDATE_SNAPSHOT);
MultipleInputs.addInputPath(job, tablePath, PrunedBlurInputFormat.class, MapperForExistingDataMod.class);
}
// Existing data - This adds the row id lookup
{
MapperForExistingDataWithIndexLookup.setSnapshot(job, MRUPDATE_SNAPSHOT);
FileInputFormat.addInputPath(job, result._partitionedInputData);
MultipleInputs.addInputPath(job, result._partitionedInputData, PrunedSequenceFileInputFormat.class,
MapperForExistingDataWithIndexLookup.class);
}
// New Data
for (Path p : inprogressPathList) {
FileInputFormat.addInputPath(job, p);
MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, MapperForNewDataMod.class);
}
BlurOutputFormat.setOutputPath(job, outputPath);
BlurOutputFormat.setupJob(job, descriptor);
job.setReducerClass(UpdateReducer.class);
job.setMapOutputKeyClass(IndexKey.class);
job.setMapOutputValueClass(IndexValue.class);
job.setPartitionerClass(IndexKeyPartitioner.class);
job.setGroupingComparatorClass(IndexKeyWritableComparator.class);
BlurOutputFormat.setReducerMultiplier(job, reducerMultiplier);
boolean success = job.waitForCompletion(true);
Counters counters = job.getCounters();
LOG.info("Counters [" + counters + "]");
return success;
}
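
/**
 * MR_WITH_LOOKUP mode: partitions the new data and updates existing rows by
 * looking up row ids in the index snapshot rather than streaming the whole
 * table.
 */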
private boolean runMrWithLookup(String uuid, TableDescriptor descriptor, List<Path> inprogressPathList, String table,
Path fileCache, Path outputPath, int reducerMultiplier, Path tmpPath, TableStats tableStats, String snapshot)
throws ClassNotFoundException, IOException, InterruptedException {
PartitionedInputResult result = buildPartitionedInputData(uuid, tmpPath, descriptor, inprogressPathList, snapshot,
fileCache);
Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
MapperForExistingDataWithIndexLookup.setSnapshot(job, MRUPDATE_SNAPSHOT);
FileInputFormat.addInputPath(job, result._partitionedInputData);
MultipleInputs.addInputPath(job, result._partitionedInputData, SequenceFileInputFormat.class,
MapperForExistingDataWithIndexLookup.class);
for (Path p : inprogressPathList) {
FileInputFormat.addInputPath(job, p);
MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, MapperForNewDataMod.class);
}
BlurOutputFormat.setOutputPath(job, outputPath);
BlurOutputFormat.setupJob(job, descriptor);
job.setReducerClass(UpdateReducer.class);
job.setMapOutputKeyClass(IndexKey.class);
job.setMapOutputValueClass(IndexValue.class);
job.setPartitionerClass(IndexKeyPartitioner.class);
job.setGroupingComparatorClass(IndexKeyWritableComparator.class);
BlurOutputFormat.setReducerMultiplier(job, reducerMultiplier);
boolean success = job.waitForCompletion(true);
Counters counters = job.getCounters();
LOG.info("Counters [" + counters + "]");
return success;
}
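
/**
 * MR_ONLY mode: streams the entire existing table (via the update snapshot)
 * together with the new data through a single MapReduce job.
 */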
private boolean runMrOnly(TableDescriptor descriptor, List<Path> inprogressPathList, String table, Path fileCache,
Path outputPath, int reducerMultiplier) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(getConf(), "Blur Row Updater for table [" + table + "]");
Path tablePath = new Path(descriptor.getTableUri());
BlurInputFormat.setLocalCachePath(job, fileCache);
BlurInputFormat.addTable(job, descriptor, MRUPDATE_SNAPSHOT);
MultipleInputs.addInputPath(job, tablePath, BlurInputFormat.class, MapperForExistingDataMod.class);
for (Path p : inprogressPathList) {
FileInputFormat.addInputPath(job, p);
MultipleInputs.addInputPath(job, p, SequenceFileInputFormat.class, MapperForNewDataMod.class);
}
BlurOutputFormat.setOutputPath(job, outputPath);
BlurOutputFormat.setupJob(job, descriptor);
job.setReducerClass(UpdateReducer.class);
job.setMapOutputKeyClass(IndexKey.class);
job.setMapOutputValueClass(IndexValue.class);
job.setPartitionerClass(IndexKeyPartitioner.class);
job.setGroupingComparatorClass(IndexKeyWritableComparator.class);
BlurOutputFormat.setReducerMultiplier(job, reducerMultiplier);
boolean success = job.waitForCompletion(true);
Counters counters = job.getCounters();
LOG.info("Counters [" + counters + "]");
return success;
}
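
/**
 * Runs a preliminary job that partitions the new data's row ids (one
 * reducer per shard) and gathers the per-shard row id counters later used
 * for split pruning.
 */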
private PartitionedInputResult buildPartitionedInputData(String uuid, Path tmpPath, TableDescriptor descriptor,
List<Path> inprogressPathList, String snapshot, Path fileCachePath) throws IOException, ClassNotFoundException,
InterruptedException {
Job job = Job.getInstance(getConf(), "Partitioning data for table [" + descriptor.getName() + "]");
job.getConfiguration().set(BLUR_UPDATE_ID, uuid);
// Needed for the bloom filter path information.
BlurOutputFormat.setTableDescriptor(job, descriptor);
BlurInputFormat.setLocalCachePath(job, fileCachePath);
MapperForExistingDataWithIndexLookup.setSnapshot(job, snapshot);
for (Path p : inprogressPathList) {
FileInputFormat.addInputPath(job, p);
}
Path outputPath = new Path(tmpPath, UUID.randomUUID().toString());
job.setJarByClass(getClass());
job.setMapperClass(LookupBuilderMapper.class);
job.setReducerClass(LookupBuilderReducer.class);
int shardCount = descriptor.getShardCount();
job.setNumReduceTasks(shardCount);
job.setInputFormatClass(SequenceFileInputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BooleanWritable.class);
FileOutputFormat.setOutputPath(job, outputPath);
if (job.waitForCompletion(true)) {
return new PartitionedInputResult(outputPath, job.getCounters(), shardCount, job.getTaskReports(TaskType.REDUCE));
} else {
throw new IOException("Partitioning failed!");
}
}
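
// Blocks until no snapshot with the given name exists on the table, polling
// every 10 seconds.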
private void waitForOtherSnapshotsToBeRemoved(Iface client, String table, String snapshot) throws BlurException,
TException, InterruptedException {
while (true) {
Map<String, List<String>> listSnapshots = client.listSnapshots(table);
boolean mrupdateSnapshots = false;
for (Entry<String, List<String>> e : listSnapshots.entrySet()) {
List<String> value = e.getValue();
if (value.contains(snapshot)) {
mrupdateSnapshots = true;
}
}
if (!mrupdateSnapshots) {
return;
} else {
LOG.info("Snapshot [{0}] for table [{1}] already exists", snapshot, table);
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
LOG.info("Retrying");
}
}
}
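
// Renames each source path into dstDir, returning the destinations that
// were successfully moved.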
private List<Path> movePathList(FileSystem fileSystem, Path dstDir, List<Path> lst) throws IOException {
List<Path> result = new ArrayList<Path>();
for (Path src : lst) {
Path dst = new Path(dstDir, src.getName());
if (fileSystem.rename(src, dst)) {
LOG.info("Moving [{0}] to [{1}]", src, dst);
result.add(dst);
} else {
LOG.error("Could not move [{0}] to [{1}]", src, dst);
}
}
return result;
}
}