package org.apache.blur.mapreduce.lib.update;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.blur.index.AtomicReaderUtil;
import org.apache.blur.log.Log;
import org.apache.blur.log.LogFactory;
import org.apache.blur.store.hdfs.DirectoryDecorator;
import org.apache.blur.store.hdfs.HdfsDirectory;
import org.apache.blur.utils.BlurConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HdfsBlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DeflateCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.index.SegmentInfoPerCommit;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
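/**
 * Matches incoming row ids against the row ids already stored in a Lucene index
 * commit. For every segment in the commit a sorted SequenceFile of row ids is
 * cached under the supplied cache path and reused across runs; lookups only move
 * the per-segment readers forward, so each cache file is read at most once.
 *
 * A minimal usage sketch (assumes row ids arrive in ascending order and that
 * generation refers to an existing commit of the directory):
 * <pre>
 * MergeSortRowIdMatcher matcher = new MergeSortRowIdMatcher(directory, generation, conf, cachePath);
 * matcher.lookup(rowId, new Action() {
 *   public void found(Text existingRowId) throws IOException {
 *     // the row id is already present in the index
 *   }
 * });
 * </pre>
 */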
public class MergeSortRowIdMatcher {
private static final String DEL = ".del";
private static final Log LOG = LogFactory.getLog(MergeSortRowIdMatcher.class);
private static final Progressable NO_OP = new Progressable() {
@Override
public void progress() {
}
};
private static final long _10_SECONDS = TimeUnit.SECONDS.toNanos(10);
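/**
 * Callback invoked by lookup(Text, Action) for each row id that is found in the index.
 */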
public interface Action {
void found(Text rowId) throws IOException;
}
private final MyReader[] _readers;
private final Configuration _configuration;
private final Path _cachePath;
private final IndexCommit _indexCommit;
private final Directory _directory;
private final Progressable _progressable;
private DirectoryReader _reader;
public MergeSortRowIdMatcher(Directory directory, long generation, Configuration configuration, Path cachePath)
throws IOException {
this(directory, generation, configuration, cachePath, null);
}
public MergeSortRowIdMatcher(Directory directory, long generation, Configuration configuration, Path cachePath,
Progressable progressable) throws IOException {
List<IndexCommit> listCommits = DirectoryReader.listCommits(directory);
_indexCommit = findIndexCommit(listCommits, generation);
_configuration = configuration;
_cachePath = cachePath;
_directory = directory;
_progressable = progressable == null ? NO_OP : progressable;
_readers = openReaders();
}
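/**
 * Calls Action.found(rowId) if the given row id exists in the index. Row ids must be
 * passed in ascending order because the underlying segment readers only move forward.
 */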
public void lookup(Text rowId, Action action) throws IOException {
if (lookup(rowId)) {
action.found(rowId);
}
}
private boolean lookup(Text rowId) throws IOException {
advanceReadersIfNeeded(rowId);
sortReaders();
return checkReaders(rowId);
}
private boolean checkReaders(Text rowId) {
for (MyReader reader : _readers) {
int compareTo = reader.getCurrentRowId().compareTo(rowId);
if (compareTo == 0) {
return true;
} else if (compareTo > 0) {
return false;
}
}
return false;
}
private void advanceReadersIfNeeded(Text rowId) throws IOException {
_progressable.progress();
for (MyReader reader : _readers) {
if (rowId.compareTo(reader.getCurrentRowId()) > 0) {
advanceReader(reader, rowId);
}
}
}
private void advanceReader(MyReader reader, Text rowId) throws IOException {
while (reader.next()) {
if (rowId.compareTo(reader.getCurrentRowId()) <= 0) {
return;
}
}
}
private static final Comparator<MyReader> COMP = new Comparator<MyReader>() {
@Override
public int compare(MyReader o1, MyReader o2) {
return o1.getCurrentRowId().compareTo(o2.getCurrentRowId());
}
};
private void sortReaders() {
Arrays.sort(_readers, COMP);
}
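// Opens one cache reader per segment of the index commit; a segment's SequenceFile
// cache is built on first use.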
private MyReader[] openReaders() throws IOException {
Collection<SegmentKey> segmentKeys = getSegmentKeys();
MyReader[] readers = new MyReader[segmentKeys.size()];
int i = 0;
for (SegmentKey segmentKey : segmentKeys) {
readers[i++] = openReader(segmentKey);
}
return readers;
}
private MyReader openReader(SegmentKey segmentKey) throws IOException {
Path file = getCacheFilePath(segmentKey);
FileSystem fileSystem = _cachePath.getFileSystem(_configuration);
if (!fileSystem.exists(file)) {
createCacheFile(file, segmentKey);
}
Reader reader = new SequenceFile.Reader(_configuration, SequenceFile.Reader.file(file));
return new MyReader(reader);
}
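// Writes every row id in the segment to a temporary SequenceFile and then moves it
// into place, so a partially written cache file is never picked up by openReader.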
private void createCacheFile(Path file, SegmentKey segmentKey) throws IOException {
LOG.info("Building cache for segment [{0}] to [{1}]", segmentKey, file);
Path tmpPath = getTmpWriterPath(file.getParent());
try (Writer writer = createWriter(_configuration, tmpPath)) {
DirectoryReader reader = getReader();
for (AtomicReaderContext context : reader.leaves()) {
SegmentReader segmentReader = AtomicReaderUtil.getSegmentReader(context.reader());
if (segmentReader.getSegmentName().equals(segmentKey.getSegmentName())) {
writeRowIds(writer, segmentReader);
break;
}
}
}
commitWriter(_configuration, file, tmpPath);
}
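// A failed rename is only logged as a warning; the cache file may already have been
// committed by another task.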
public static void commitWriter(Configuration configuration, Path file, Path tmpPath) throws IOException {
FileSystem fileSystem = tmpPath.getFileSystem(configuration);
LOG.info("Commit tmp [{0}] to file [{1}]", tmpPath, file);
if (!fileSystem.rename(tmpPath, file)) {
LOG.warn("Could not commit tmp file [{0}] to file [{1}]", tmpPath, file);
}
}
public static Path getTmpWriterPath(Path dir) {
return new Path(dir, UUID.randomUUID().toString() + ".tmp");
}
public static Writer createWriter(Configuration configuration, Path tmpPath) throws IOException {
return SequenceFile.createWriter(configuration, SequenceFile.Writer.file(tmpPath),
SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(NullWritable.class),
SequenceFile.Writer.compression(CompressionType.BLOCK, getCodec(configuration)));
}
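// Use gzip when the native zlib library is loaded, otherwise fall back to the
// pure-Java deflate codec.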
private static CompressionCodec getCodec(Configuration configuration) {
if (ZlibFactory.isNativeZlibLoaded(configuration)) {
return new GzipCodec();
}
return new DeflateCodec();
}
private void writeRowIds(Writer writer, SegmentReader segmentReader) throws IOException {
Terms terms = segmentReader.terms(BlurConstants.ROW_ID);
if (terms == null) {
return;
}
TermsEnum termsEnum = terms.iterator(null);
BytesRef rowId;
long s = System.nanoTime();
while ((rowId = termsEnum.next()) != null) {
long n = System.nanoTime();
// Throttle progress reporting to roughly once every 10 seconds instead of every term.
if (n - s > _10_SECONDS) {
_progressable.progress();
s = System.nanoTime();
}
writer.append(new Text(rowId.utf8ToString()), NullWritable.get());
}
}
private IndexCommit findIndexCommit(List<IndexCommit> listCommits, long generation) throws IOException {
for (IndexCommit commit : listCommits) {
if (commit.getGeneration() == generation) {
return commit;
}
}
throw new IOException("Generation [" + generation + "] not found.");
}
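/**
 * Identifies a segment by name together with an id derived from the segment's files;
 * the id is used as the cache file name, so a rewritten segment gets a new cache file.
 */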
static class SegmentKey {
final String _segmentName;
final String _id;
SegmentKey(String segmentName, String id) {
_segmentName = segmentName;
_id = id;
}
String getSegmentName() {
return _segmentName;
}
@Override
public String toString() {
return _id;
}
}
private DirectoryReader getReader() throws IOException {
if (_reader == null) {
_reader = DirectoryReader.open(_indexCommit);
}
return _reader;
}
private Collection<SegmentKey> getSegmentKeys() throws IOException {
List<SegmentKey> keys = new ArrayList<SegmentKey>();
SegmentInfos segmentInfos = new SegmentInfos();
segmentInfos.read(_directory, _indexCommit.getSegmentsFileName());
for (SegmentInfoPerCommit segmentInfoPerCommit : segmentInfos) {
String name = segmentInfoPerCommit.info.name;
String id = getId(segmentInfoPerCommit.info);
keys.add(new SegmentKey(name, id));
}
return keys;
}
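// The segment id is the first HDFS block id of the most recently modified non-.del
// file in the segment; this assumes the segment has at least one such file.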
private String getId(SegmentInfo si) throws IOException {
HdfsDirectory dir = getHdfsDirectory(si.dir);
Set<String> files = new TreeSet<String>(si.files());
return getId(_configuration, dir, files);
}
private static String getId(Configuration configuration, HdfsDirectory dir, Set<String> files) throws IOException {
long ts = 0;
String file = null;
for (String f : files) {
if (f.endsWith(DEL)) {
continue;
}
long fileModified = dir.getFileModified(f);
if (fileModified > ts) {
ts = fileModified;
file = f;
}
}
Path path = dir.getPath();
FileSystem fileSystem = path.getFileSystem(configuration);
Path realFile = new Path(path, file);
if (!fileSystem.exists(realFile)) {
realFile = dir.getRealFilePathFromSymlink(file);
if (!fileSystem.exists(realFile)) {
throw new IOException("Lucene file [" + file + "] for dir [" + path + "] can not be found.");
}
}
return getFirstBlockId(fileSystem, realFile);
}
public static String getIdForSingleSegmentIndex(Configuration configuration, Path indexPath) throws IOException {
HdfsDirectory dir = new HdfsDirectory(configuration, indexPath);
Set<String> files = new TreeSet<String>(Arrays.asList(dir.listAll()));
return getId(configuration, dir, files);
}
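// Requires an HDFS-backed file system: the block locations are cast to
// HdfsBlockLocation to read the underlying block id.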
private static String getFirstBlockId(FileSystem fileSystem, Path realFile) throws IOException {
FileStatus fileStatus = fileSystem.getFileStatus(realFile);
BlockLocation[] locations = fileSystem.getFileBlockLocations(fileStatus, 0, 1);
HdfsBlockLocation location = (HdfsBlockLocation) locations[0];
LocatedBlock locatedBlock = location.getLocatedBlock();
ExtendedBlock block = locatedBlock.getBlock();
return toNiceString(block.getBlockId());
}
private static String toNiceString(long blockId) {
return "b" + blockId;
}
private static HdfsDirectory getHdfsDirectory(Directory dir) {
if (dir instanceof HdfsDirectory) {
return (HdfsDirectory) dir;
} else if (dir instanceof DirectoryDecorator) {
DirectoryDecorator dd = (DirectoryDecorator) dir;
return getHdfsDirectory(dd.getOriginalDirectory());
} else {
throw new RuntimeException("Unknown directory type.");
}
}
private Path getCacheFilePath(SegmentKey segmentKey) {
return new Path(_cachePath, segmentKey + ".seq");
}
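// Forward-only cursor over a sorted row id SequenceFile cache.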
static class MyReader {
final Reader _reader;
final Text _rowId = new Text();
boolean _finished = false;
public MyReader(Reader reader) {
_reader = reader;
}
public Text getCurrentRowId() {
return _rowId;
}
public boolean next() throws IOException {
if (_finished) {
return false;
}
if (_reader.next(_rowId)) {
return true;
}
_finished = true;
return false;
}
public boolean isFinished() {
return _finished;
}
}
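// Cache files are stored under cachePath/table/shardName/<segment id>.seq.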
public static Path getCachePath(Path cachePath, String table, String shardName) {
return new Path(new Path(cachePath, table), shardName);
}
}