package org.apache.blur.manager.writer;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import static org.apache.blur.lucene.LuceneVersionConstant.LUCENE_VERSION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.Timer;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.blur.analysis.FieldManager;
import org.apache.blur.lucene.search.IndexSearcherCloseable;
import org.apache.blur.lucene.search.IndexSearcherCloseableBase;
import org.apache.blur.server.ShardContext;
import org.apache.blur.server.TableContext;
import org.apache.blur.store.buffer.BufferStore;
import org.apache.blur.store.hdfs.HdfsDirectory;
import org.apache.blur.thrift.generated.Column;
import org.apache.blur.thrift.generated.Record;
import org.apache.blur.thrift.generated.RowMutation;
import org.apache.blur.thrift.generated.TableDescriptor;
import org.apache.blur.utils.BlurConstants;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.store.Directory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
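/**
 * Tests for {@link IndexImporter}. Each test stages a Lucene index in an
 * "index_<uuid>.commit" directory inside the shard path and verifies how the
 * importer handles it: a clean import is kept as ".inuse", rows that belong to
 * another shard are rejected as ".badrowids", and an unreadable index is set
 * aside as ".badindex".
 */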
public class IndexImporterTest {
private static final Path TMPDIR = new Path("target/tmp");
private Path _base;
private Configuration _configuration;
private IndexWriter _commitWriter;
private IndexImporter _indexImporter;
private Random _random = new Random();
private Path _path;
private Path _badRowIdsPath;
private IndexWriter _mainWriter;
private FileSystem _fileSystem;
private FieldManager _fieldManager;
private Path _badIndexPath;
private Path _inUsePath;
private Path _shardPath;
private HdfsDirectory _mainDirectory;
private Timer _timer;
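// Starts every test from an empty working directory under target/tmp.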
@Before
public void setup() throws IOException {
_timer = new Timer("Index Importer", true);
TableContext.clear();
_configuration = new Configuration();
_base = new Path(TMPDIR, "blur-index-importer-test");
_fileSystem = _base.getFileSystem(_configuration);
_fileSystem.delete(_base, true);
_fileSystem.mkdirs(_base);
setupWriter(_configuration);
}
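// Builds the table/shard contexts, a commit directory ("index_<uuid>.commit")
// with its own writer holding the index to be imported, the main shard
// directory and writer, and the IndexImporter under test.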
private void setupWriter(Configuration configuration) throws IOException {
TableDescriptor tableDescriptor = new TableDescriptor();
tableDescriptor.setName("test-table");
String uuid = UUID.randomUUID().toString();
tableDescriptor.setTableUri(new Path(_base, "table-table").toUri().toString());
tableDescriptor.setShardCount(2);
TableContext tableContext = TableContext.create(tableDescriptor);
ShardContext shardContext = ShardContext.create(tableContext, "shard-00000000");
Path tablePath = new Path(_base, "table-table");
_shardPath = new Path(tablePath, "shard-00000000");
String indexDirName = "index_" + uuid;
_path = new Path(_shardPath, indexDirName + ".commit");
_fileSystem.mkdirs(_path);
_badRowIdsPath = new Path(_shardPath, indexDirName + ".badrowids");
_badIndexPath = new Path(_shardPath, indexDirName + ".badindex");
_inUsePath = new Path(_shardPath, indexDirName + ".inuse");
Directory commitDirectory = new HdfsDirectory(configuration, _path);
_mainDirectory = new HdfsDirectory(configuration, _shardPath);
_fieldManager = tableContext.getFieldManager();
Analyzer analyzerForIndex = _fieldManager.getAnalyzerForIndex();
IndexWriterConfig conf = new IndexWriterConfig(LUCENE_VERSION, analyzerForIndex);
// conf.setMergePolicy(NoMergePolicy.NO_COMPOUND_FILES);
TieredMergePolicy mergePolicy = (TieredMergePolicy) conf.getMergePolicy();
mergePolicy.setUseCompoundFile(false);
_commitWriter = new IndexWriter(commitDirectory, conf.clone());
// Make sure there's an empty index...
new IndexWriter(_mainDirectory, conf.clone()).close();
_mainWriter = new IndexWriter(_mainDirectory, conf.clone());
BufferStore.initNewBuffer(128, 128 * 128);
_indexImporter = new IndexImporter(_timer, getBlurIndex(shardContext, _mainDirectory), shardContext,
TimeUnit.MINUTES, 10, 10, null, _mainDirectory);
}
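// Minimal anonymous BlurIndex for the importer to call back into: process()
// applies the IndexAction against _mainWriter; every other operation is
// unsupported in this test.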
private BlurIndex getBlurIndex(ShardContext shardContext, final Directory mainDirectory) throws IOException {
return new BlurIndex(new BlurIndexConfig(shardContext, mainDirectory, null, null, null, null, null, null, null, 0)) {
@Override
public void removeSnapshot(String name) throws IOException {
throw new RuntimeException("Not Implemented");
}
@Override
public void process(IndexAction indexAction) throws IOException {
final DirectoryReader reader = DirectoryReader.open(mainDirectory);
IndexSearcherCloseable searcherClosable = new IndexSearcherCloseableBase(reader, null) {
@Override
public Directory getDirectory() {
return mainDirectory;
}
@Override
public void close() throws IOException {
}
};
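// Apply the mutation and commit; on an IOException, roll the writer back.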
try {
indexAction.performMutate(searcherClosable, _mainWriter);
indexAction.doPreCommit(searcherClosable, _mainWriter);
_mainWriter.commit();
indexAction.doPostCommit(_mainWriter);
} catch (IOException e) {
indexAction.doPreRollback(_mainWriter);
_mainWriter.rollback();
indexAction.doPostRollback(_mainWriter);
}
}
@Override
public void optimize(int numberOfSegmentsPerShard) throws IOException {
throw new RuntimeException("Not Implemented");
}
@Override
public AtomicBoolean isClosed() {
throw new RuntimeException("Not Implemented");
}
@Override
public List<String> getSnapshots() throws IOException {
throw new RuntimeException("Not Implemented");
}
@Override
public IndexSearcherCloseable getIndexSearcher() throws IOException {
throw new RuntimeException("Not Implemented");
}
@Override
public void createSnapshot(String name) throws IOException {
throw new RuntimeException("Not Implemented");
}
@Override
public void close() throws IOException {
throw new RuntimeException("Not Implemented");
}
@Override
public void enqueue(List<RowMutation> mutations) {
throw new RuntimeException("Not Implemented");
}
@Override
public void finishBulkMutate(String bulkId, boolean apply, boolean blockUntilComplete) throws IOException {
throw new RuntimeException("Not implemented.");
}
@Override
public void addBulkMutate(String bulkId, RowMutation mutation) throws IOException {
throw new RuntimeException("Not implemented.");
}
@Override
public long getSegmentImportPendingCount() throws IOException {
throw new RuntimeException("Not implemented.");
}
@Override
public long getSegmentImportInProgressCount() throws IOException {
throw new RuntimeException("Not implemented.");
}
@Override
public long getOnDiskSize() throws IOException {
throw new RuntimeException("Not Implemented");
}
};
}
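// Stops the import timer, closes the main writer and the importer, and
// removes the working directory.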
@After
public void tearDown() throws IOException {
_timer.cancel();
_timer.purge();
IOUtils.closeQuietly(_mainWriter);
IOUtils.closeQuietly(_indexImporter);
_base.getFileSystem(_configuration).delete(_base, true);
}
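// An empty commit directory (no segments file) cannot be opened as an index,
// so the importer should set it aside as ".badindex" and leave the main index
// readable.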
@Test
public void testIndexImporterWithBadIndex() throws IOException {
_commitWriter.close();
_fileSystem.delete(_path, true);
_fileSystem.mkdirs(_path);
_indexImporter.run();
assertFalse(_fileSystem.exists(_path));
assertFalse(_fileSystem.exists(_badRowIdsPath));
assertTrue(_fileSystem.exists(_badIndexPath));
validateIndex();
}
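// The shard directory should still open as a valid Lucene index.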
private void validateIndex() throws IOException {
HdfsDirectory dir = new HdfsDirectory(_configuration, _shardPath);
DirectoryReader.open(dir).close();
}
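// An imported index stays referenced through its ".inuse" directory. After the
// main index is merged down to a single segment, cleanupOldDirs() should be
// able to drop directories that are no longer referenced.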
@Test
public void testIndexImporterCheckInuseDirsForCleanup() throws IOException {
FileSystem fileSystem = _shardPath.getFileSystem(_configuration);
List<Field> document1 = _fieldManager.getFields("10", genRecord("1"));
_mainWriter.addDocument(document1);
_mainWriter.commit();
List<Field> document2 = _fieldManager.getFields("1", genRecord("1"));
_commitWriter.addDocument(document2);
_commitWriter.commit();
_commitWriter.close();
_indexImporter.run();
assertFalse(_fileSystem.exists(_path));
assertFalse(_fileSystem.exists(_badRowIdsPath));
assertTrue(_fileSystem.exists(_inUsePath));
DirectoryReader reader = DirectoryReader.open(_mainDirectory);
while (reader.leaves().size() > 1) {
reader.close();
_mainWriter.forceMerge(1, true);
_mainWriter.commit();
reader = DirectoryReader.open(_mainDirectory);
}
reader.close();
_indexImporter.cleanupOldDirs();
assertFalse(fileSystem.exists(_path));
validateIndex();
}
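// A row whose rowId maps to this shard imports cleanly: the commit directory
// is consumed and kept as ".inuse".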
@Test
public void testIndexImporterWithCorrectRowIdShardCombination() throws IOException {
List<Field> document = _fieldManager.getFields("1", genRecord("1"));
_commitWriter.addDocument(document);
_commitWriter.commit();
_commitWriter.close();
_indexImporter.run();
assertFalse(_fileSystem.exists(_path));
assertFalse(_fileSystem.exists(_badRowIdsPath));
assertTrue(_fileSystem.exists(_inUsePath));
validateIndex();
}
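// Same as the test above, but the document also carries the NEW_ROW prime-doc
// marker, exercising the fast-import path named by the test.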
@Test
public void testIndexImporterWithCorrectRowIdShardCombinationWithFastImport() throws IOException {
List<Field> document = _fieldManager.getFields("1", genRecord("1"));
document.add(new StringField(BlurConstants.NEW_ROW, BlurConstants.PRIME_DOC_VALUE, Store.NO));
_commitWriter.addDocument(document);
_commitWriter.commit();
_commitWriter.close();
_indexImporter.run();
assertFalse(_fileSystem.exists(_path));
assertFalse(_fileSystem.exists(_badRowIdsPath));
assertTrue(_fileSystem.exists(_inUsePath));
validateIndex();
}
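// A row whose rowId belongs to a different shard is rejected: the commit
// directory ends up as ".badrowids" and nothing is marked ".inuse".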
@Test
public void testIndexImporterWithWrongRowIdShardCombination() throws IOException {
List<Field> document = _fieldManager.getFields("2", genRecord("1"));
_commitWriter.addDocument(document);
_commitWriter.commit();
_commitWriter.close();
_indexImporter.run();
assertFalse(_fileSystem.exists(_path));
assertTrue(_fileSystem.exists(_badRowIdsPath));
assertFalse(_fileSystem.exists(_inUsePath));
validateIndex();
}
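// An injected error makes the first import attempt fail; after cleaning up and
// clearing the error, a second run should complete the import normally.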
@Test
public void testIndexImporterWhenThereIsAFailureDuringImport() throws IOException {
List<Field> document = _fieldManager.getFields("1", genRecord("1"));
_commitWriter.addDocument(document);
_commitWriter.commit();
_commitWriter.close();
_indexImporter.setTestError(new Runnable() {
@Override
public void run() {
throw new RuntimeException("test");
}
});
try {
_indexImporter.run();
} catch (RuntimeException e) {
assertEquals("test", e.getMessage());
}
_indexImporter.cleanupOldDirs();
_indexImporter.setTestError(null);
_indexImporter.run();
assertFalse(_fileSystem.exists(_path));
assertFalse(_fileSystem.exists(_badRowIdsPath));
assertTrue(_fileSystem.exists(_inUsePath));
validateIndex();
}
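// Builds a record in family "testing" with ten random long-valued columns.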
private Record genRecord(String recordId) {
Record record = new Record();
record.setFamily("testing");
record.setRecordId(recordId);
for (int i = 0; i < 10; i++) {
record.addToColumns(new Column("col" + i, Long.toString(_random.nextLong())));
}
return record;
}
}