blob: 1abd5c9ca3c6216fefd1ebdfe306b1b3936f838e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance.journaling;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexableField;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.SearchableFields;
import org.apache.nifi.provenance.StorageLocation;
import org.apache.nifi.provenance.StoredProvenanceEvent;
import org.apache.nifi.provenance.journaling.exception.EventNotFoundException;
import org.apache.nifi.provenance.journaling.index.IndexedFieldNames;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LazyInitializedProvenanceEvent implements StoredProvenanceEvent {
private static final Logger logger = LoggerFactory.getLogger(LazyInitializedProvenanceEvent.class);
private final ProvenanceEventRepository repo;
private final StorageLocation storageLocation;
private final Document doc;
private ProvenanceEventRecord fullRecord;
public LazyInitializedProvenanceEvent(final ProvenanceEventRepository repo, final StorageLocation storageLocation, final Document document) {
this.repo = repo;
this.storageLocation = storageLocation;
this.doc = document;
}
@Override
public long getEventId() {
return doc.getField(IndexedFieldNames.EVENT_ID).numericValue().longValue();
}
@Override
public StorageLocation getStorageLocation() {
return storageLocation;
}
@Override
public long getEventTime() {
return doc.getField(SearchableFields.EventTime.getSearchableFieldName()).numericValue().longValue();
}
private void ensureFullyLoaded() {
if ( fullRecord != null ) {
return;
}
final long id = getEventId();
try {
fullRecord = repo.getEvent(id);
} catch (final IOException ioe) {
final String containerName = doc.get(IndexedFieldNames.CONTAINER_NAME);
final String sectionName = doc.get(IndexedFieldNames.SECTION_NAME);
final String journalId = doc.get(IndexedFieldNames.JOURNAL_ID);
final String error = "Failed to load event with ID " + id + " from container '" + containerName + "', section '" + sectionName + "', journal '" + journalId + "' due to " + ioe;
logger.error(error);
if ( logger.isDebugEnabled() ) {
logger.error("", ioe);
}
throw new EventNotFoundException(error);
}
}
@Override
public long getFlowFileEntryDate() {
ensureFullyLoaded();
return fullRecord.getFlowFileEntryDate();
}
@Override
public long getLineageStartDate() {
final IndexableField field = doc.getField(SearchableFields.LineageStartDate.getSearchableFieldName());
if ( field != null ) {
return field.numericValue().longValue();
}
ensureFullyLoaded();
return fullRecord.getLineageStartDate();
}
@Override
public Set<String> getLineageIdentifiers() {
ensureFullyLoaded();
return fullRecord.getLineageIdentifiers();
}
@Override
public long getFileSize() {
final IndexableField field = doc.getField(SearchableFields.FileSize.getSearchableFieldName());
if ( field != null ) {
return field.numericValue().longValue();
}
ensureFullyLoaded();
return fullRecord.getFileSize();
}
@Override
public Long getPreviousFileSize() {
ensureFullyLoaded();
return fullRecord.getPreviousFileSize();
}
@Override
public long getEventDuration() {
// TODO: Allow Event Duration to be indexed; it could be interesting for reporting.
ensureFullyLoaded();
return fullRecord.getEventDuration();
}
@Override
public ProvenanceEventType getEventType() {
final String name = doc.get(SearchableFields.EventType.getSearchableFieldName());
return ProvenanceEventType.valueOf(name.toUpperCase());
}
@Override
public Map<String, String> getAttributes() {
ensureFullyLoaded();
return fullRecord.getAttributes();
}
@Override
public String getAttribute(final String attributeName) {
final String attr = doc.get(attributeName);
if ( attr == null ) {
ensureFullyLoaded();
return fullRecord.getAttribute(attributeName);
} else {
return attr;
}
}
@Override
public Map<String, String> getPreviousAttributes() {
ensureFullyLoaded();
return fullRecord.getPreviousAttributes();
}
@Override
public Map<String, String> getUpdatedAttributes() {
ensureFullyLoaded();
return fullRecord.getUpdatedAttributes();
}
@Override
public String getComponentId() {
final String componentId = doc.get(SearchableFields.ComponentID.getSearchableFieldName());
if ( componentId == null ) {
ensureFullyLoaded();
return fullRecord.getComponentId();
} else {
return componentId;
}
}
@Override
public String getComponentType() {
// TODO: Make indexable.
ensureFullyLoaded();
return fullRecord.getComponentType();
}
@Override
public String getTransitUri() {
final String transitUri = doc.get(SearchableFields.TransitURI.getSearchableFieldName());
if ( transitUri == null ) {
final ProvenanceEventType eventType = getEventType();
switch (eventType) {
case RECEIVE:
case SEND:
ensureFullyLoaded();
return fullRecord.getTransitUri();
default:
return null;
}
} else {
return transitUri;
}
}
@Override
public String getSourceSystemFlowFileIdentifier() {
ensureFullyLoaded();
return fullRecord.getSourceSystemFlowFileIdentifier();
}
@Override
public String getFlowFileUuid() {
String uuid = doc.get(SearchableFields.FlowFileUUID.getSearchableFieldName());
if ( uuid != null ) {
return uuid;
}
ensureFullyLoaded();
return fullRecord.getFlowFileUuid();
}
@Override
public List<String> getParentUuids() {
final IndexableField[] uuids = doc.getFields(SearchableFields.FlowFileUUID.getSearchableFieldName());
if ( uuids == null || uuids.length < 2 ) {
return Collections.emptyList();
}
switch (getEventType()) {
case JOIN: {
final List<String> parentUuids = new ArrayList<>(uuids.length - 1);
for (int i=1; i < uuids.length; i++) {
parentUuids.add(uuids[i].stringValue());
}
return parentUuids;
}
default:
return Collections.emptyList();
}
}
@Override
public List<String> getChildUuids() {
final IndexableField[] uuids = doc.getFields(SearchableFields.FlowFileUUID.getSearchableFieldName());
if ( uuids == null || uuids.length < 2 ) {
return Collections.emptyList();
}
switch (getEventType()) {
case REPLAY:
case CLONE:
case FORK: {
final List<String> childUuids = new ArrayList<>(uuids.length - 1);
for (int i=1; i < uuids.length; i++) {
childUuids.add(uuids[i].stringValue());
}
return childUuids;
}
default:
return Collections.emptyList();
}
}
@Override
public String getAlternateIdentifierUri() {
final String altId = doc.get(SearchableFields.AlternateIdentifierURI.getSearchableFieldName());
if ( altId == null && getEventType() == ProvenanceEventType.ADDINFO ) {
ensureFullyLoaded();
return fullRecord.getAlternateIdentifierUri();
} else {
return null;
}
}
@Override
public String getDetails() {
final String details = doc.get(SearchableFields.Details.getSearchableFieldName());
if ( details == null ) {
ensureFullyLoaded();
return fullRecord.getDetails();
}
return null;
}
@Override
public String getRelationship() {
final String relationship = doc.get(SearchableFields.Relationship.getSearchableFieldName());
if ( relationship == null ) {
ensureFullyLoaded();
return fullRecord.getRelationship();
}
return null;
}
@Override
public String getSourceQueueIdentifier() {
final String queueId = doc.get(SearchableFields.SourceQueueIdentifier.getSearchableFieldName());
if ( queueId == null ) {
ensureFullyLoaded();
return fullRecord.getSourceQueueIdentifier();
}
return null;
}
@Override
public String getContentClaimSection() {
final String claimSection = doc.get(SearchableFields.ContentClaimSection.getSearchableFieldName());
if ( claimSection == null ) {
ensureFullyLoaded();
return fullRecord.getContentClaimSection();
}
return null;
}
@Override
public String getPreviousContentClaimSection() {
ensureFullyLoaded();
return fullRecord.getPreviousContentClaimSection();
}
@Override
public String getContentClaimContainer() {
final String claimContainer = doc.get(SearchableFields.ContentClaimContainer.getSearchableFieldName());
if ( claimContainer == null ) {
ensureFullyLoaded();
return fullRecord.getContentClaimContainer();
}
return null;
}
@Override
public String getPreviousContentClaimContainer() {
ensureFullyLoaded();
return fullRecord.getPreviousContentClaimContainer();
}
@Override
public String getContentClaimIdentifier() {
final String claimIdentifier = doc.get(SearchableFields.ContentClaimIdentifier.getSearchableFieldName());
if ( claimIdentifier == null ) {
ensureFullyLoaded();
return fullRecord.getContentClaimIdentifier();
}
return null;
}
@Override
public String getPreviousContentClaimIdentifier() {
ensureFullyLoaded();
return fullRecord.getPreviousContentClaimIdentifier();
}
@Override
public Long getContentClaimOffset() {
final String claimOffset = doc.get(SearchableFields.ContentClaimOffset.getSearchableFieldName());
if ( claimOffset == null ) {
ensureFullyLoaded();
return fullRecord.getContentClaimOffset();
}
return null;
}
@Override
public Long getPreviousContentClaimOffset() {
ensureFullyLoaded();
return fullRecord.getPreviousContentClaimOffset();
}
}