package org.apache.blur.mapreduce.lib.update;

import java.io.IOException;
import java.util.List;

import org.apache.blur.BlurConfiguration;
import org.apache.blur.manager.BlurPartitioner;
import org.apache.blur.manager.writer.SnapshotIndexDeletionPolicy;
import org.apache.blur.mapreduce.lib.BlurInputFormat;
import org.apache.blur.mapreduce.lib.BlurOutputFormat;
import org.apache.blur.mapreduce.lib.update.MergeSortRowIdMatcher.Action;
import org.apache.blur.store.BlockCacheDirectoryFactoryV2;
import org.apache.blur.store.hdfs.HdfsDirectory;
import org.apache.blur.thrift.generated.TableDescriptor;
import org.apache.blur.utils.BlurConstants;
import org.apache.blur.utils.ShardUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.Terms;
import org.apache.lucene.store.Directory;

import com.google.common.io.Closer;

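/**
 * Reducer that builds the row id lookup data for an indexed-table update job.
 * Each incoming key is a row id from the new data: every row id is appended
 * to a sequence file in the task's local cache, and is also probed against
 * the existing shard index through a {@link MergeSortRowIdMatcher}. Row ids
 * that already exist in the index are emitted with a value of
 * <code>true</code>, marking them as rows that need to be updated rather
 * than simply inserted.
 */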
public class LookupBuilderReducer extends Reducer<Text, NullWritable, Text, BooleanWritable> {

  public static final String BLUR_CACHE_DIR_TOTAL_BYTES = "blur.cache.dir.total.bytes";

  private Counter _rowIds;
  private Counter _rowIdsToUpdate;
  private MergeSortRowIdMatcher _matcher;
  private int _numberOfShardsInTable;
  private Configuration _configuration;
  private String _snapshot;
  private Path _tablePath;
  private Counter _rowIdsFromIndex;
  private long _totalNumberOfBytes;
  private Action _action;
  private Closer _closer;
  private Path _cachePath;
  private String _table;
  private Writer _writer;

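  /**
   * Pulls everything needed from the job configuration: the table descriptor
   * (shard count, table path, and table name), the snapshot of the existing
   * index to read from, the block cache size (default 128 MB), and the local
   * cache path for the generated lookup files.
   */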
  @Override
  protected void setup(Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) throws IOException,
      InterruptedException {
    _configuration = context.getConfiguration();
    _rowIds = context.getCounter(BlurIndexCounter.ROW_IDS_FROM_NEW_DATA);
    _rowIdsToUpdate = context.getCounter(BlurIndexCounter.ROW_IDS_TO_UPDATE_FROM_NEW_DATA);
    _rowIdsFromIndex = context.getCounter(BlurIndexCounter.ROW_IDS_FROM_INDEX);
    TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(_configuration);
    _numberOfShardsInTable = tableDescriptor.getShardCount();
    _tablePath = new Path(tableDescriptor.getTableUri());
    _snapshot = MapperForExistingDataWithIndexLookup.getSnapshot(_configuration);
    _totalNumberOfBytes = _configuration.getLong(BLUR_CACHE_DIR_TOTAL_BYTES, 128 * 1024 * 1024);
    _cachePath = BlurInputFormat.getLocalCachePath(_configuration);
    _table = tableDescriptor.getName();
    _closer = Closer.create();
  }

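  /**
   * The matcher and writer are created lazily from the first key seen, on the
   * assumption that every row id handled by this reduce task hashes to the
   * same shard. Each row id is recorded in the cache file and then looked up
   * in the existing index; hits are counted and written out as updates.
   */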
  @Override
  protected void reduce(Text rowId, Iterable<NullWritable> nothing,
      final Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) throws IOException,
      InterruptedException {
    if (_matcher == null) {
      _matcher = getMergeSortRowIdMatcher(rowId, context);
    }
    if (_writer == null) {
      _writer = getRowIdWriter(rowId, context);
    }
    _writer.append(rowId, NullWritable.get());
    _rowIds.increment(1);
    if (_action == null) {
      // The context parameter must be final so the anonymous Action can
      // capture it. The Action is created once, on the first call; reusing the
      // captured context is safe because Hadoop passes the same Context
      // instance to every call of reduce within a task.
      _action = new Action() {
        @Override
        public void found(Text rowId) throws IOException {
          _rowIdsToUpdate.increment(1);
          try {
            context.write(rowId, new BooleanWritable(true));
          } catch (InterruptedException e) {
            throw new IOException(e);
          }
        }
      };
    }
    _matcher.lookup(rowId, _action);
  }

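  /**
   * Creates a sequence file writer for the shard that <code>rowId</code>
   * hashes to. The file lives under the local cache path and is named with
   * the update id of this job run plus the task attempt id, so retried or
   * speculative attempts cannot collide. The writer is registered with the
   * {@link Closer} and closed in {@link #cleanup}.
   */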
  private Writer getRowIdWriter(Text rowId, Reducer<Text, NullWritable, Text, BooleanWritable>.Context context)
      throws IOException {
    BlurPartitioner blurPartitioner = new BlurPartitioner();
    int shard = blurPartitioner.getShard(rowId, _numberOfShardsInTable);
    String shardName = ShardUtil.getShardName(shard);
    Path cachePath = MergeSortRowIdMatcher.getCachePath(_cachePath, _table, shardName);
    Configuration configuration = context.getConfiguration();
    String uuid = configuration.get(FasterDriver.BLUR_UPDATE_ID);
    Path tmpPath = new Path(cachePath, uuid + "_" + getAttemptString(context));
    return _closer.register(MergeSortRowIdMatcher.createWriter(_configuration, tmpPath));
  }

  private String getAttemptString(Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) {
    TaskAttemptID taskAttemptID = context.getTaskAttemptID();
    return taskAttemptID.toString();
  }

  @Override
  protected void cleanup(Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) throws IOException,
      InterruptedException {
    // Closes the row id writer, the block cache factory, and anything else
    // registered with the closer during this task.
    _closer.close();
  }

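  /**
   * Opens the existing shard index at the generation recorded for the
   * configured snapshot by {@link SnapshotIndexDeletionPolicy}, wraps the
   * {@link HdfsDirectory} in a block cache directory to speed up the term
   * lookups, records the total number of row ids in the index, and returns a
   * matcher bound to that index commit.
   */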
  private MergeSortRowIdMatcher getMergeSortRowIdMatcher(Text rowId,
      Reducer<Text, NullWritable, Text, BooleanWritable>.Context context) throws IOException {
    BlurPartitioner blurPartitioner = new BlurPartitioner();
    int shard = blurPartitioner.getShard(rowId, _numberOfShardsInTable);
    String shardName = ShardUtil.getShardName(shard);
    Path shardPath = new Path(_tablePath, shardName);
    HdfsDirectory hdfsDirectory = new HdfsDirectory(_configuration, shardPath);
    SnapshotIndexDeletionPolicy policy = new SnapshotIndexDeletionPolicy(_configuration,
        SnapshotIndexDeletionPolicy.getGenerationsPath(shardPath));
    Long generation = policy.getGeneration(_snapshot);
    if (generation == null) {
      hdfsDirectory.close();
      throw new IOException("Snapshot [" + _snapshot + "] not found in shard [" + shardPath + "]");
    }
    BlurConfiguration bc = new BlurConfiguration();
    BlockCacheDirectoryFactoryV2 blockCacheDirectoryFactoryV2 = new BlockCacheDirectoryFactoryV2(bc,
        _totalNumberOfBytes);
    _closer.register(blockCacheDirectoryFactoryV2);
    Directory dir = blockCacheDirectoryFactoryV2.newDirectory("table", "shard", hdfsDirectory, null);
    List<IndexCommit> listCommits = DirectoryReader.listCommits(dir);
    IndexCommit indexCommit = MapperForExistingDataWithIndexLookup.findIndexCommit(listCommits, generation, shardPath);
    DirectoryReader reader = DirectoryReader.open(indexCommit);
    _rowIdsFromIndex.setValue(getTotalNumberOfRowIds(reader));
    // The reader was only opened to populate the ROW_IDS_FROM_INDEX counter;
    // close it so the segment files are not held open for the rest of the task.
    reader.close();
    Path cachePath = MergeSortRowIdMatcher.getCachePath(_cachePath, _table, shardName);
    return new MergeSortRowIdMatcher(dir, generation, _configuration, cachePath, context);
  }

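  /**
   * Sums the number of unique row id terms across all index segments. Returns
   * -1 if any segment cannot report a term count, in which case the total is
   * unknown.
   */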
  private long getTotalNumberOfRowIds(DirectoryReader reader) throws IOException {
    long total = 0;
    List<AtomicReaderContext> leaves = reader.leaves();
    for (AtomicReaderContext context : leaves) {
      AtomicReader atomicReader = context.reader();
      Terms terms = atomicReader.terms(BlurConstants.ROW_ID);
      if (terms == null) {
        // This segment has no row id field, so there is nothing to count.
        continue;
      }
      long expectedInsertions = terms.size();
      if (expectedInsertions < 0) {
        return -1;
      }
      total += expectedInsertions;
    }
    return total;
  }
}