Second patch of updates.
diff --git a/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java b/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
index ad542ef..be92e34 100644
--- a/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
+++ b/blur-core/src/main/java/org/apache/blur/command/BaseCommandManager.java
@@ -134,19 +134,20 @@
}
public CommandStatus getCommandStatus(String commandExecutionId) {
- CommandStatus cso = findCommandStatusObject(commandExecutionId, _workerRunningMap.values());
- if (cso != null) {
- return cso;
- }
- return findCommandStatusObject(commandExecutionId, _driverRunningMap.values());
+ CommandStatus cso1 = findCommandStatusObject(commandExecutionId, _workerRunningMap.values());
+ CommandStatus cso2 = findCommandStatusObject(commandExecutionId, _driverRunningMap.values());
+ return CommandStatusUtil.mergeCommandStatus(cso1, cso2);
}
private CommandStatus findCommandStatusObject(String commandExecutionId, Collection<ResponseFuture<?>> values) {
Map<String, Map<CommandStatusState, Long>> serverStateMap = new HashMap<String, Map<CommandStatusState, Long>>();
CommandStatus commandStatus = null;
for (ResponseFuture<?> responseFuture : values) {
+ if (responseFuture == null) {
+ continue;
+ }
Command<?> commandExecuting = responseFuture.getCommandExecuting();
- if (commandExecuting.getCommandExecutionId().equals(commandExecutionId)) {
+ if (commandExecutionId.equals(commandExecuting.getCommandExecutionId())) {
if (commandStatus == null) {
CommandStatus originalCommandStatusObject = responseFuture.getOriginalCommandStatusObject();
String commandName = responseFuture.getCommandExecuting().getName();
@@ -182,7 +183,10 @@
List<String> result = new ArrayList<String>();
for (ResponseFuture<?> responseFuture : values) {
Command<?> commandExecuting = responseFuture.getCommandExecuting();
- result.add(commandExecuting.getCommandExecutionId());
+ String commandExecutionId = commandExecuting.getCommandExecutionId();
+ if (commandExecutionId != null) {
+ result.add(commandExecutionId);
+ }
}
return result;
}
@@ -400,11 +404,12 @@
}
protected Response submitDriverCallable(Callable<Response> callable, Command<?> commandExecuting,
- CommandStatus originalCommandStatusObject, AtomicBoolean running) throws IOException, TimeoutException, ExceptionCollector {
+ CommandStatus originalCommandStatusObject, AtomicBoolean running) throws IOException, TimeoutException,
+ ExceptionCollector {
Future<Response> future = _executorServiceDriver.submit(callable);
Long instanceExecutionId = getInstanceExecutionId();
_driverRunningMap.put(instanceExecutionId, new ResponseFuture<Response>(_runningCacheTombstoneTime, future,
- commandExecuting, originalCommandStatusObject,running));
+ commandExecuting, originalCommandStatusObject, running));
try {
return future.get(_connectionTimeout, TimeUnit.MILLISECONDS);
} catch (CancellationException e) {
diff --git a/blur-core/src/main/java/org/apache/blur/command/Command.java b/blur-core/src/main/java/org/apache/blur/command/Command.java
index 9cf2719..ff6f559 100644
--- a/blur-core/src/main/java/org/apache/blur/command/Command.java
+++ b/blur-core/src/main/java/org/apache/blur/command/Command.java
@@ -30,7 +30,7 @@
public abstract class Command<R> implements Cloneable {
- @OptionalArgument("The ")
+ @OptionalArgument
private String commandExecutionId;
public abstract String getName();
diff --git a/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java b/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
index b7a1a63..59f5b7c 100644
--- a/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
+++ b/blur-core/src/main/java/org/apache/blur/command/ControllerClusterContext.java
@@ -23,6 +23,7 @@
import org.apache.blur.thrift.BlurClientManager;
import org.apache.blur.thrift.ClientPool;
import org.apache.blur.thrift.Connection;
+import org.apache.blur.thrift.UserConverter;
import org.apache.blur.thrift.generated.Arguments;
import org.apache.blur.thrift.generated.Blur.Client;
import org.apache.blur.thrift.generated.BlurException;
@@ -31,6 +32,7 @@
import org.apache.blur.thrift.generated.TimeoutException;
import org.apache.blur.thrift.generated.ValueObject;
import org.apache.blur.trace.Tracer;
+import org.apache.blur.user.UserContext;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
@@ -134,7 +136,7 @@
final Arguments arguments = _manager.toArguments(command);
- CommandStatus originalCommandStatusObject = new CommandStatus(null, command.getName(), arguments, null, null);
+ CommandStatus originalCommandStatusObject = new CommandStatus(null, command.getName(), arguments, null, UserConverter.toThriftUser(UserContext.getUser()));
for (Entry<Server, Client> e : clientMap.entrySet()) {
Server server = e.getKey();
final Client client = e.getValue();
@@ -226,7 +228,7 @@
Set<Shard> shards = command.routeShards(this, tables);
Map<Server, Client> clientMap = getClientMap(command, tables, shards);
final Arguments arguments = _manager.toArguments(command);
- CommandStatus originalCommandStatusObject = new CommandStatus(null, command.getName(), arguments, null, null);
+ CommandStatus originalCommandStatusObject = new CommandStatus(null, command.getName(), arguments, null, UserConverter.toThriftUser(UserContext.getUser()));
for (Entry<Server, Client> e : clientMap.entrySet()) {
Server server = e.getKey();
final Client client = e.getValue();
diff --git a/blur-core/src/main/java/org/apache/blur/command/ResponseFuture.java b/blur-core/src/main/java/org/apache/blur/command/ResponseFuture.java
index a5a629e..ef4a046 100644
--- a/blur-core/src/main/java/org/apache/blur/command/ResponseFuture.java
+++ b/blur-core/src/main/java/org/apache/blur/command/ResponseFuture.java
@@ -16,6 +16,7 @@
*/
package org.apache.blur.command;
+import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -39,6 +40,9 @@
_tombstone = tombstone;
_future = future;
_commandExecuting = commandExecuting;
+ if (_commandExecuting.getCommandExecutionId() == null) {
+ _commandExecuting.setCommandExecutionId(UUID.randomUUID().toString());
+ }
_originalCommandStatusObject = originalCommandStatusObject;
_running = running;
}
diff --git a/blur-core/src/main/java/org/apache/blur/manager/stats/MergerTableStats.java b/blur-core/src/main/java/org/apache/blur/manager/stats/MergerTableStats.java
index 2f89c9e..4ac5631 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/stats/MergerTableStats.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/stats/MergerTableStats.java
@@ -46,7 +46,7 @@
private TableStats merge(TableStats s1, TableStats s2) {
s1.tableName = s2.tableName;
- s1.bytes = Math.max(s1.bytes, s2.bytes);
+ s1.bytes = s1.bytes + s2.bytes;
s1.recordCount = s1.recordCount + s2.recordCount;
s1.rowCount = s1.rowCount + s2.rowCount;
s1.segmentImportInProgressCount = s1.segmentImportInProgressCount + s2.segmentImportInProgressCount;
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
index e21a952..ff17e27 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/BlurIndexSimpleWriter.java
@@ -25,7 +25,6 @@
import static org.apache.blur.utils.BlurConstants.BLUR_SHARD_QUEUE_MAX_INMEMORY_LENGTH;
import java.io.Closeable;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
@@ -77,6 +76,7 @@
import org.apache.blur.user.UserContext;
import org.apache.blur.utils.BlurConstants;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -428,31 +428,31 @@
}
private void closeWriter() {
- if (_lastWrite.get() + _maxWriterIdle < System.currentTimeMillis()) {
- synchronized (_writer) {
- _writeLock.lock();
- try {
- BlurIndexWriter writer = _writer.getAndSet(null);
- if (writer != null) {
- LOG.info("Closing idle writer for table [{0}] shard [{1}]", _tableContext.getTable(),
- _shardContext.getShard());
- IOUtils.cleanup(LOG, writer);
- }
- } finally {
- _writeLock.unlock();
+ _writeLock.lock();
+ try {
+ if (_lastWrite.get() + _maxWriterIdle < System.currentTimeMillis()) {
+ BlurIndexWriter writer = _writer.getAndSet(null);
+ if (writer != null) {
+ LOG.info("Closing idle writer for table [{0}] shard [{1}]", _tableContext.getTable(),
+ _shardContext.getShard());
+ IOUtils.cleanup(LOG, writer);
}
}
+ } finally {
+ _writeLock.unlock();
}
}
+ /**
+ * Testing only.
+ */
protected boolean isWriterClosed() {
- synchronized (_writer) {
- return _writer.get() == null;
- }
+ return _writer.get() == null;
}
private BlurIndexWriter getBlurIndexWriter() throws IOException {
- synchronized (_writer) {
+ _writeLock.lock();
+ try {
BlurIndexWriter blurIndexWriter = _writer.get();
if (blurIndexWriter == null) {
blurIndexWriter = new BlurIndexWriter(_directory, _conf.clone());
@@ -460,12 +460,17 @@
_lastWrite.set(System.currentTimeMillis());
}
return blurIndexWriter;
+ } finally {
+ _writeLock.unlock();
}
}
private void resetBlurIndexWriter() {
- synchronized (_writer) {
+ _writeLock.lock();
+ try {
_writer.set(null);
+ } finally {
+ _writeLock.unlock();
}
}
@@ -501,22 +506,12 @@
@Override
public void createSnapshot(String name) throws IOException {
- _writeLock.lock();
- try {
- _snapshotIndexDeletionPolicy.createSnapshot(name, _indexReader.get(), _context);
- } finally {
- _writeLock.unlock();
- }
+ _snapshotIndexDeletionPolicy.createSnapshot(name, _indexReader.get(), _context);
}
@Override
public void removeSnapshot(String name) throws IOException {
- _writeLock.lock();
- try {
- _snapshotIndexDeletionPolicy.removeSnapshot(name, _context);
- } finally {
- _writeLock.unlock();
- }
+ _snapshotIndexDeletionPolicy.removeSnapshot(name, _context);
}
@Override
@@ -1024,17 +1019,10 @@
@Override
public long getOnDiskSize() throws IOException {
- long total = 0;
- String[] listAll = _directory.listAll();
- for (String name : listAll) {
- try {
- total += _directory.fileLength(name);
- } catch (FileNotFoundException e) {
- // If file is not found that means that is was removed between the time
- // we started iterating over the file names and when we asked for it's
- // size.
- }
- }
- return total;
+ Path hdfsDirPath = _shardContext.getHdfsDirPath();
+ Configuration configuration = _tableContext.getConfiguration();
+ FileSystem fileSystem = hdfsDirPath.getFileSystem(configuration);
+ ContentSummary contentSummary = fileSystem.getContentSummary(hdfsDirPath);
+ return contentSummary.getLength();
}
}
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java b/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
index 42171e8..33db0ae 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/IndexImporter.java
@@ -36,6 +36,7 @@
import org.apache.blur.log.LogFactory;
import org.apache.blur.lucene.search.IndexSearcherCloseable;
import org.apache.blur.manager.BlurPartitioner;
+import org.apache.blur.manager.writer.MergeSortRowIdLookup.Action;
import org.apache.blur.server.ShardContext;
import org.apache.blur.server.TableContext;
import org.apache.blur.server.cache.ThriftCache;
@@ -54,12 +55,16 @@
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.CompositeReaderContext;
import org.apache.lucene.index.DirectoryReader;
+import org.apache.lucene.index.DocsEnum;
import org.apache.lucene.index.Fields;
+import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
+import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
public class IndexImporter extends TimerTask implements Closeable {
@@ -292,7 +297,9 @@
public void performMutate(IndexSearcherCloseable searcher, IndexWriter writer) throws IOException {
LOG.info("About to import [{0}] into [{1}/{2}]", directory, _shard, _table);
boolean emitDeletes = searcher.getIndexReader().numDocs() != 0;
- applyDeletes(directory, writer, _shard, emitDeletes);
+ Configuration configuration = _shardContext.getTableContext().getConfiguration();
+
+ applyDeletes(directory, writer, searcher, _shard, emitDeletes, configuration);
LOG.info("Add index [{0}] [{1}/{2}]", directory, _shard, _table);
writer.addIndexes(directory);
LOG.info("Removing delete markers [{0}] on [{1}/{2}]", directory, _shard, _table);
@@ -336,42 +343,115 @@
return result;
}
- private void applyDeletes(Directory directory, IndexWriter indexWriter, String shard, boolean emitDeletes)
- throws IOException {
- DirectoryReader reader = DirectoryReader.open(directory);
+ private void applyDeletes(Directory directory, IndexWriter indexWriter, IndexSearcherCloseable searcher,
+ String shard, boolean emitDeletes, Configuration configuration) throws IOException {
+ DirectoryReader newReader = DirectoryReader.open(directory);
try {
- LOG.info("Applying deletes in reader [{0}]", reader);
- CompositeReaderContext compositeReaderContext = reader.getContext();
- List<AtomicReaderContext> leaves = compositeReaderContext.leaves();
+ List<AtomicReaderContext> newLeaves = newReader.getContext().leaves();
BlurPartitioner blurPartitioner = new BlurPartitioner();
Text key = new Text();
int numberOfShards = _shardContext.getTableContext().getDescriptor().getShardCount();
int shardId = ShardUtil.getShardIndex(shard);
- for (AtomicReaderContext context : leaves) {
- AtomicReader atomicReader = context.reader();
- Fields fields = atomicReader.fields();
- Terms terms = fields.terms(BlurConstants.ROW_ID);
- if (terms != null) {
- TermsEnum termsEnum = terms.iterator(null);
- BytesRef ref = null;
- while ((ref = termsEnum.next()) != null) {
- key.set(ref.bytes, ref.offset, ref.length);
- int partition = blurPartitioner.getPartition(key, null, numberOfShards);
- if (shardId != partition) {
- throw new IOException("Index is corrupted, RowIds are found in wrong shard, partition [" + partition
- + "] does not shard [" + shardId + "], this can happen when rows are not hashed correctly.");
- }
- if (emitDeletes) {
- indexWriter.deleteDocuments(new Term(BlurConstants.ROW_ID, BytesRef.deepCopyOf(ref)));
- }
+
+ Action action = new Action() {
+ @Override
+ public void found(AtomicReader reader, Bits liveDocs, TermsEnum termsEnum) throws IOException {
+ DocsEnum docsEnum = termsEnum.docs(liveDocs, null);
+ if (docsEnum.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) {
+ indexWriter.deleteDocuments(new Term(BlurConstants.ROW_ID, BytesRef.deepCopyOf(termsEnum.term())));
}
}
+ };
+
+ LOG.info("Applying deletes for table [{0}] shard [{1}] new reader [{2}]", _table, shard, newReader);
+ boolean skipCheckRowIds = isInternal(newReader);
+ LOG.info("Skip rowid check [{0}] for table [{1}] shard [{2}] new reader [{3}]", skipCheckRowIds, _table, shard,
+ newReader);
+ for (AtomicReaderContext context : newLeaves) {
+ AtomicReader newAtomicReader = context.reader();
+ if (isFastRowIdDeleteSupported(newAtomicReader)) {
+ runNewRowIdCheckAndDelete(indexWriter, emitDeletes, blurPartitioner, key, numberOfShards, shardId,
+ newAtomicReader, skipCheckRowIds);
+ } else {
+ runOldMergeSortRowIdCheckAndDelete(emitDeletes, searcher.getIndexReader(), blurPartitioner, key,
+ numberOfShards, shardId, action, newAtomicReader);
+ }
}
} finally {
- reader.close();
+ newReader.close();
}
}
+ private boolean isInternal(DirectoryReader reader) throws IOException {
+ Map<String, String> map = reader.getIndexCommit().getUserData();
+ return BlurConstants.INTERNAL.equals(map.get(BlurConstants.INTERNAL));
+ }
+
+ private void runNewRowIdCheckAndDelete(IndexWriter indexWriter, boolean emitDeletes, BlurPartitioner blurPartitioner,
+ Text key, int numberOfShards, int shardId, AtomicReader atomicReader, boolean skipCheckRowIds) throws IOException {
+ Fields fields = atomicReader.fields();
+ if (skipCheckRowIds) {
+ Terms rowIdTerms = fields.terms(BlurConstants.ROW_ID);
+ if (rowIdTerms != null) {
+ LOG.info("Checking rowIds for import on table [{0}] shard [{1}]", _table, _shard);
+ TermsEnum rowIdTermsEnum = rowIdTerms.iterator(null);
+ BytesRef ref = null;
+ while ((ref = rowIdTermsEnum.next()) != null) {
+ key.set(ref.bytes, ref.offset, ref.length);
+ int partition = blurPartitioner.getPartition(key, null, numberOfShards);
+ if (shardId != partition) {
+ throw new IOException("Index is corrupted, RowIds are found in wrong shard, partition [" + partition
+ + "] does not shard [" + shardId + "], this can happen when rows are not hashed correctly.");
+ }
+ }
+ }
+ }
+ if (emitDeletes) {
+ Terms rowIdsToDeleteTerms = fields.terms(BlurConstants.UPDATE_ROW);
+ if (rowIdsToDeleteTerms != null) {
+ LOG.info("Performing deletes on rowIds for import on table [{0}] shard [{1}]", _table, _shard);
+ TermsEnum rowIdsToDeleteTermsEnum = rowIdsToDeleteTerms.iterator(null);
+ BytesRef ref = null;
+ while ((ref = rowIdsToDeleteTermsEnum.next()) != null) {
+ indexWriter.deleteDocuments(new Term(BlurConstants.ROW_ID, BytesRef.deepCopyOf(ref)));
+ }
+ }
+ }
+ }
+
+ private void runOldMergeSortRowIdCheckAndDelete(boolean emitDeletes, IndexReader currentIndexReader,
+ BlurPartitioner blurPartitioner, Text key, int numberOfShards, int shardId, Action action,
+ AtomicReader atomicReader) throws IOException {
+ MergeSortRowIdLookup lookup = new MergeSortRowIdLookup(currentIndexReader);
+ Fields fields = atomicReader.fields();
+ Terms terms = fields.terms(BlurConstants.ROW_ID);
+ if (terms != null) {
+ TermsEnum termsEnum = terms.iterator(null);
+ BytesRef ref = null;
+ while ((ref = termsEnum.next()) != null) {
+ key.set(ref.bytes, ref.offset, ref.length);
+ int partition = blurPartitioner.getPartition(key, null, numberOfShards);
+ if (shardId != partition) {
+ throw new IOException("Index is corrupted, RowIds are found in wrong shard, partition [" + partition
+ + "] does not shard [" + shardId + "], this can happen when rows are not hashed correctly.");
+ }
+ if (emitDeletes) {
+ lookup.lookup(ref, action);
+ }
+ }
+ }
+ }
+
+ private boolean isFastRowIdDeleteSupported(AtomicReader atomicReader) throws IOException {
+ if (atomicReader.fields().terms(BlurConstants.NEW_ROW) != null) {
+ return true;
+ }
+ if (atomicReader.fields().terms(BlurConstants.UPDATE_ROW) != null) {
+ return true;
+ }
+ return false;
+ }
+
public void cleanupOldDirs() throws IOException {
Path hdfsDirPath = _shardContext.getHdfsDirPath();
TableContext tableContext = _shardContext.getTableContext();
diff --git a/blur-core/src/main/java/org/apache/blur/manager/writer/SnapshotIndexDeletionPolicy.java b/blur-core/src/main/java/org/apache/blur/manager/writer/SnapshotIndexDeletionPolicy.java
index 30690e5..15d9272 100644
--- a/blur-core/src/main/java/org/apache/blur/manager/writer/SnapshotIndexDeletionPolicy.java
+++ b/blur-core/src/main/java/org/apache/blur/manager/writer/SnapshotIndexDeletionPolicy.java
@@ -28,6 +28,8 @@
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.blur.log.Log;
import org.apache.blur.log.LogFactory;
@@ -54,6 +56,7 @@
private final Path _path;
private final Map<String, Long> _namesToGenerations = new ConcurrentHashMap<String, Long>();
private final Map<Long, Set<String>> _generationsToNames = new ConcurrentHashMap<Long, Set<String>>();
+ private final WriteLock _writeLock = new ReentrantReadWriteLock().writeLock();
public SnapshotIndexDeletionPolicy(Configuration configuration, Path path) throws IOException {
_configuration = configuration;
@@ -70,13 +73,18 @@
@Override
public void onCommit(List<? extends IndexCommit> commits) throws IOException {
- int size = commits.size();
- for (int i = 0; i < size - 1; i++) {
- IndexCommit indexCommit = commits.get(i);
- long generation = indexCommit.getGeneration();
- if (!_generationsToNames.containsKey(generation)) {
- indexCommit.delete();
+ _writeLock.lock();
+ try {
+ int size = commits.size();
+ for (int i = 0; i < size - 1; i++) {
+ IndexCommit indexCommit = commits.get(i);
+ long generation = indexCommit.getGeneration();
+ if (!_generationsToNames.containsKey(generation)) {
+ indexCommit.delete();
+ }
}
+ } finally {
+ _writeLock.unlock();
}
}
@@ -147,36 +155,46 @@
}
public void createSnapshot(String name, DirectoryReader reader, String context) throws IOException {
- if (_namesToGenerations.containsKey(name)) {
- throw new IOException("Snapshot [" + name + "] already exists.");
+ _writeLock.lock();
+ try {
+ if (_namesToGenerations.containsKey(name)) {
+ throw new IOException("Snapshot [" + name + "] already exists.");
+ }
+ LOG.info("Creating snapshot [{0}] in [{1}].", name, context);
+ IndexCommit indexCommit = reader.getIndexCommit();
+ long generation = indexCommit.getGeneration();
+ _namesToGenerations.put(name, generation);
+ Set<String> names = _generationsToNames.get(generation);
+ if (names == null) {
+ names = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+ _generationsToNames.put(generation, names);
+ }
+ names.add(name);
+ storeGenerations();
+ } finally {
+ _writeLock.unlock();
}
- LOG.info("Creating snapshot [{0}] in [{1}].", name, context);
- IndexCommit indexCommit = reader.getIndexCommit();
- long generation = indexCommit.getGeneration();
- _namesToGenerations.put(name, generation);
- Set<String> names = _generationsToNames.get(generation);
- if (names == null) {
- names = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
- _generationsToNames.put(generation, names);
- }
- names.add(name);
- storeGenerations();
}
public void removeSnapshot(String name, String context) throws IOException {
- Long gen = _namesToGenerations.get(name);
- if (gen == null) {
- LOG.info("Snapshot [{0}] does not exist in [{1}].", name, context);
- return;
+ _writeLock.lock();
+ try {
+ Long gen = _namesToGenerations.get(name);
+ if (gen == null) {
+ LOG.info("Snapshot [{0}] does not exist in [{1}].", name, context);
+ return;
+ }
+ LOG.info("Removing snapshot [{0}] from [{1}].", name, context);
+ _namesToGenerations.remove(name);
+ Set<String> names = _generationsToNames.get(gen);
+ names.remove(name);
+ if (names.isEmpty()) {
+ _generationsToNames.remove(gen);
+ }
+ storeGenerations();
+ } finally {
+ _writeLock.unlock();
}
- LOG.info("Removing snapshot [{0}] from [{1}].", name, context);
- _namesToGenerations.remove(name);
- Set<String> names = _generationsToNames.get(gen);
- names.remove(name);
- if (names.isEmpty()) {
- _generationsToNames.remove(gen);
- }
- storeGenerations();
}
public Collection<String> getSnapshots() {
@@ -194,5 +212,4 @@
public static Path getGenerationsPath(Path shardDir) {
return new Path(shardDir, "generations");
}
-
}
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
index 477c923..e4d29e0 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurControllerServer.java
@@ -51,6 +51,7 @@
import org.apache.blur.command.ArgumentOverlay;
import org.apache.blur.command.BlurObject;
import org.apache.blur.command.BlurObjectSerDe;
+import org.apache.blur.command.CommandStatusUtil;
import org.apache.blur.command.CommandUtil;
import org.apache.blur.command.ControllerCommandManager;
import org.apache.blur.command.Response;
@@ -92,7 +93,6 @@
import org.apache.blur.thrift.generated.ColumnDefinition;
import org.apache.blur.thrift.generated.CommandDescriptor;
import org.apache.blur.thrift.generated.CommandStatus;
-import org.apache.blur.thrift.generated.CommandStatusState;
import org.apache.blur.thrift.generated.ErrorType;
import org.apache.blur.thrift.generated.FetchResult;
import org.apache.blur.thrift.generated.HighlightOptions;
@@ -108,6 +108,7 @@
import org.apache.blur.trace.Trace;
import org.apache.blur.trace.Trace.TraceId;
import org.apache.blur.trace.Tracer;
+import org.apache.blur.user.UserContext;
import org.apache.blur.utils.BlurExecutorCompletionService;
import org.apache.blur.utils.BlurIterator;
import org.apache.blur.utils.BlurUtil;
@@ -1514,7 +1515,8 @@
throws BlurException, TException {
try {
BlurObject args = CommandUtil.toBlurObject(arguments);
- CommandStatus originalCommandStatusObject = new CommandStatus(null, commandName, arguments, null, null);
+ CommandStatus originalCommandStatusObject = new CommandStatus(null, commandName, arguments, null,
+ UserConverter.toThriftUser(UserContext.getUser()));
Response response = _commandManager.execute(getTableContextFactory(), getLayoutFactory(), commandName,
new ArgumentOverlay(args, _serDe), originalCommandStatusObject);
return CommandUtil.fromObjectToThrift(response, _serDe);
@@ -1861,7 +1863,7 @@
}
}));
}
- return new ArrayList<String>(result).subList(startingAt, Math.min(fetch, result.size()));
+ return new ArrayList<String>(result).subList(startingAt, startingAt + Math.min(fetch, result.size()));
} catch (Exception e) {
throw new BException(e.getMessage(), e);
}
@@ -1876,7 +1878,15 @@
CommandStatus cs = scatterGather(cluster, new BlurCommand<CommandStatus>() {
@Override
public CommandStatus call(Client client) throws BlurException, TException {
- return client.commandStatus(commandExecutionId);
+ try {
+ return client.commandStatus(commandExecutionId);
+ } catch (BlurException e) {
+ String message = e.getMessage();
+ if (message.startsWith("NOT_FOUND")) {
+ return null;
+ }
+ throw e;
+ }
}
}, new Merger<CommandStatus>() {
@Override
@@ -1884,12 +1894,16 @@
CommandStatus commandStatus = null;
while (service.getRemainingCount() > 0) {
Future<CommandStatus> future = service.poll(_defaultParallelCallTimeout, TimeUnit.MILLISECONDS, true);
- commandStatus = mergeCommandStatus(commandStatus, service.getResultThrowException(future));
+ commandStatus = CommandStatusUtil.mergeCommandStatus(commandStatus,
+ service.getResultThrowException(future));
}
return commandStatus;
}
});
- commandStatus = mergeCommandStatus(commandStatus, cs);
+ commandStatus = CommandStatusUtil.mergeCommandStatus(commandStatus, cs);
+ }
+ if (commandStatus == null) {
+ throw new BException("NOT_FOUND {0}", commandExecutionId);
}
return commandStatus;
} catch (Exception e) {
@@ -1897,60 +1911,6 @@
}
}
- private static CommandStatus mergeCommandStatus(CommandStatus cs1, CommandStatus cs2) {
- if (cs1 == null && cs2 == null) {
- return null;
- } else if (cs1 == null) {
- return cs2;
- } else if (cs2 == null) {
- return cs1;
- } else {
- Map<String, Map<CommandStatusState, Long>> serverStateMap1 = cs1.getServerStateMap();
- Map<String, Map<CommandStatusState, Long>> serverStateMap2 = cs2.getServerStateMap();
- Map<String, Map<CommandStatusState, Long>> merge = mergeServerStateMap(serverStateMap1, serverStateMap2);
- return new CommandStatus(cs1.getExecutionId(), cs1.getCommandName(), cs1.getArguments(), merge, cs1.getUser());
- }
- }
-
- private static Map<String, Map<CommandStatusState, Long>> mergeServerStateMap(
- Map<String, Map<CommandStatusState, Long>> serverStateMap1,
- Map<String, Map<CommandStatusState, Long>> serverStateMap2) {
- Map<String, Map<CommandStatusState, Long>> result = new HashMap<String, Map<CommandStatusState, Long>>();
- Set<String> keys = new HashSet<String>();
- keys.addAll(serverStateMap1.keySet());
- keys.addAll(serverStateMap2.keySet());
- for (String key : keys) {
- Map<CommandStatusState, Long> css1 = serverStateMap2.get(key);
- Map<CommandStatusState, Long> css2 = serverStateMap2.get(key);
- result.put(key, mergeCommandStatusState(css1, css2));
- }
- return result;
- }
-
- private static Map<CommandStatusState, Long> mergeCommandStatusState(Map<CommandStatusState, Long> css1,
- Map<CommandStatusState, Long> css2) {
- if (css1 == null && css2 == null) {
- return new HashMap<CommandStatusState, Long>();
- } else if (css1 == null) {
- return css2;
- } else if (css2 == null) {
- return css1;
- } else {
- Map<CommandStatusState, Long> result = new HashMap<CommandStatusState, Long>(css1);
- for (Entry<CommandStatusState, Long> e : css2.entrySet()) {
- CommandStatusState key = e.getKey();
- Long l = result.get(key);
- Long value = e.getValue();
- if (l == null) {
- result.put(key, value);
- } else {
- result.put(key, l + value);
- }
- }
- return result;
- }
- }
-
@Override
public void commandCancel(String commandExecutionId) throws BlurException, TException {
try {
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
index 08b4400..ff03210 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/BlurShardServer.java
@@ -684,7 +684,11 @@
@Override
public CommandStatus commandStatus(String commandExecutionId) throws BlurException, TException {
try {
- return _commandManager.getCommandStatus(commandExecutionId);
+ CommandStatus commandStatus = _commandManager.getCommandStatus(commandExecutionId);
+ if (commandStatus == null) {
+ throw new BException("NOT_FOUND {0}", commandExecutionId);
+ }
+ return commandStatus;
} catch (Exception e) {
throw new BException(e.getMessage(), e);
}
diff --git a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
index 0b4e290..46bd8b0 100644
--- a/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
+++ b/blur-core/src/main/java/org/apache/blur/thrift/ThriftBlurShardServer.java
@@ -316,7 +316,7 @@
StreamServer streamServer;
int streamThreadCount = configuration.getInt(BLUR_STREAM_SERVER_THREADS, 100);
if (streamThreadCount > 0) {
- StreamProcessor streamProcessor = new StreamProcessor(indexServer, tmpPath);
+ StreamProcessor streamProcessor = new StreamProcessor(indexServer, tmpPath, config);
streamServer = new StreamServer(0, streamThreadCount, streamProcessor);
streamServer.start();
configuration.setInt(BLUR_STREAM_SERVER_RUNNING_PORT, streamServer.getPort());
diff --git a/blur-core/src/main/java/org/apache/blur/utils/GCWatcher.java b/blur-core/src/main/java/org/apache/blur/utils/GCWatcher.java
index c9c3774..3242931 100644
--- a/blur-core/src/main/java/org/apache/blur/utils/GCWatcher.java
+++ b/blur-core/src/main/java/org/apache/blur/utils/GCWatcher.java
@@ -16,24 +16,50 @@
*/
package org.apache.blur.utils;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryUsage;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Properties;
public class GCWatcher {
private static final String JAVA_VERSION = "java.version";
private static final String _1_7 = "1.7";
+ private static final String _1_8 = "1.8";
private static final boolean JDK7;
static {
Properties properties = System.getProperties();
String javaVersion = properties.getProperty(JAVA_VERSION);
- if (javaVersion.startsWith(_1_7)) {
+ if (javaVersion.startsWith(_1_7) || javaVersion.startsWith(_1_8)) {
JDK7 = true;
} else {
JDK7 = false;
}
}
+ public static void main(String[] args) {
+ GCWatcher.init(0.50);
+
+ GCWatcher.registerAction(new GCAction() {
+ @Override
+ public void takeAction() throws Exception {
+ System.out.println("OOM");
+ System.exit(0);
+ }
+ });
+
+ List<byte[]> lst = new ArrayList<byte[]>();
+
+ while (true) {
+ lst.add(new byte[1_000_000]);
+ MemoryUsage heapMemoryUsage = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
+ System.out.println(heapMemoryUsage.getMax() + " " + heapMemoryUsage.getUsed());
+ }
+
+ }
+
/**
* Initializes the GCWatcher to watch for any garbage collection that leaves
* more then the given ratio free. If more remains then all the given
diff --git a/blur-core/src/main/java/org/apache/blur/utils/GCWatcherJdk6.java b/blur-core/src/main/java/org/apache/blur/utils/GCWatcherJdk6.java
index 03bf6bb..2eb9f56 100644
--- a/blur-core/src/main/java/org/apache/blur/utils/GCWatcherJdk6.java
+++ b/blur-core/src/main/java/org/apache/blur/utils/GCWatcherJdk6.java
@@ -151,7 +151,7 @@
}
_lastIndex = _gcInfo.getIndex();
}
- } catch (Exception e) {
+ } catch (Throwable e) {
e.printStackTrace();
}
}
diff --git a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
index ae635c3..f46b184 100644
--- a/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
+++ b/blur-core/src/test/java/org/apache/blur/manager/writer/IndexImporterTest.java
@@ -41,12 +41,15 @@
import org.apache.blur.thrift.generated.Record;
import org.apache.blur.thrift.generated.RowMutation;
import org.apache.blur.thrift.generated.TableDescriptor;
+import org.apache.blur.utils.BlurConstants;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.document.Field;
+import org.apache.lucene.document.StringField;
+import org.apache.lucene.document.Field.Store;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
@@ -292,6 +295,20 @@
}
@Test
+ public void testIndexImporterWithCorrectRowIdShardCombinationWithFastImport() throws IOException {
+ List<Field> document = _fieldManager.getFields("1", genRecord("1"));
+ document.add(new StringField(BlurConstants.NEW_ROW, BlurConstants.PRIME_DOC_VALUE, Store.NO));
+ _commitWriter.addDocument(document);
+ _commitWriter.commit();
+ _commitWriter.close();
+ _indexImporter.run();
+ assertFalse(_fileSystem.exists(_path));
+ assertFalse(_fileSystem.exists(_badRowIdsPath));
+ assertTrue(_fileSystem.exists(_inUsePath));
+ validateIndex();
+ }
+
+ @Test
public void testIndexImporterWithWrongRowIdShardCombination() throws IOException {
List<Field> document = _fieldManager.getFields("2", genRecord("1"));
_commitWriter.addDocument(document);
diff --git a/blur-document-security/src/main/java/org/apache/blur/lucene/security/index/FilterAccessControlFactory.java b/blur-document-security/src/main/java/org/apache/blur/lucene/security/index/FilterAccessControlFactory.java
index 059ad05..ea7e5ad 100644
--- a/blur-document-security/src/main/java/org/apache/blur/lucene/security/index/FilterAccessControlFactory.java
+++ b/blur-document-security/src/main/java/org/apache/blur/lucene/security/index/FilterAccessControlFactory.java
@@ -338,6 +338,7 @@
}
List<IndexableField> result = new ArrayList<IndexableField>();
for (IndexableField field : fields) {
+ // If field is to be indexed and is to be read masked.
if (fieldsToMask.contains(field.name())) {
// If field is a doc value, then don't bother indexing.
if (!isDocValue(field)) {
diff --git a/blur-document-security/src/main/java/org/apache/blur/lucene/security/search/BitSetDocumentVisibilityFilterCacheStrategy.java b/blur-document-security/src/main/java/org/apache/blur/lucene/security/search/BitSetDocumentVisibilityFilterCacheStrategy.java
index 8a142cf..37e17a6 100644
--- a/blur-document-security/src/main/java/org/apache/blur/lucene/security/search/BitSetDocumentVisibilityFilterCacheStrategy.java
+++ b/blur-document-security/src/main/java/org/apache/blur/lucene/security/search/BitSetDocumentVisibilityFilterCacheStrategy.java
@@ -31,6 +31,7 @@
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.DocIdSetIterator;
+import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.OpenBitSet;
@@ -62,12 +63,16 @@
@Override
public Builder createBuilder(String fieldName, BytesRef term, final AtomicReader reader) {
- final OpenBitSet bitSet = new OpenBitSet(reader.maxDoc());
+ int maxDoc = reader.maxDoc();
final Key key = new Key(fieldName, term, reader.getCoreCacheKey());
LOG.debug("Creating new bitset for key [" + key + "] on index [" + reader + "]");
return new Builder() {
+
+ private OpenBitSet bitSet = new OpenBitSet(maxDoc);
+
@Override
public void or(DocIdSetIterator it) throws IOException {
+ LOG.debug("Building bitset for key [" + key + "]");
int doc;
while ((doc = it.nextDoc()) != DocsEnum.NO_MORE_DOCS) {
bitSet.set(doc);
@@ -76,7 +81,6 @@
@Override
public DocIdSet getDocIdSet() throws IOException {
- LOG.debug("Building bitset for key [" + key + "]");
SegmentReader segmentReader = getSegmentReader(reader);
segmentReader.addReaderClosedListener(new ReaderClosedListener() {
@Override
@@ -88,12 +92,172 @@
}
}
});
- _cache.put(key, bitSet);
- return bitSet;
+ long cardinality = bitSet.cardinality();
+ DocIdSet cacheDocIdSet;
+ if (isFullySet(maxDoc, bitSet, cardinality)) {
+ cacheDocIdSet = getFullySetDocIdSet(maxDoc);
+ } else if (isFullyEmpty(bitSet, cardinality)) {
+ cacheDocIdSet = getFullyEmptyDocIdSet(maxDoc);
+ } else {
+ cacheDocIdSet = bitSet;
+ }
+ _cache.put(key, cacheDocIdSet);
+ return cacheDocIdSet;
}
};
}
+ public static DocIdSet getFullyEmptyDocIdSet(int maxDoc) {
+ Bits bits = getFullyEmptyBits(maxDoc);
+ return new DocIdSet() {
+ @Override
+ public DocIdSetIterator iterator() throws IOException {
+ return getFullyEmptyDocIdSetIterator(maxDoc);
+ }
+
+ @Override
+ public Bits bits() throws IOException {
+ return bits;
+ }
+
+ @Override
+ public boolean isCacheable() {
+ return true;
+ }
+ };
+ }
+
+ public static DocIdSetIterator getFullyEmptyDocIdSetIterator(int maxDoc) {
+ return new DocIdSetIterator() {
+
+ private int _docId = -1;
+
+ @Override
+ public int docID() {
+ return _docId;
+ }
+
+ @Override
+ public int nextDoc() throws IOException {
+ return _docId = DocIdSetIterator.NO_MORE_DOCS;
+ }
+
+ @Override
+ public int advance(int target) throws IOException {
+ return _docId = DocIdSetIterator.NO_MORE_DOCS;
+ }
+
+ @Override
+ public long cost() {
+ return 0;
+ }
+ };
+ }
+
+ public static Bits getFullyEmptyBits(int maxDoc) {
+ return new Bits() {
+ @Override
+ public boolean get(int index) {
+ return false;
+ }
+
+ @Override
+ public int length() {
+ return maxDoc;
+ }
+ };
+ }
+
+ public static DocIdSet getFullySetDocIdSet(int maxDoc) {
+ Bits bits = getFullySetBits(maxDoc);
+ return new DocIdSet() {
+ @Override
+ public DocIdSetIterator iterator() throws IOException {
+ return getFullySetDocIdSetIterator(maxDoc);
+ }
+
+ @Override
+ public Bits bits() throws IOException {
+ return bits;
+ }
+
+ @Override
+ public boolean isCacheable() {
+ return true;
+ }
+ };
+ }
+
+ public static DocIdSetIterator getFullySetDocIdSetIterator(int maxDoc) {
+ return new DocIdSetIterator() {
+
+ private int _docId = -1;
+
+ @Override
+ public int advance(int target) throws IOException {
+ if (_docId == DocIdSetIterator.NO_MORE_DOCS) {
+ return DocIdSetIterator.NO_MORE_DOCS;
+ }
+ _docId = target;
+ if (_docId >= maxDoc) {
+ return _docId = DocIdSetIterator.NO_MORE_DOCS;
+ }
+ return _docId;
+ }
+
+ @Override
+ public int nextDoc() throws IOException {
+ if (_docId == DocIdSetIterator.NO_MORE_DOCS) {
+ return DocIdSetIterator.NO_MORE_DOCS;
+ }
+ _docId++;
+ if (_docId >= maxDoc) {
+ return _docId = DocIdSetIterator.NO_MORE_DOCS;
+ }
+ return _docId;
+ }
+
+ @Override
+ public int docID() {
+ return _docId;
+ }
+
+ @Override
+ public long cost() {
+ return 0l;
+ }
+
+ };
+ }
+
+ public static Bits getFullySetBits(int maxDoc) {
+ return new Bits() {
+ @Override
+ public boolean get(int index) {
+ return true;
+ }
+
+ @Override
+ public int length() {
+ return maxDoc;
+ }
+ };
+ }
+
+ public static boolean isFullyEmpty(OpenBitSet bitSet, long cardinality) {
+ if (cardinality == 0) {
+ return true;
+ }
+ return false;
+ }
+
+ public static boolean isFullySet(int maxDoc, OpenBitSet bitSet, long cardinality) {
+ if (cardinality >= maxDoc) {
+ return true;
+ }
+ return false;
+ }
+
public static SegmentReader getSegmentReader(IndexReader indexReader) throws IOException {
if (indexReader instanceof SegmentReader) {
return (SegmentReader) indexReader;