| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.provenance; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.regex.Pattern; |
| |
| import org.apache.nifi.events.EventReporter; |
| import org.apache.nifi.flowfile.attributes.CoreAttributes; |
| import org.apache.nifi.processor.DataUnit; |
| import org.apache.nifi.provenance.lineage.ComputeLineageSubmission; |
| import org.apache.nifi.provenance.lineage.FlowFileLineage; |
| import org.apache.nifi.provenance.lineage.Lineage; |
| import org.apache.nifi.provenance.lineage.LineageComputationType; |
| import org.apache.nifi.provenance.query.ProvenanceQuerySubmission; |
| import org.apache.nifi.provenance.search.Query; |
| import org.apache.nifi.provenance.search.QueryResult; |
| import org.apache.nifi.provenance.search.QuerySubmission; |
| import org.apache.nifi.provenance.search.SearchTerm; |
| import org.apache.nifi.provenance.search.SearchableField; |
| import org.apache.nifi.util.IntegerHolder; |
| import org.apache.nifi.util.NiFiProperties; |
| import org.apache.nifi.util.RingBuffer; |
| import org.apache.nifi.util.RingBuffer.Filter; |
| import org.apache.nifi.util.RingBuffer.ForEachEvaluator; |
| import org.apache.nifi.util.RingBuffer.IterationDirection; |
| |
| public class VolatileProvenanceRepository implements ProvenanceEventRepository { |
| private static final int MAX_CONCURRENT_QUERIES = 10; |
| |
| // properties |
| public static final String BUFFER_SIZE = "nifi.provenance.repository.buffer.size"; |
| |
| // default property values |
| public static final int DEFAULT_BUFFER_SIZE = 10000; |
| |
| private final RingBuffer<StoredProvenanceEvent> ringBuffer; |
| private final List<SearchableField> searchableFields; |
| private final List<SearchableField> searchableAttributes; |
| private final ExecutorService queryExecService; |
| private final ScheduledExecutorService scheduledExecService; |
| |
| private final ConcurrentMap<String, AsyncQuerySubmission> querySubmissionMap = new ConcurrentHashMap<>(); |
| private final ConcurrentMap<String, AsyncLineageSubmission> lineageSubmissionMap = new ConcurrentHashMap<>(); |
| private final ConcurrentMap<String, ProvenanceQuerySubmission> provQuerySubmissionMap = new ConcurrentHashMap<>(); |
| private final AtomicLong idGenerator = new AtomicLong(0L); |
| private final AtomicBoolean initialized = new AtomicBoolean(false); |
| |
/**
 * Builds the repository from the shared {@link NiFiProperties} instance: sizes the
 * ring buffer, parses the searchable fields and attributes, and creates the two
 * executors (a fixed pool of 2 named query threads and a scheduled pool of 2).
 */
public VolatileProvenanceRepository() {
    final NiFiProperties nifiProperties = NiFiProperties.getInstance();

    // Capacity of the in-memory event store, falling back to the default when unset.
    final int capacity = nifiProperties.getIntegerProperty(BUFFER_SIZE, DEFAULT_BUFFER_SIZE);
    ringBuffer = new RingBuffer<>(capacity);

    final String fieldNames = nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEXED_FIELDS);
    final String attributeNames = nifiProperties.getProperty(NiFiProperties.PROVENANCE_INDEXED_ATTRIBUTES);

    searchableFields = Collections.unmodifiableList(SearchableFieldParser.extractSearchableFields(fieldNames, true));
    searchableAttributes = Collections.unmodifiableList(SearchableFieldParser.extractSearchableFields(attributeNames, false));

    // Wrap the default factory only to give query threads a recognizable, numbered name.
    final ThreadFactory baseFactory = Executors.defaultThreadFactory();
    final ThreadFactory queryThreadFactory = new ThreadFactory() {
        private final AtomicInteger threadIndex = new AtomicInteger();

        @Override
        public Thread newThread(final Runnable task) {
            final Thread queryThread = baseFactory.newThread(task);
            queryThread.setName("Provenance Query Thread-" + threadIndex.incrementAndGet());
            return queryThread;
        }
    };

    queryExecService = Executors.newFixedThreadPool(2, queryThreadFactory);
    scheduledExecService = Executors.newScheduledThreadPool(2);
}
| |
| @Override |
| public void initialize(final EventReporter eventReporter) { |
| if (initialized.getAndSet(true)) { |
| return; |
| } |
| |
| scheduledExecService.scheduleWithFixedDelay(new RemoveExpiredQueryResults(), 30L, 30L, TimeUnit.SECONDS); |
| } |
| |
| @Override |
| public ProvenanceEventBuilder eventBuilder() { |
| return new StandardProvenanceEventRecord.Builder(); |
| } |
| |
| @Override |
| public void registerEvent(final ProvenanceEventRecord event) { |
| final long id = idGenerator.getAndIncrement(); |
| ringBuffer.add(new IdEnrichedProvenanceEvent(event, id)); |
| } |
| |
| @Override |
| public void registerEvents(final Collection<ProvenanceEventRecord> events) { |
| for (final ProvenanceEventRecord event : events) { |
| registerEvent(event); |
| } |
| } |
| |
| @Override |
| public List<StoredProvenanceEvent> getEvents(final long firstRecordId, final int maxRecords) throws IOException { |
| return ringBuffer.getSelectedElements(new Filter<StoredProvenanceEvent>() { |
| @Override |
| public boolean select(final StoredProvenanceEvent value) { |
| return value.getEventId() >= firstRecordId; |
| } |
| }, maxRecords); |
| } |
| |
| @Override |
| public Long getMaxEventId() { |
| final ProvenanceEventRecord newest = ringBuffer.getNewestElement(); |
| return (newest == null) ? null : newest.getEventId(); |
| } |
| |
| public StoredProvenanceEvent getEvent(final String identifier) throws IOException { |
| final List<StoredProvenanceEvent> records = ringBuffer.getSelectedElements(new Filter<StoredProvenanceEvent>() { |
| @Override |
| public boolean select(final StoredProvenanceEvent event) { |
| return identifier.equals(event.getFlowFileUuid()); |
| } |
| }, 1); |
| |
| return records.isEmpty() ? null : records.get(0); |
| } |
| |
| @Override |
| public StoredProvenanceEvent getEvent(final long id) { |
| final List<StoredProvenanceEvent> records = ringBuffer.getSelectedElements(new Filter<StoredProvenanceEvent>() { |
| @Override |
| public boolean select(final StoredProvenanceEvent event) { |
| return event.getEventId() == id; |
| } |
| }, 1); |
| |
| return records.isEmpty() ? null : records.get(0); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| queryExecService.shutdownNow(); |
| scheduledExecService.shutdown(); |
| } |
| |
| @Override |
| public List<SearchableField> getSearchableFields() { |
| return searchableFields; |
| } |
| |
| @Override |
| public List<SearchableField> getSearchableAttributes() { |
| return searchableAttributes; |
| } |
| |
| public QueryResult queryEvents(final Query query) throws IOException { |
| final QuerySubmission submission = submitQuery(query); |
| final QueryResult result = submission.getResult(); |
| while (!result.isFinished()) { |
| try { |
| Thread.sleep(100L); |
| } catch (final InterruptedException ie) { |
| } |
| } |
| |
| if (result.getError() != null) { |
| throw new IOException(result.getError()); |
| } |
| |
| return result; |
| } |
| |
| private Filter<ProvenanceEventRecord> createFilter(final Query query) { |
| return new Filter<ProvenanceEventRecord>() { |
| @Override |
| public boolean select(final ProvenanceEventRecord event) { |
| if (query.getStartDate() != null && query.getStartDate().getTime() > event.getEventTime()) { |
| return false; |
| } |
| |
| if (query.getEndDate() != null && query.getEndDate().getTime() < event.getEventTime()) { |
| return false; |
| } |
| |
| if (query.getMaxFileSize() != null) { |
| final long maxFileSize = DataUnit.parseDataSize(query.getMaxFileSize(), DataUnit.B).longValue(); |
| if (event.getFileSize() > maxFileSize) { |
| return false; |
| } |
| } |
| |
| if (query.getMinFileSize() != null) { |
| final long minFileSize = DataUnit.parseDataSize(query.getMinFileSize(), DataUnit.B).longValue(); |
| if (event.getFileSize() < minFileSize) { |
| return false; |
| } |
| } |
| |
| for (final SearchTerm searchTerm : query.getSearchTerms()) { |
| final SearchableField searchableField = searchTerm.getSearchableField(); |
| final String searchValue = searchTerm.getValue(); |
| |
| if (searchableField.isAttribute()) { |
| final String attributeName = searchableField.getIdentifier(); |
| |
| final String eventAttributeValue = event.getAttributes().get(attributeName); |
| |
| if (searchValue.contains("?") || searchValue.contains("*")) { |
| if (eventAttributeValue == null || eventAttributeValue.isEmpty()) { |
| return false; |
| } |
| |
| final String regex = searchValue.replace("?", ".").replace("*", ".*"); |
| final Pattern pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE); |
| if (!pattern.matcher(eventAttributeValue).matches()) { |
| return false; |
| } |
| } else { |
| if (!searchValue.equalsIgnoreCase(eventAttributeValue)) { |
| return false; |
| } |
| } |
| } else { |
| // if FlowFileUUID, search parent & child UUID's also. |
| if (searchableField.equals(SearchableFields.FlowFileUUID)) { |
| if (searchValue.contains("?") || searchValue.contains("*")) { |
| final String regex = searchValue.replace("?", ".").replace("*", ".*"); |
| final Pattern pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE); |
| if (pattern.matcher(event.getFlowFileUuid()).matches()) { |
| continue; |
| } |
| |
| boolean found = false; |
| for (final String uuid : event.getParentUuids()) { |
| if (pattern.matcher(uuid).matches()) { |
| found = true; |
| break; |
| } |
| } |
| |
| for (final String uuid : event.getChildUuids()) { |
| if (pattern.matcher(uuid).matches()) { |
| found = true; |
| break; |
| } |
| } |
| |
| if (found) { |
| continue; |
| } |
| } else if (event.getFlowFileUuid().equals(searchValue) || event.getParentUuids().contains(searchValue) || event.getChildUuids().contains(searchValue)) { |
| continue; |
| } |
| |
| return false; |
| } |
| |
| final Object fieldValue = getFieldValue(event, searchableField); |
| if (fieldValue == null) { |
| return false; |
| } |
| |
| if (searchValue.contains("?") || searchValue.contains("*")) { |
| final String regex = searchValue.replace("?", ".").replace("*", ".*"); |
| final Pattern pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE); |
| if (!pattern.matcher(String.valueOf(fieldValue)).matches()) { |
| return false; |
| } |
| } else { |
| if (!searchValue.equalsIgnoreCase(String.valueOf(fieldValue))) { |
| return false; |
| } |
| } |
| } |
| } |
| |
| return true; |
| } |
| }; |
| } |
| |
| private Object getFieldValue(final ProvenanceEventRecord record, final SearchableField field) { |
| if (SearchableFields.AlternateIdentifierURI.equals(field)) { |
| return record.getAlternateIdentifierUri(); |
| } |
| if (SearchableFields.ComponentID.equals(field)) { |
| return record.getComponentId(); |
| } |
| if (SearchableFields.Details.equals(field)) { |
| return record.getDetails(); |
| } |
| if (SearchableFields.EventTime.equals(field)) { |
| return record.getEventTime(); |
| } |
| if (SearchableFields.EventType.equals(field)) { |
| return record.getEventType(); |
| } |
| if (SearchableFields.Filename.equals(field)) { |
| return record.getAttributes().get(CoreAttributes.FILENAME.key()); |
| } |
| if (SearchableFields.FileSize.equals(field)) { |
| return record.getFileSize(); |
| } |
| if (SearchableFields.FlowFileUUID.equals(field)) { |
| return record.getFlowFileUuid(); |
| } |
| if (SearchableFields.LineageStartDate.equals(field)) { |
| return record.getLineageStartDate(); |
| } |
| if (SearchableFields.Relationship.equals(field)) { |
| return record.getRelationship(); |
| } |
| if (SearchableFields.TransitURI.equals(field)) { |
| return record.getTransitUri(); |
| } |
| |
| return null; |
| } |
| |
| @Override |
| public QuerySubmission submitQuery(final Query query) { |
| if (query.getEndDate() != null && query.getStartDate() != null && query.getStartDate().getTime() > query.getEndDate().getTime()) { |
| throw new IllegalArgumentException("Query End Time cannot be before Query Start Time"); |
| } |
| |
| if (query.getSearchTerms().isEmpty() && query.getStartDate() == null && query.getEndDate() == null) { |
| final AsyncQuerySubmission result = new AsyncQuerySubmission(query, 1); |
| queryExecService.submit(new QueryRunnable(ringBuffer, createFilter(query), query.getMaxResults(), result)); |
| querySubmissionMap.put(query.getIdentifier(), result); |
| return result; |
| } |
| |
| final AsyncQuerySubmission result = new AsyncQuerySubmission(query, 1); |
| querySubmissionMap.put(query.getIdentifier(), result); |
| queryExecService.submit(new QueryRunnable(ringBuffer, createFilter(query), query.getMaxResults(), result)); |
| |
| return result; |
| } |
| |
| @Override |
| public QuerySubmission retrieveQuerySubmission(final String queryIdentifier) { |
| return querySubmissionMap.get(queryIdentifier); |
| } |
| |
| public Lineage computeLineage(final String flowFileUUID) throws IOException { |
| return computeLineage(Collections.<String>singleton(flowFileUUID), LineageComputationType.FLOWFILE_LINEAGE, null); |
| } |
| |
| private Lineage computeLineage(final Collection<String> flowFileUuids, final LineageComputationType computationType, final Long eventId) throws IOException { |
| final AsyncLineageSubmission submission = submitLineageComputation(flowFileUuids, computationType, eventId); |
| final StandardLineageResult result = submission.getResult(); |
| while (!result.isFinished()) { |
| try { |
| Thread.sleep(100L); |
| } catch (final InterruptedException ie) { |
| } |
| } |
| |
| if (result.getError() != null) { |
| throw new IOException(result.getError()); |
| } |
| |
| return new FlowFileLineage(result.getNodes(), result.getEdges()); |
| } |
| |
| @Override |
| public AsyncLineageSubmission submitLineageComputation(final String flowFileUuid) { |
| return submitLineageComputation(Collections.singleton(flowFileUuid), LineageComputationType.FLOWFILE_LINEAGE, null); |
| } |
| |
| @Override |
| public ComputeLineageSubmission retrieveLineageSubmission(String lineageIdentifier) { |
| return lineageSubmissionMap.get(lineageIdentifier); |
| } |
| |
| public Lineage expandSpawnEventParents(String identifier) throws IOException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| |
| @Override |
| public List<StoredProvenanceEvent> getEvents(final List<StorageLocation> storageLocations) throws IOException { |
| final List<StoredProvenanceEvent> events = new ArrayList<>(storageLocations.size()); |
| for ( final StorageLocation location : storageLocations ) { |
| if ( !(location instanceof EventIdLocation) ) { |
| throw new IllegalArgumentException("Illegal Storage Location"); |
| } |
| |
| final long id = ((EventIdLocation) location).getId(); |
| final StoredProvenanceEvent event = getEvent(id); |
| if ( event != null ) { |
| events.add(event); |
| } |
| } |
| return events; |
| } |
| |
| @Override |
| public StoredProvenanceEvent getEvent(final StorageLocation location) throws IOException { |
| if ( !(location instanceof EventIdLocation) ) { |
| throw new IllegalArgumentException("Illegal Storage Location"); |
| } |
| |
| final long id = ((EventIdLocation) location).getId(); |
| return getEvent(id); |
| } |
| |
| @Override |
| public Long getEarliestEventTime() throws IOException { |
| final List<StoredProvenanceEvent> events = getEvents(0L, 1); |
| if ( events.isEmpty() ) { |
| return null; |
| } |
| |
| return events.get(0).getEventTime(); |
| } |
| |
| @Override |
| public ComputeLineageSubmission submitExpandParents(final long eventId) { |
| final ProvenanceEventRecord event = getEvent(eventId); |
| if (event == null) { |
| final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1); |
| lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); |
| submission.getResult().update(Collections.<StoredProvenanceEvent>emptyList()); |
| return submission; |
| } |
| |
| switch (event.getEventType()) { |
| case JOIN: |
| case FORK: |
| case REPLAY: |
| case CLONE: |
| return submitLineageComputation(event.getParentUuids(), LineageComputationType.EXPAND_PARENTS, eventId); |
| default: { |
| final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_PARENTS, eventId, Collections.<String>emptyList(), 1); |
| lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); |
| submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its parents cannot be expanded"); |
| return submission; |
| } |
| } |
| } |
| |
| |
| @Override |
| public ComputeLineageSubmission submitExpandChildren(final long eventId) { |
| final ProvenanceEventRecord event = getEvent(eventId); |
| if (event == null) { |
| final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1); |
| lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); |
| submission.getResult().update(Collections.<StoredProvenanceEvent>emptyList()); |
| return submission; |
| } |
| |
| switch (event.getEventType()) { |
| case JOIN: |
| case FORK: |
| case REPLAY: |
| case CLONE: |
| return submitLineageComputation(event.getChildUuids(), LineageComputationType.EXPAND_CHILDREN, eventId); |
| default: { |
| final AsyncLineageSubmission submission = new AsyncLineageSubmission(LineageComputationType.EXPAND_CHILDREN, eventId, Collections.<String>emptyList(), 1); |
| lineageSubmissionMap.put(submission.getLineageIdentifier(), submission); |
| submission.getResult().setError("Event ID " + eventId + " indicates an event of type " + event.getEventType() + " so its children cannot be expanded"); |
| return submission; |
| } |
| } |
| } |
| |
| private AsyncLineageSubmission submitLineageComputation(final Collection<String> flowFileUuids, final LineageComputationType computationType, final Long eventId) { |
| final AsyncLineageSubmission result = new AsyncLineageSubmission(computationType, eventId, flowFileUuids, 1); |
| lineageSubmissionMap.put(result.getLineageIdentifier(), result); |
| |
| final Filter<StoredProvenanceEvent> filter = new Filter<StoredProvenanceEvent>() { |
| @Override |
| public boolean select(final StoredProvenanceEvent event) { |
| if (flowFileUuids.contains(event.getFlowFileUuid())) { |
| return true; |
| } |
| |
| for (final String parentId : event.getParentUuids()) { |
| if (flowFileUuids.contains(parentId)) { |
| return true; |
| } |
| } |
| |
| for (final String childId : event.getChildUuids()) { |
| if (flowFileUuids.contains(childId)) { |
| return true; |
| } |
| } |
| |
| return false; |
| } |
| }; |
| |
| queryExecService.submit(new ComputeLineageRunnable(ringBuffer, filter, result)); |
| |
| return result; |
| } |
| |
| private static class QueryRunnable implements Runnable { |
| |
| private final RingBuffer<StoredProvenanceEvent> ringBuffer; |
| private final Filter<ProvenanceEventRecord> filter; |
| private final AsyncQuerySubmission submission; |
| private final int maxRecords; |
| |
| public QueryRunnable(final RingBuffer<StoredProvenanceEvent> ringBuffer, final Filter<ProvenanceEventRecord> filter, final int maxRecords, final AsyncQuerySubmission submission) { |
| this.ringBuffer = ringBuffer; |
| this.filter = filter; |
| this.submission = submission; |
| this.maxRecords = maxRecords; |
| } |
| |
| @Override |
| public void run() { |
| // Retrieve the most recent results and count the total number of matches |
| final IntegerHolder matchingCount = new IntegerHolder(0); |
| final List<StoredProvenanceEvent> matchingRecords = new ArrayList<>(maxRecords); |
| ringBuffer.forEach(new ForEachEvaluator<StoredProvenanceEvent>() { |
| @Override |
| public boolean evaluate(final StoredProvenanceEvent record) { |
| if (filter.select(record)) { |
| if (matchingCount.incrementAndGet() <= maxRecords) { |
| matchingRecords.add(record); |
| } |
| } |
| |
| return true; |
| } |
| |
| }, IterationDirection.BACKWARD); |
| |
| submission.getResult().update(matchingRecords, matchingCount.get()); |
| } |
| } |
| |
| private static class ComputeLineageRunnable implements Runnable { |
| |
| private final RingBuffer<StoredProvenanceEvent> ringBuffer; |
| private final Filter<StoredProvenanceEvent> filter; |
| private final AsyncLineageSubmission submission; |
| |
| public ComputeLineageRunnable(final RingBuffer<StoredProvenanceEvent> ringBuffer, final Filter<StoredProvenanceEvent> filter, final AsyncLineageSubmission submission) { |
| this.ringBuffer = ringBuffer; |
| this.filter = filter; |
| this.submission = submission; |
| } |
| |
| @Override |
| @SuppressWarnings({ "unchecked", "rawtypes" }) |
| public void run() { |
| final List<StoredProvenanceEvent> records = ringBuffer.getSelectedElements(filter); |
| submission.getResult().update((List) records); |
| } |
| } |
| |
| private class RemoveExpiredQueryResults implements Runnable { |
| |
| @Override |
| public void run() { |
| final Date now = new Date(); |
| |
| final Iterator<Map.Entry<String, AsyncQuerySubmission>> queryIterator = querySubmissionMap.entrySet().iterator(); |
| while (queryIterator.hasNext()) { |
| final Map.Entry<String, AsyncQuerySubmission> entry = queryIterator.next(); |
| |
| final StandardQueryResult result = entry.getValue().getResult(); |
| if (result.isFinished() && result.getExpiration().before(now)) { |
| querySubmissionMap.remove(entry.getKey()); |
| } |
| } |
| |
| final Iterator<Map.Entry<String, AsyncLineageSubmission>> lineageIterator = lineageSubmissionMap.entrySet().iterator(); |
| while (lineageIterator.hasNext()) { |
| final Map.Entry<String, AsyncLineageSubmission> entry = lineageIterator.next(); |
| |
| final StandardLineageResult result = entry.getValue().getResult(); |
| if (result.isFinished() && result.getExpiration().before(now)) { |
| querySubmissionMap.remove(entry.getKey()); |
| } |
| } |
| } |
| } |
| |
| |
| |
| |
| @Override |
| public ProvenanceQuerySubmission submitQuery(final String query) { |
| throw new UnsupportedOperationException(); |
| /* |
| if ( provQuerySubmissionMap.size() > MAX_CONCURRENT_QUERIES ) { |
| final List<String> toRemove = new ArrayList<>(); |
| final Date now = new Date(); |
| |
| for ( final Map.Entry<String, ProvenanceQuerySubmission> entry : provQuerySubmissionMap.entrySet() ) { |
| if ( entry.getValue().getResult().getExpiration().after(now) ) { |
| toRemove.add(entry.getKey()); |
| } |
| } |
| |
| for ( final String id : toRemove ) { |
| provQuerySubmissionMap.remove(id); |
| } |
| |
| if ( provQuerySubmissionMap.size() > MAX_CONCURRENT_QUERIES ) { |
| throw new IllegalStateException("There are already " + MAX_CONCURRENT_QUERIES + " outstanding queries for this Provenance Repository; cannot perform any more queries until the existing queries are expired or canceled"); |
| } |
| } |
| |
| final Iterator<StoredProvenanceEvent> eventItr = ringBuffer.asList().iterator(); |
| final ProvenanceResultSet rs = ProvenanceQuery.compile(query, getSearchableFields(), getSearchableAttributes()).evaluate(eventItr); |
| |
| final Date submissionTime = new Date(); |
| final String queryId = UUID.randomUUID().toString(); |
| final Date expiration = new Date(System.currentTimeMillis() + TimeUnit.MINUTES.toNanos(10)); |
| final ProvenanceQuerySubmission submission = new ProvenanceQuerySubmission() { |
| private final AtomicBoolean canceled = new AtomicBoolean(false); |
| |
| @Override |
| public String getQuery() { |
| return query; |
| } |
| |
| @Override |
| public ProvenanceQueryResult getResult() { |
| return new ProvenanceQueryResult() { |
| @Override |
| public ProvenanceResultSet getResultSet() { |
| return rs; |
| } |
| |
| @Override |
| public Date getExpiration() { |
| return expiration; |
| } |
| |
| @Override |
| public String getError() { |
| return null; |
| } |
| |
| @Override |
| public int getPercentComplete() { |
| return 100; |
| } |
| |
| @Override |
| public boolean isFinished() { |
| return true; |
| } |
| }; |
| } |
| |
| @Override |
| public Date getSubmissionTime() { |
| return submissionTime; |
| } |
| |
| @Override |
| public String getQueryIdentifier() { |
| return queryId; |
| } |
| |
| @Override |
| public void cancel() { |
| canceled.set(true); |
| provQuerySubmissionMap.remove(queryId); |
| } |
| |
| @Override |
| public boolean isCanceled() { |
| return canceled.get(); |
| } |
| }; |
| |
| provQuerySubmissionMap.putIfAbsent(queryId, submission); |
| return submission; |
| */ |
| } |
| |
| |
| @Override |
| public ProvenanceQuerySubmission retrieveProvenanceQuerySubmission(final String queryIdentifier) { |
| throw new UnsupportedOperationException(); |
| /* |
| return provQuerySubmissionMap.get(queryIdentifier); |
| */ |
| } |
| } |