blob: 4f8914c3aaaba94c27f76df010ed9dfb2ff2815d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.repository;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.provenance.AsyncLineageSubmission;
import org.apache.nifi.provenance.IdentifierLookup;
import org.apache.nifi.provenance.ProvenanceAuthorizableFactory;
import org.apache.nifi.provenance.ProvenanceEventBuilder;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.provenance.lineage.ComputeLineageSubmission;
import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.search.QuerySubmission;
import org.apache.nifi.provenance.search.SearchableField;
import org.apache.nifi.util.RingBuffer;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
public class StatelessProvenanceRepository implements ProvenanceRepository {
public static String CONTAINER_NAME = "in-memory";
private final RingBuffer<ProvenanceEventRecord> ringBuffer;
private final int maxSize;
private final AtomicLong idGenerator = new AtomicLong(0L);
public StatelessProvenanceRepository(final int maxEvents) {
maxSize = maxEvents;
ringBuffer = new RingBuffer<>(maxSize);
}
@Override
public void initialize(final EventReporter eventReporter, final Authorizer authorizer, final ProvenanceAuthorizableFactory resourceFactory,
final IdentifierLookup idLookup) throws IOException {
}
@Override
public ProvenanceEventRepository getProvenanceEventRepository() {
return this;
}
@Override
public ProvenanceEventBuilder eventBuilder() {
return new StandardProvenanceEventRecord.Builder();
}
@Override
public void registerEvent(final ProvenanceEventRecord event) {
final long id = idGenerator.getAndIncrement();
ringBuffer.add(new IdEnrichedProvEvent(event, id));
}
@Override
public void registerEvents(final Iterable<ProvenanceEventRecord> events) {
for (final ProvenanceEventRecord event : events) {
registerEvent(event);
}
}
@Override
public List<ProvenanceEventRecord> getEvents(final long firstRecordId, final int maxRecords) throws IOException {
return getEvents(firstRecordId, maxRecords, null);
}
@Override
public List<ProvenanceEventRecord> getEvents(final long firstRecordId, final int maxRecords, final NiFiUser user) throws IOException {
return ringBuffer.getSelectedElements(new RingBuffer.Filter<ProvenanceEventRecord>() {
@Override
public boolean select(final ProvenanceEventRecord value) {
return value.getEventId() >= firstRecordId;
}
}, maxRecords);
}
@Override
public Long getMaxEventId() {
final ProvenanceEventRecord newest = ringBuffer.getNewestElement();
return (newest == null) ? null : newest.getEventId();
}
public ProvenanceEventRecord getEvent(final String identifier) throws IOException {
final List<ProvenanceEventRecord> records = ringBuffer.getSelectedElements(new RingBuffer.Filter<ProvenanceEventRecord>() {
@Override
public boolean select(final ProvenanceEventRecord event) {
return identifier.equals(event.getFlowFileUuid());
}
}, 1);
return records.isEmpty() ? null : records.get(0);
}
@Override
public ProvenanceEventRecord getEvent(final long id) {
final List<ProvenanceEventRecord> records = ringBuffer.getSelectedElements(new RingBuffer.Filter<ProvenanceEventRecord>() {
@Override
public boolean select(final ProvenanceEventRecord event) {
return event.getEventId() == id;
}
}, 1);
return records.isEmpty() ? null : records.get(0);
}
@Override
public ProvenanceEventRecord getEvent(final long id, final NiFiUser user) {
return getEvent(id);
}
@Override
public void close() throws IOException {
}
@Override
public List<SearchableField> getSearchableFields() {
throw new UnsupportedOperationException();
}
@Override
public List<SearchableField> getSearchableAttributes() {
throw new UnsupportedOperationException();
}
@Override
public QuerySubmission submitQuery(final Query query, final NiFiUser user) {
throw new UnsupportedOperationException();
}
@Override
public QuerySubmission retrieveQuerySubmission(final String queryIdentifier, final NiFiUser user) {
throw new UnsupportedOperationException();
}
@Override
public ComputeLineageSubmission submitLineageComputation(final long eventId, final NiFiUser user) {
throw new UnsupportedOperationException();
}
@Override
public AsyncLineageSubmission submitLineageComputation(final String flowFileUuid, final NiFiUser user) {
throw new UnsupportedOperationException();
}
@Override
public ComputeLineageSubmission retrieveLineageSubmission(String lineageIdentifier, final NiFiUser user) {
throw new UnsupportedOperationException();
}
@Override
public ComputeLineageSubmission submitExpandParents(final long eventId, final NiFiUser user) {
throw new UnsupportedOperationException();
}
@Override
public ComputeLineageSubmission submitExpandChildren(final long eventId, final NiFiUser user) {
throw new UnsupportedOperationException();
}
@Override
public long getContainerCapacity(final String containerName) throws IOException {
return maxSize;
}
@Override
public Set<String> getContainerNames() {
return Collections.singleton(CONTAINER_NAME);
}
@Override
public long getContainerUsableSpace(String containerName) throws IOException {
return maxSize - ringBuffer.getSize();
}
@Override
public String getContainerFileStoreName(String containerName) {
return null;
}
private static class IdEnrichedProvEvent implements ProvenanceEventRecord {
private final ProvenanceEventRecord record;
private final long id;
public IdEnrichedProvEvent(final ProvenanceEventRecord record, final long id) {
this.record = record;
this.id = id;
}
@Override
public long getEventId() {
return id;
}
@Override
public long getEventTime() {
return record.getEventTime();
}
@Override
public long getFlowFileEntryDate() {
return record.getFlowFileEntryDate();
}
@Override
public long getLineageStartDate() {
return record.getLineageStartDate();
}
@Override
public long getFileSize() {
return record.getFileSize();
}
@Override
public Long getPreviousFileSize() {
return record.getPreviousFileSize();
}
@Override
public long getEventDuration() {
return record.getEventDuration();
}
@Override
public ProvenanceEventType getEventType() {
return record.getEventType();
}
@Override
public Map<String, String> getAttributes() {
return record.getAttributes();
}
@Override
public Map<String, String> getPreviousAttributes() {
return record.getPreviousAttributes();
}
@Override
public Map<String, String> getUpdatedAttributes() {
return record.getUpdatedAttributes();
}
@Override
public String getComponentId() {
return record.getComponentId();
}
@Override
public String getComponentType() {
return record.getComponentType();
}
@Override
public String getTransitUri() {
return record.getTransitUri();
}
@Override
public String getSourceSystemFlowFileIdentifier() {
return record.getSourceSystemFlowFileIdentifier();
}
@Override
public String getFlowFileUuid() {
return record.getFlowFileUuid();
}
@Override
public List<String> getParentUuids() {
return record.getParentUuids();
}
@Override
public List<String> getChildUuids() {
return record.getChildUuids();
}
@Override
public String getAlternateIdentifierUri() {
return record.getAlternateIdentifierUri();
}
@Override
public String getDetails() {
return record.getDetails();
}
@Override
public String getRelationship() {
return record.getRelationship();
}
@Override
public String getSourceQueueIdentifier() {
return record.getSourceQueueIdentifier();
}
@Override
public String getContentClaimSection() {
return record.getContentClaimSection();
}
@Override
public String getPreviousContentClaimSection() {
return record.getPreviousContentClaimSection();
}
@Override
public String getContentClaimContainer() {
return record.getContentClaimContainer();
}
@Override
public String getPreviousContentClaimContainer() {
return record.getPreviousContentClaimContainer();
}
@Override
public String getContentClaimIdentifier() {
return record.getContentClaimIdentifier();
}
@Override
public String getPreviousContentClaimIdentifier() {
return record.getPreviousContentClaimIdentifier();
}
@Override
public Long getContentClaimOffset() {
return record.getContentClaimOffset();
}
@Override
public Long getPreviousContentClaimOffset() {
return record.getPreviousContentClaimOffset();
}
/**
* Returns the best event identifier for this event (eventId if available, descriptive identifier if not yet persisted to allow for traceability).
*
* @return a descriptive event ID to allow tracing
*/
@Override
public String getBestEventIdentifier() {
return Long.toString(getEventId());
}
}
}