blob: 1f53d2700533c44f3d2764d7fe635b00ae0973c2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance.journaling.index;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.journaling.JournaledProvenanceEvent;
import org.apache.nifi.provenance.journaling.config.JournalingRepositoryConfig;
import org.apache.nifi.provenance.journaling.toc.TocJournalReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LuceneIndexManager implements IndexManager {
private static final Logger logger = LoggerFactory.getLogger(LuceneIndexManager.class);
private final ProvenanceEventRepository repo;
private final JournalingRepositoryConfig config;
private final ExecutorService queryExecutor;
private final Map<String, List<LuceneIndexWriter>> writers = new HashMap<>();
private final Map<String, AtomicLong> writerIndexes = new HashMap<>();
private final ConcurrentMap<String, IndexSize> indexSizes = new ConcurrentHashMap<>();
public LuceneIndexManager(final ProvenanceEventRepository repo, final JournalingRepositoryConfig config, final ScheduledExecutorService workerExecutor, final ExecutorService queryExecutor) throws IOException {
this.repo = repo;
this.config = config;
this.queryExecutor = queryExecutor;
final int rolloverSeconds = (int) config.getJournalRolloverPeriod(TimeUnit.SECONDS);
if ( !config.isReadOnly() ) {
for ( final Map.Entry<String, File> entry : config.getContainers().entrySet() ) {
final String containerName = entry.getKey();
final File container = entry.getValue();
final List<LuceneIndexWriter> writerList = new ArrayList<>(config.getIndexesPerContainer());
writers.put(containerName, writerList);
writerIndexes.put(containerName, new AtomicLong(0L));
for ( int i=0; i < config.getIndexesPerContainer(); i++ ){
final File indexDir = new File(container, "indices/" + i);
writerList.add(new LuceneIndexWriter(repo, indexDir, config));
}
workerExecutor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
sync(containerName);
} catch (final Throwable t) {
logger.error("Failed to sync Provenance Repository Container {} due to {}", containerName, t);
if ( logger.isDebugEnabled() ) {
logger.error("", t);
}
}
}
}, rolloverSeconds, rolloverSeconds, TimeUnit.SECONDS);
}
}
}
@Override
public EventIndexSearcher newIndexSearcher(final String containerName) throws IOException {
final File containerDir = config.getContainers().get(containerName);
if ( containerDir == null ) {
throw new IllegalArgumentException();
}
final List<EventIndexSearcher> searchers = new ArrayList<>();
try {
if (config.isReadOnly()) {
for (int i=0; i < config.getIndexesPerContainer(); i++) {
final File indexDir = new File(containerName, "indices/" + i);
searchers.add(new LuceneIndexSearcher(repo, indexDir));
}
} else {
final List<LuceneIndexWriter> writerList = writers.get(containerName);
for ( final LuceneIndexWriter writer : writerList ) {
searchers.add(writer.newIndexSearcher());
}
}
} catch (final IOException ioe) {
// If we failed to create a searcher, we need to close all that we've already created.
for ( final EventIndexSearcher searcher : searchers ) {
try {
searcher.close();
} catch (final IOException ioe2) {
ioe.addSuppressed(ioe2);
}
}
throw ioe;
}
return new MultiIndexSearcher(searchers);
}
@Override
public LuceneIndexWriter getIndexWriter(final String container) {
if (config.isReadOnly() ) {
throw new IllegalStateException("Cannot obtain Index Writer because repository is read-only");
}
final AtomicLong index = writerIndexes.get(container);
if (index == null ) {
throw new IllegalArgumentException();
}
final long curVal = index.get();
final List<LuceneIndexWriter> writerList = writers.get(container);
return writerList.get((int) (curVal % writerList.size()));
}
@Override
public Long getMaxEventId(final String container, final String section) throws IOException {
final List<LuceneIndexWriter> writerList = writers.get(container);
if ( writerList == null ) {
return null;
}
Long max = null;
for ( final LuceneIndexWriter writer : writerList ) {
try (final EventIndexSearcher searcher = writer.newIndexSearcher()) {
final Long maxForWriter = searcher.getMaxEventId(container, section);
if ( maxForWriter != null ) {
if (max == null || maxForWriter.longValue() > max.longValue() ) {
max = maxForWriter;
}
}
}
}
return max;
}
@Override
public void sync() throws IOException {
for ( final List<LuceneIndexWriter> writerList : writers.values() ) {
for ( final LuceneIndexWriter writer : writerList ) {
writer.sync();
}
}
}
private void sync(final String containerName) throws IOException {
final AtomicLong index = writerIndexes.get(containerName);
final long curIndex = index.getAndIncrement();
final List<LuceneIndexWriter> writerList = writers.get(containerName);
final EventIndexWriter toSync = writerList.get((int) (curIndex % writerList.size()));
toSync.sync();
}
@Override
public void close() throws IOException {
for ( final List<LuceneIndexWriter> writerList : writers.values() ) {
for ( final LuceneIndexWriter writer : writerList ) {
try {
writer.close();
} catch (final IOException ioe) {
logger.warn("Failed to close {} due to {}", writer, ioe);
if ( logger.isDebugEnabled() ) {
logger.warn("", ioe);
}
}
}
}
}
@Override
public Set<EventIndexSearcher> getSearchers() throws IOException {
final Set<EventIndexSearcher> searchers = new HashSet<>();
try {
final Set<String> containerNames = config.getContainers().keySet();
for (final String containerName : containerNames) {
final EventIndexSearcher searcher = newIndexSearcher(containerName);
searchers.add(searcher);
}
return searchers;
} catch (final Exception e) {
for ( final EventIndexSearcher searcher : searchers ) {
try {
searcher.close();
} catch (final IOException ioe) {
e.addSuppressed(ioe);
}
}
throw e;
}
}
@Override
public <T> Set<T> withEachIndex(final IndexAction<T> action) throws IOException {
return withEachIndex(action, true);
}
@Override
public <T> Set<T> withEachIndex(final IndexAction<T> action, final boolean autoClose) throws IOException {
final Set<T> results = new HashSet<>();
final Map<String, Future<T>> futures = new HashMap<>();
final Set<String> containerNames = config.getContainers().keySet();
for (final String containerName : containerNames) {
final Callable<T> callable = new Callable<T>() {
@Override
public T call() throws Exception {
final EventIndexSearcher searcher = newIndexSearcher(containerName);
try {
return action.perform(searcher);
} finally {
if ( autoClose ) {
searcher.close();
}
}
}
};
final Future<T> future = queryExecutor.submit(callable);
futures.put(containerName, future);
}
for ( final Map.Entry<String, Future<T>> entry : futures.entrySet() ) {
try {
final T result = entry.getValue().get();
results.add(result);
} catch (final ExecutionException ee) {
final Throwable cause = ee.getCause();
if ( cause instanceof IOException ) {
throw (IOException) cause;
} else {
throw new RuntimeException("Failed to query Container " + entry.getKey() + " due to " + cause, cause);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return results;
}
@Override
public void withEachIndex(final VoidIndexAction action) throws IOException {
withEachIndex(action, false);
}
@Override
public void withEachIndex(final VoidIndexAction action, final boolean async) throws IOException {
final Map<String, Future<?>> futures = new HashMap<>();
final Set<String> containerNames = config.getContainers().keySet();
for (final String containerName : containerNames) {
final Callable<Object> callable = new Callable<Object>() {
@Override
public Object call() throws IOException {
try (final EventIndexSearcher searcher = newIndexSearcher(containerName)) {
action.perform(searcher);
return null;
} catch (final Throwable t) {
if ( async ) {
logger.error("Failed to perform action against container " + containerName + " due to " + t, t);
if ( logger.isDebugEnabled() ) {
logger.error("", t);
}
return null;
} else {
throw new IOException("Failed to perform action against container " + containerName + " due to " + t, t);
}
}
}
};
final Future<?> future = queryExecutor.submit(callable);
futures.put(containerName, future);
}
if ( !async ) {
for ( final Map.Entry<String, Future<?>> entry : futures.entrySet() ) {
try {
// throw any exception thrown by runnable
entry.getValue().get();
} catch (final ExecutionException ee) {
final Throwable cause = ee.getCause();
if ( cause instanceof IOException ) {
throw ((IOException) cause);
}
throw new RuntimeException("Failed to query Partition " + entry.getKey() + " due to " + cause, cause);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
@Override
public int getNumberOfIndices() {
return config.getContainers().size();
}
@Override
public void deleteEvents(final String containerName, final int sectionIndex, final Long journalId) throws IOException {
final List<LuceneIndexWriter> writerList = writers.get(containerName);
for ( final LuceneIndexWriter writer : writerList ) {
writer.delete(containerName, String.valueOf(sectionIndex), journalId);
}
}
@Override
public void deleteEventsBefore(final String containerName, final int sectionIndex, final Long journalId) throws IOException {
final List<LuceneIndexWriter> writerList = writers.get(containerName);
for ( final LuceneIndexWriter writer : writerList ) {
writer.deleteEventsBefore(containerName, String.valueOf(sectionIndex), journalId);
}
}
@Override
public void reindex(final String containerName, final int sectionIndex, final Long journalId, final File journalFile) throws IOException {
deleteEvents(containerName, sectionIndex, journalId);
final LuceneIndexWriter writer = getIndexWriter(containerName);
try (final TocJournalReader reader = new TocJournalReader(containerName, String.valueOf(sectionIndex), journalId, journalFile)) {
final List<JournaledProvenanceEvent> events = new ArrayList<>(1000);
JournaledProvenanceEvent event;
while ((event = reader.nextJournaledEvent()) != null) {
events.add(event);
if ( events.size() >= 1000 ) {
writer.index(events);
events.clear();
}
}
if (!events.isEmpty() ) {
writer.index(events);
}
}
}
@Override
public long getNumberOfEvents() throws IOException {
final AtomicLong totalCount = new AtomicLong(0L);
withEachIndex(new VoidIndexAction() {
@Override
public void perform(final EventIndexSearcher searcher) throws IOException {
totalCount.addAndGet(searcher.getNumberOfEvents());
}
});
return totalCount.get();
}
@Override
public void deleteOldEvents(final long earliestEventTimeToDelete) throws IOException {
for ( final String containerName : config.getContainers().keySet() ) {
final List<LuceneIndexWriter> writerList = writers.get(containerName);
for ( final LuceneIndexWriter writer : writerList ) {
writer.deleteOldEvents(earliestEventTimeToDelete);
}
}
}
@Override
public long getSize(final String containerName) {
// Cache index sizes so that we don't have to continually calculate it, as calculating it requires
// disk accesses, which are quite expensive.
final IndexSize indexSize = indexSizes.get(containerName);
if ( indexSize != null && !indexSize.isExpired() ) {
return indexSize.getSize();
}
final File containerFile = config.getContainers().get(containerName);
final File indicesDir = new File(containerFile, "indices");
final long size = getSize(indicesDir);
indexSizes.put(containerName, new IndexSize(size));
return size;
}
private long getSize(final File file) {
if ( file.isDirectory() ) {
long totalSize = 0L;
final File[] children = file.listFiles();
if ( children != null ) {
for ( final File child : children ) {
totalSize += getSize(child);
}
}
return totalSize;
} else {
return file.length();
}
}
private static class IndexSize {
private final long size;
private final long expirationTime;
public IndexSize(final long size) {
this.size = size;
this.expirationTime = System.currentTimeMillis() + 5000L; // good for 5 seconds
}
public long getSize() {
return size;
}
public boolean isExpired() {
return System.currentTimeMillis() > expirationTime;
}
}
}