/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.blur.mapreduce.lib;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.blur.analysis.FieldManager;
import org.apache.blur.log.Log;
import org.apache.blur.log.LogFactory;
import org.apache.blur.lucene.LuceneVersionConstant;
import org.apache.blur.lucene.codec.Blur024Codec;
import org.apache.blur.mapreduce.lib.BlurMutate.MUTATE_TYPE;
import org.apache.blur.server.TableContext;
import org.apache.blur.store.hdfs.HdfsDirectory;
import org.apache.blur.thrift.generated.Column;
import org.apache.blur.thrift.generated.Record;
import org.apache.blur.thrift.generated.TableDescriptor;
import org.apache.blur.utils.BlurConstants;
import org.apache.blur.utils.RowDocumentUtil;
import org.apache.blur.utils.ShardUtil;
import org.apache.blur.utils.ThreadValue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.util.Progressable;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.NoLockFactory;
import org.apache.lucene.store.SimpleFSDirectory;
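
/**
 * Writes {@link BlurMutate} key/value pairs into a Lucene index for a single
 * shard. Records for a row are buffered, spilling to a local temporary index
 * when the buffer overflows, so that each row is written as a contiguous
 * block of documents led by a prime doc. The index may be built on local disk
 * first and then copied (optionally optimized in flight) to its final HDFS
 * location on close.
 */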
public class GenericBlurRecordWriter {
private static final Log LOG = LogFactory.getLog(GenericBlurRecordWriter.class);
private static final String JAVA_IO_TMPDIR = "java.io.tmpdir";
private static final Counter NULL_COUNTER = NullCounter.getNullCounter();
private final Text _prevKey = new Text();
private final IndexWriter _writer;
private final FieldManager _fieldManager;
private final Directory _finalDir;
private final Directory _localDir;
private final File _localPath;
private final IndexWriterConfig _conf;
private final IndexWriterConfig _overFlowConf;
private final Path _newIndex;
private final boolean _indexLocally;
private final boolean _optimizeInFlight;
private final DocumentBufferStrategy _documentBufferStrategy;
private Counter _columnCount = NULL_COUNTER;
private Counter _fieldCount = NULL_COUNTER;
private Counter _recordCount = NULL_COUNTER;
private Counter _rowCount = NULL_COUNTER;
private Counter _recordDuplicateCount = NULL_COUNTER;
private Counter _rowOverFlowCount = NULL_COUNTER;
private Counter _rowDeleteCount = NULL_COUNTER;
private RateCounter _recordRateCounter = new RateCounter(NULL_COUNTER);
private RateCounter _rowRateCounter = new RateCounter(NULL_COUNTER);
private RateCounter _copyRateCounter = new RateCounter(NULL_COUNTER);
private boolean _countersSetup = false;
private IndexWriter _localTmpWriter;
private boolean _usingLocalTmpindex;
private File _localTmpPath;
private ProgressableDirectory _localTmpDir;
private String _deletedRowId;
private Configuration _configuration;
private String _currentRowId;
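  /**
   * Creates a writer for the shard selected by attemptId % shardCount. The
   * index is built either directly in HDFS or in a local temporary directory
   * (copied to HDFS on close) depending on the indexLocally setting.
   */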
public GenericBlurRecordWriter(Configuration configuration, int attemptId, String tmpDirName) throws IOException {
_configuration = configuration;
_documentBufferStrategy = BlurOutputFormat.getDocumentBufferStrategy(_configuration);
_indexLocally = BlurOutputFormat.isIndexLocally(_configuration);
_optimizeInFlight = BlurOutputFormat.isOptimizeInFlight(_configuration);
TableDescriptor tableDescriptor = BlurOutputFormat.getTableDescriptor(_configuration);
int shardCount = tableDescriptor.getShardCount();
int shardId = attemptId % shardCount;
Path tableOutput = BlurOutputFormat.getOutputPath(_configuration);
String shardName = ShardUtil.getShardName(BlurConstants.SHARD_PREFIX, shardId);
Path indexPath = new Path(tableOutput, shardName);
_newIndex = new Path(indexPath, tmpDirName);
_finalDir = new ProgressableDirectory(new HdfsDirectory(_configuration, _newIndex), getProgressable());
_finalDir.setLockFactory(NoLockFactory.getNoLockFactory());
TableContext tableContext = TableContext.create(tableDescriptor);
_fieldManager = tableContext.getFieldManager();
Analyzer analyzer = _fieldManager.getAnalyzerForIndex();
_conf = new IndexWriterConfig(LuceneVersionConstant.LUCENE_VERSION, analyzer);
_conf.setCodec(new Blur024Codec());
_conf.setSimilarity(tableContext.getSimilarity());
TieredMergePolicy mergePolicy = (TieredMergePolicy) _conf.getMergePolicy();
mergePolicy.setUseCompoundFile(false);
_overFlowConf = _conf.clone();
if (_indexLocally) {
String localDirPath = System.getProperty(JAVA_IO_TMPDIR);
_localPath = new File(localDirPath, UUID.randomUUID().toString() + ".tmp");
SimpleFSDirectory directory = new SimpleFSDirectory(_localPath);
_localDir = new ProgressableDirectory(directory, getProgressable());
_writer = new IndexWriter(_localDir, _conf.clone());
} else {
_localPath = null;
_localDir = null;
_writer = new IndexWriter(_finalDir, _conf.clone());
}
}
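  /**
   * Wraps the job's {@link Progressable}, lazily re-fetching it if it was not
   * yet available when the writer was created and warning at most every ten
   * seconds when progress cannot be reported.
   */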
private Progressable getProgressable() {
final Progressable prg = BlurOutputFormat.getProgressable();
return new Progressable() {
private Progressable _progressable = prg;
private long _lastWarn = 0;
private boolean _progressSetupLogged = false;
@Override
public void progress() {
if (_progressable != null) {
_progressable.progress();
if (!_progressSetupLogged) {
LOG.info("Progress has been setup correctly.");
_progressSetupLogged = true;
}
} else {
Progressable progressable = BlurOutputFormat.getProgressable();
if (progressable != null) {
_progressable = progressable;
} else {
            long now = System.nanoTime();
            if (_lastWarn + TimeUnit.SECONDS.toNanos(10) < now) {
              LOG.warn("Progress not being reported.");
              _lastWarn = now;
            }
}
}
}
};
}
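  /**
   * Adds the mutate to the current row, flushing the previous row whenever the
   * key changes; records for the same row must therefore arrive contiguously.
   */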
public void write(Text key, BlurMutate value) throws IOException {
if (!_countersSetup) {
setupCounter();
_countersSetup = true;
}
if (!_prevKey.equals(key)) {
flush();
_prevKey.set(key);
}
add(value);
}
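  /**
   * Replaces the null counters with the real MapReduce counters when a
   * {@link GetCounter} has been configured for this task.
   */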
private void setupCounter() {
GetCounter getCounter = BlurOutputFormat.getGetCounter();
if (getCounter != null) {
_fieldCount = getCounter.getCounter(BlurCounters.LUCENE_FIELD_COUNT);
_columnCount = getCounter.getCounter(BlurCounters.COLUMN_COUNT);
_recordCount = getCounter.getCounter(BlurCounters.RECORD_COUNT);
_recordDuplicateCount = getCounter.getCounter(BlurCounters.RECORD_DUPLICATE_COUNT);
_rowCount = getCounter.getCounter(BlurCounters.ROW_COUNT);
_rowDeleteCount = getCounter.getCounter(BlurCounters.ROW_DELETE_COUNT);
_rowOverFlowCount = getCounter.getCounter(BlurCounters.ROW_OVERFLOW_COUNT);
_recordRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.RECORD_RATE));
_rowRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.ROW_RATE));
_copyRateCounter = new RateCounter(getCounter.getCounter(BlurCounters.COPY_RATE));
}
}
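  /**
   * Buffers the record's Lucene fields for the current row, remembering a
   * pending row delete, counting duplicates reported by the buffer strategy,
   * and spilling to the local temporary index when the buffer fills.
   */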
private void add(BlurMutate value) throws IOException {
BlurRecord blurRecord = value.getRecord();
_currentRowId = blurRecord.getRowId();
Record record = getRecord(blurRecord);
String recordId = record.getRecordId();
if (value.getMutateType() == MUTATE_TYPE.DELETE) {
_deletedRowId = blurRecord.getRowId();
return;
}
if (_countersSetup) {
_columnCount.increment(record.getColumns().size());
}
List<Field> document = RowDocumentUtil.getDoc(_fieldManager, blurRecord.getRowId(), record);
List<Field> dup = _documentBufferStrategy.add(recordId, document);
if (_countersSetup) {
if (dup != null) {
_recordDuplicateCount.increment(1);
} else {
_fieldCount.increment(document.size());
_recordCount.increment(1);
}
}
flushToTmpIndexIfNeeded();
}
private void flushToTmpIndexIfNeeded() throws IOException {
if (_documentBufferStrategy.isFull()) {
LOG.info("RowId [" + _currentRowId + "] - Document Buffer is full overflow to disk.");
flushToTmpIndex();
}
}
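  /**
   * Drains the document buffer into the local on-disk temporary index,
   * creating the temporary writer on first use.
   */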
  private void flushToTmpIndex() throws IOException {
    if (_documentBufferStrategy.isEmpty()) {
      return;
    }
    _usingLocalTmpindex = true;
    if (_localTmpWriter == null) {
      String localDirPath = System.getProperty(JAVA_IO_TMPDIR);
      _localTmpPath = new File(localDirPath, UUID.randomUUID().toString() + ".tmp");
      SimpleFSDirectory directory = new SimpleFSDirectory(_localTmpPath);
      _localTmpDir = new ProgressableDirectory(directory, getProgressable());
      _localTmpWriter = new IndexWriter(_localTmpDir, _overFlowConf.clone());
    }
    // The local tmp writer has merging disabled, so the first document added is
    // doc 0 and is therefore the prime doc.
    List<List<Field>> docs = _documentBufferStrategy.getAndClearBuffer();
    for (List<Field> doc : docs) {
      _localTmpWriter.addDocument(doc);
    }
  }
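  /**
   * Discards the local temporary index state and deletes its files from disk.
   */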
private void resetLocalTmp() {
_usingLocalTmpindex = false;
_localTmpWriter = null;
_localTmpDir = null;
rm(_localTmpPath);
_localTmpPath = null;
}
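  /**
   * Converts a {@link BlurRecord} into its Thrift {@link Record} equivalent.
   */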
public static Record getRecord(BlurRecord value) {
Record record = new Record();
record.setRecordId(value.getRecordId());
record.setFamily(value.getFamily());
for (BlurColumn col : value.getColumns()) {
record.addToColumns(new Column(col.getName(), col.getValue()));
}
return record;
}
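  /** Thread-local flag recording whether the row currently being flushed already exists in the table. */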
private static ThreadValue<AtomicBoolean> _existingRow = new ThreadValue<AtomicBoolean>() {
@Override
protected AtomicBoolean initialValue() {
return new AtomicBoolean();
}
};
public static boolean isCurrentRowExisting() {
return _existingRow.get().get();
}
public static void setCurrentRowExistingRowId(boolean existing) {
_existingRow.get().set(existing);
}
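  /**
   * Flushes the current row to the main index. A row that overflowed to the
   * local temporary index is force-merged to a single segment, wrapped with a
   * prime doc, and added via addIndexes; otherwise the buffered documents (or
   * a delete marker document when only a delete was seen) are added directly,
   * with the first document flagged as the prime doc.
   */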
private void flush() throws CorruptIndexException, IOException {
boolean newRow = !isCurrentRowExisting();
if (_usingLocalTmpindex) {
      // Since the row has already been flushed to disk, there is no need to
      // index the delete separately.
flushToTmpIndex();
LOG.info("RowId [" + _currentRowId + "] - forceMerge");
_localTmpWriter.forceMerge(1, true);
_localTmpWriter.close(true);
DirectoryReader reader = DirectoryReader.open(_localTmpDir);
AtomicReader atomicReader = getAtomicReader(reader);
LOG.info("RowId [" + _currentRowId + "] - total documents [" + atomicReader.maxDoc() + "]");
AtomicReader primeDocAtomicReader = PrimeDocOverFlowHelper.addPrimeDoc(atomicReader, newRow, _currentRowId);
if (_countersSetup) {
_recordRateCounter.mark(reader.numDocs());
}
_writer.addIndexes(primeDocAtomicReader);
primeDocAtomicReader.close();
resetLocalTmp();
_writer.maybeMerge();
LOG.info("RowId [" + _currentRowId + "] - add complete");
if (_countersSetup) {
_rowOverFlowCount.increment(1);
}
} else {
if (_documentBufferStrategy.isEmpty()) {
if (_deletedRowId != null) {
_writer.addDocument(getDeleteDoc());
if (_countersSetup) {
_rowDeleteCount.increment(1);
}
}
} else {
List<List<Field>> docs = _documentBufferStrategy.getAndClearBuffer();
docs.get(0).add(new StringField(BlurConstants.PRIME_DOC, BlurConstants.PRIME_DOC_VALUE, Store.NO));
if (newRow) {
docs.get(0).add(new StringField(BlurConstants.NEW_ROW, BlurConstants.PRIME_DOC_VALUE, Store.NO));
} else {
docs.get(0).add(new StringField(BlurConstants.UPDATE_ROW, _currentRowId, Store.NO));
}
_writer.addDocuments(docs);
if (_countersSetup) {
_recordRateCounter.mark(docs.size());
}
}
}
_deletedRowId = null;
if (_countersSetup) {
_rowRateCounter.mark();
_rowCount.increment(1);
}
}
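  /**
   * Returns the single segment reader of an optimized index, failing if more
   * than one segment remains after the force merge.
   */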
private AtomicReader getAtomicReader(DirectoryReader reader) throws IOException {
List<AtomicReaderContext> leaves = reader.leaves();
if (leaves.size() == 1) {
return leaves.get(0).reader();
}
throw new IOException("Reader [" + reader + "] has more than one segment after optimize.");
}
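  /**
   * Builds the marker document that records the deletion of a row.
   */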
private Document getDeleteDoc() {
Document document = new Document();
document.add(new StringField(BlurConstants.ROW_ID, _deletedRowId, Store.NO));
document.add(new StringField(BlurConstants.DELETE_MARKER, BlurConstants.DELETE_MARKER_VALUE, Store.NO));
document.add(new StringField(BlurConstants.UPDATE_ROW, _deletedRowId, Store.NO));
return document;
}
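  /**
   * Flushes the final row, closes the writer, and, when indexing locally,
   * copies (and optionally optimizes) the finished index into HDFS.
   */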
public void close() throws IOException {
flush();
_writer.close(false);
if (_countersSetup) {
_recordRateCounter.close();
_rowRateCounter.close();
}
if (_indexLocally) {
if (_optimizeInFlight) {
copyAndOptimizeInFlightDir();
} else {
copyDir();
}
}
if (_countersSetup) {
_copyRateCounter.close();
}
}
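  /**
   * Rewrites the local index into the final HDFS directory through a
   * rate-counting wrapper, merging segments as they are copied, and stamps
   * the commit with the internal marker.
   */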
private void copyAndOptimizeInFlightDir() throws IOException {
CopyRateDirectory copyRateDirectory = new CopyRateDirectory(_finalDir, _copyRateCounter);
copyRateDirectory.setLockFactory(NoLockFactory.getNoLockFactory());
DirectoryReader reader = DirectoryReader.open(_localDir);
IndexWriter writer = new IndexWriter(copyRateDirectory, _conf.clone());
writer.addIndexes(reader);
writer.setCommitData(getInternalMarker());
writer.close();
rm(_localPath);
}
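  /**
   * Commit user data that marks the index as internally generated.
   */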
private Map<String, String> getInternalMarker() {
Map<String, String> map = new HashMap<String, String>();
map.put(BlurConstants.INTERNAL, BlurConstants.INTERNAL);
return map;
}
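  /**
   * Copies the local index files verbatim into the final HDFS directory,
   * tracking throughput with the copy rate counter.
   */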
private void copyDir() throws IOException {
CopyRateDirectory copyRateDirectory = new CopyRateDirectory(_finalDir, _copyRateCounter);
String[] fileNames = _localDir.listAll();
for (String fileName : fileNames) {
LOG.info("Copying [{0}] to [{1}]", fileName, _newIndex);
_localDir.copy(copyRateDirectory, fileName, fileName, IOContext.DEFAULT);
}
rm(_localPath);
}
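  /**
   * Recursively deletes a file or directory, ignoring files that cannot be
   * removed.
   */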
private void rm(File file) {
if (!file.exists()) {
return;
}
    if (file.isDirectory()) {
      // listFiles() can return null if the directory cannot be read.
      File[] children = file.listFiles();
      if (children != null) {
        for (File f : children) {
          rm(f);
        }
      }
    }
file.delete();
}
}