package org.apache.blur.manager.writer;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Timer;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.blur.BlurConfiguration;
import org.apache.blur.concurrent.Executors;
import org.apache.blur.lucene.search.IndexSearcherCloseable;
import org.apache.blur.server.ShardContext;
import org.apache.blur.server.TableContext;
import org.apache.blur.store.hdfs.BlurLockFactory;
import org.apache.blur.store.hdfs.HdfsDirectory;
import org.apache.blur.thrift.generated.Column;
import org.apache.blur.thrift.generated.Record;
import org.apache.blur.thrift.generated.RecordMutation;
import org.apache.blur.thrift.generated.RecordMutationType;
import org.apache.blur.thrift.generated.Row;
import org.apache.blur.thrift.generated.RowMutation;
import org.apache.blur.thrift.generated.RowMutationType;
import org.apache.blur.thrift.generated.TableDescriptor;
import org.apache.blur.trace.BaseTraceStorage;
import org.apache.blur.trace.Trace;
import org.apache.blur.trace.TraceCollector;
import org.apache.blur.trace.TraceStorage;
import org.apache.blur.utils.BlurConstants;
import org.apache.blur.utils.BlurUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.lucene.index.IndexReader;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class BlurIndexSimpleWriterTest {
private static final int TOTAL_ROWS_FOR_TESTS = 10000;
private static final String TEST_TABLE = "test-table";
private static final int TEST_NUMBER_WAIT_VISIBLE = 100;
private static final int TEST_NUMBER = 50000;
private static final File TMPDIR = new File("./target/tmp");
private BlurIndexSimpleWriter _writer;
private Random random = new Random();
private ExecutorService _service;
private File _base;
private Configuration _configuration;
private SharedMergeScheduler _mergeScheduler;
private String uuid;
private BlurIndexCloser _closer;
private Timer _indexImporterTimer;
private Timer _bulkTimer;
private Timer _idleWriterTimer;
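  // Creates a fresh working directory, merge scheduler, thread pool, and
  // timers before each test; the writer itself is built lazily via setupWriter.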
@Before
public void setup() throws IOException {
_indexImporterTimer = new Timer("Index Importer", true);
_bulkTimer = new Timer("Bulk Indexing", true);
_idleWriterTimer = new Timer("Idle Writer", true);
TableContext.clear();
_base = new File(TMPDIR, "blur-index-writer-test");
rmr(_base);
_base.mkdirs();
_mergeScheduler = new SharedMergeScheduler(1);
_configuration = new Configuration();
_service = Executors.newThreadPool("test", 10);
_closer = new BlurIndexCloser();
}
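  // Builds a BlurIndexSimpleWriter over an HdfsDirectory rooted in the local
  // working directory. When reload is true the existing uuid (and therefore
  // the existing table and index paths) is reused so on-disk state is picked up.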
private void setupWriter(Configuration configuration) throws IOException {
setupWriter(configuration, false);
}
private void setupWriter(Configuration configuration, boolean reload) throws IOException {
TableDescriptor tableDescriptor = new TableDescriptor();
tableDescriptor.setName(TEST_TABLE);
if (!reload && uuid == null) {
uuid = UUID.randomUUID().toString();
}
tableDescriptor.setTableUri(new File(_base, "table-store-" + uuid).toURI().toString());
Map<String, String> tableProperties = new HashMap<String, String>();
tableProperties.put(BlurConstants.BLUR_SHARD_QUEUE_MAX_PAUSE_TIME_WHEN_EMPTY, "500");
tableProperties.put(BlurConstants.BLUR_SHARD_QUEUE_MAX_QUEUE_BATCH_SIZE, "500");
tableProperties.put(BlurConstants.BLUR_SHARD_QUEUE_MAX_WRITER_LOCK_TIME, "1000");
tableProperties.put(BlurConstants.BLUR_SHARD_QUEUE_MAX_INMEMORY_LENGTH, "1000");
tableDescriptor.setTableProperties(tableProperties);
TableContext tableContext = TableContext.create(tableDescriptor);
File path = new File(_base, "index_" + uuid);
path.mkdirs();
Path hdfsPath = new Path(path.toURI());
HdfsDirectory directory = new HdfsDirectory(_configuration, hdfsPath);
BlurLockFactory lockFactory = new BlurLockFactory(_configuration, hdfsPath, "unit-test", BlurUtil.getPid());
directory.setLockFactory(lockFactory);
ShardContext shardContext = ShardContext.create(tableContext, "test-shard-" + uuid);
_writer = new BlurIndexSimpleWriter(new BlurIndexConfig(shardContext, directory, _mergeScheduler, _service, _closer,
_indexImporterTimer, _bulkTimer, null, _idleWriterTimer, TimeUnit.SECONDS.toMillis(5)));
}
@After
public void tearDown() throws IOException {
_indexImporterTimer.cancel();
_indexImporterTimer.purge();
_bulkTimer.cancel();
_bulkTimer.purge();
_idleWriterTimer.cancel();
_idleWriterTimer.purge();
_writer.close();
_mergeScheduler.close();
_service.shutdownNow();
rmr(_base);
}
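  // Recursively deletes a file or directory tree.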
private void rmr(File file) {
if (!file.exists()) {
return;
}
if (file.isDirectory()) {
for (File f : file.listFiles()) {
rmr(f);
}
}
file.delete();
}
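  // Verifies that a failed mutation rolls the writer back cleanly and that
  // the index stays empty and usable for subsequent valid writes.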
@Test
public void testRollbackAndReopen() throws IOException {
setupWriter(_configuration);
{
IndexSearcherCloseable searcher = _writer.getIndexSearcher();
IndexReader reader = searcher.getIndexReader();
assertEquals(0, reader.numDocs());
searcher.close();
}
    MutatableAction action = new MutatableAction(_writer.getShardContext());
    // An empty Row (no row id) is invalid, so processing it should fail and
    // the writer should roll back.
    action.replaceRow(new Row());
    try {
      _writer.process(action);
      fail("processing an empty Row should throw an IOException");
    } catch (IOException e) {
      // Expected; the index should remain empty after the rollback.
    }
{
IndexSearcherCloseable searcher = _writer.getIndexSearcher();
IndexReader reader = searcher.getIndexReader();
assertEquals(0, reader.numDocs());
searcher.close();
}
action.replaceRow(genRow());
_writer.process(action);
{
IndexSearcherCloseable searcher = _writer.getIndexSearcher();
IndexReader reader = searcher.getIndexReader();
assertEquals(1, reader.numDocs());
searcher.close();
}
}
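  // Indexes rows one at a time with a no-op trace storage installed, asserts
  // each row is visible to a new searcher immediately after processing, and
  // bounds the number of reader generations still waiting to be closed.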
@Test
public void testBlurIndexWriter() throws IOException {
setupWriter(_configuration);
long s = System.nanoTime();
int total = 0;
TraceStorage oldStorage = Trace.getStorage();
Trace.setStorage(new BaseTraceStorage(new BlurConfiguration()) {
@Override
public void close() throws IOException {
}
@Override
public void store(TraceCollector collector) {
        // Swallow traces during the test. Uncomment to dump each collected
        // trace as JSON for debugging:
        // try {
        //   System.out.println(collector.toJsonObject().toString(1));
        // } catch (JSONException e) {
        //   e.printStackTrace();
        // }
}
});
Trace.setupTrace("test");
for (int i = 0; i < TEST_NUMBER_WAIT_VISIBLE; i++) {
MutatableAction action = new MutatableAction(_writer.getShardContext());
action.replaceRow(genRow());
_writer.process(action);
IndexSearcherCloseable searcher = _writer.getIndexSearcher();
IndexReader reader = searcher.getIndexReader();
assertEquals(i + 1, reader.numDocs());
searcher.close();
total++;
int readersToBeClosedCount = _closer.getReadersToBeClosedCount();
int readerGenerationCount = _writer.getReaderGenerationCount();
assertTrue((readerGenerationCount - readersToBeClosedCount) < 3);
}
Trace.tearDownTrace();
long e = System.nanoTime();
double seconds = (e - s) / 1000000000.0;
double rate = total / seconds;
    System.out.println("Rate " + rate + " rows/second");
IndexSearcherCloseable searcher = _writer.getIndexSearcher();
IndexReader reader = searcher.getIndexReader();
assertEquals(TEST_NUMBER_WAIT_VISIBLE, reader.numDocs());
searcher.close();
Trace.setStorage(oldStorage);
}
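  // Batches all mutations into a single MutatableAction and processes them in
  // one call, which should be much faster than the row-at-a-time test above.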
@Test
public void testBlurIndexWriterFaster() throws IOException, InterruptedException {
setupWriter(_configuration);
IndexSearcherCloseable searcher1 = _writer.getIndexSearcher();
IndexReader reader1 = searcher1.getIndexReader();
assertEquals(0, reader1.numDocs());
searcher1.close();
long s = System.nanoTime();
int total = 0;
MutatableAction action = new MutatableAction(_writer.getShardContext());
for (int i = 0; i < TEST_NUMBER; i++) {
action.replaceRow(genRow());
total++;
}
_writer.process(action);
long e = System.nanoTime();
double seconds = (e - s) / 1000000000.0;
double rate = total / seconds;
    System.out.println("Rate " + rate + " rows/second");
    // Wait one second for the writes to become visible; the index is set to
    // refresh once every 25 ms. Sleeping is a hack, but it keeps the test
    // simple for now.
    Thread.sleep(1000);
IndexSearcherCloseable searcher2 = _writer.getIndexSearcher();
IndexReader reader2 = searcher2.getIndexReader();
assertEquals(TEST_NUMBER, reader2.numDocs());
searcher2.close();
}
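  // Generates a single-record Row with random ids and ten random columns.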
private Row genRow() {
Row row = new Row();
row.setId(Long.toString(random.nextLong()));
Record record = new Record();
record.setFamily("testing");
record.setRecordId(Long.toString(random.nextLong()));
for (int i = 0; i < 10; i++) {
record.addToColumns(new Column("col" + i, Long.toString(random.nextLong())));
}
row.addToRecords(record);
return row;
}
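  // Creates a snapshot, checks that it is persisted to the snapshots
  // directory, and verifies that it survives a writer reload.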
@Test
public void testCreateSnapshot() throws IOException {
setupWriter(_configuration);
_writer.createSnapshot("test_snapshot");
assertTrue(_writer.getSnapshots().contains("test_snapshot"));
    // check that the snapshot file is persisted to the file system
Path snapshotsDirPath = _writer.getSnapshotsDirectoryPath();
FileSystem fileSystem = snapshotsDirPath.getFileSystem(_configuration);
assertTrue(fileSystem.exists(new Path(snapshotsDirPath, "000000000001")));
// create a new writer instance and test whether the snapshots are loaded
// properly
_writer.close();
setupWriter(_configuration, true);
assertTrue(_writer.getSnapshots().contains("test_snapshot"));
}
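  // Creates two snapshots, reloads the writer, and verifies that removing one
  // snapshot leaves the other intact.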
@Test
public void testRemoveSnapshots() throws IOException {
setupWriter(_configuration);
Path snapshotsDirPath = _writer.getSnapshotsDirectoryPath();
FileSystem fileSystem = snapshotsDirPath.getFileSystem(new Configuration());
fileSystem.mkdirs(snapshotsDirPath);
_writer.createSnapshot("test_snapshot1");
_writer.createSnapshot("test_snapshot2");
    // re-load the writer so that it picks up the persisted snapshots
_writer.close();
setupWriter(_configuration, true);
assertEquals(2, _writer.getSnapshots().size());
_writer.removeSnapshot("test_snapshot2");
assertEquals(1, _writer.getSnapshots().size());
    assertFalse(_writer.getSnapshots().contains("test_snapshot2"));
}
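  // Enqueues mutations through the writer's queue three times, waiting for
  // the cumulative document count to become visible after each batch.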
@Test
public void testEnqueue() throws IOException, InterruptedException {
setupWriter(_configuration);
runQueueTest(TOTAL_ROWS_FOR_TESTS, TOTAL_ROWS_FOR_TESTS);
runQueueTest(TOTAL_ROWS_FOR_TESTS, TOTAL_ROWS_FOR_TESTS * 2);
runQueueTest(TOTAL_ROWS_FOR_TESTS, TOTAL_ROWS_FOR_TESTS * 3);
}
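  // setupWriter passes a 5 second idle period as the last BlurIndexConfig
  // argument; after idling past it, the internal writer should close itself.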
@Test
public void testAutoCloseOfWriter() throws InterruptedException, IOException {
setupWriter(_configuration);
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
for (int i = 0; i < 10; i++) {
if (_writer.isWriterClosed()) {
return;
}
Thread.sleep(TimeUnit.SECONDS.toMillis(1));
}
    fail("writer was not auto closed after the idle timeout");
}
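  // Enqueues mutatesToAdd row mutations on a background thread and polls
  // until the reader reports numberOfValidDocs documents.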
private void runQueueTest(final int mutatesToAdd, int numberOfValidDocs) throws IOException, InterruptedException {
final String table = _writer.getShardContext().getTableContext().getTable();
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < mutatesToAdd; i++) {
try {
_writer.enqueue(Arrays.asList(genRowMutation(table)));
} catch (IOException e) {
e.printStackTrace();
}
}
}
});
thread.start();
long start = System.currentTimeMillis();
    while (true) {
      // Close each searcher after checking; leaking it would pin old readers.
      IndexSearcherCloseable searcher = _writer.getIndexSearcher();
      try {
        if (searcher.getIndexReader().numDocs() == numberOfValidDocs) {
          long end = System.currentTimeMillis();
          System.out.println("[" + mutatesToAdd + "] Mutations in [" + (end - start) + " ms]");
          break;
        }
      } finally {
        searcher.close();
      }
      Thread.sleep(100);
    }
thread.join();
}
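  // Builds a REPLACE_ROW mutation containing one record with ten random
  // columns, mirroring the rows produced by genRow.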
private RowMutation genRowMutation(String table) {
RowMutation rowMutation = new RowMutation();
rowMutation.setRowId(Long.toString(random.nextLong()));
rowMutation.setTable(table);
rowMutation.setRowMutationType(RowMutationType.REPLACE_ROW);
Record record = new Record();
record.setFamily("testing");
record.setRecordId(Long.toString(random.nextLong()));
for (int i = 0; i < 10; i++) {
record.addToColumns(new Column("col" + i, Long.toString(random.nextLong())));
}
rowMutation.addToRecordMutations(new RecordMutation(RecordMutationType.REPLACE_ENTIRE_RECORD, record));
return rowMutation;
}
}