blob: bd2c2ad2f2a1060ed3d8748db30af51b5a9f89ab [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.Relationship;
/**
* Holder for provenance relevant information
* <p/>
* @author none
*/
public final class StandardProvenanceEventRecord implements ProvenanceEventRecord {
private final long eventTime;
private final long entryDate;
private final ProvenanceEventType eventType;
private final long lineageStartDate;
private final Set<String> lineageIdentifiers;
private final String componentId;
private final String componentType;
private final String transitUri;
private final String sourceSystemFlowFileIdentifier;
private final String uuid;
private final List<String> parentUuids;
private final List<String> childrenUuids;
private final String alternateIdentifierUri;
private final String details;
private final String relationship;
private final long storageByteOffset;
private final String storageFilename;
private final long eventDuration;
private final String contentClaimSection;
private final String contentClaimContainer;
private final String contentClaimIdentifier;
private final Long contentClaimOffset;
private final long contentSize;
private final String previousClaimSection;
private final String previousClaimContainer;
private final String previousClaimIdentifier;
private final Long previousClaimOffset;
private final Long previousSize;
private final String sourceQueueIdentifier;
private final Map<String, String> previousAttributes;
private final Map<String, String> updatedAttributes;
private volatile long eventId;
private StandardProvenanceEventRecord(final Builder builder) {
this.eventTime = builder.eventTime;
this.entryDate = builder.entryDate;
this.eventType = builder.eventType;
this.componentId = builder.componentId;
this.componentType = builder.componentType;
this.transitUri = builder.transitUri;
this.sourceSystemFlowFileIdentifier = builder.sourceSystemFlowFileIdentifier;
this.uuid = builder.uuid;
this.parentUuids = builder.parentUuids;
this.childrenUuids = builder.childrenUuids;
this.alternateIdentifierUri = builder.alternateIdentifierUri;
this.details = builder.details;
this.relationship = builder.relationship;
this.storageByteOffset = builder.storageByteOffset;
this.storageFilename = builder.storageFilename;
this.eventDuration = builder.eventDuration;
this.lineageStartDate = builder.lineageStartDate;
this.lineageIdentifiers = Collections.unmodifiableSet(builder.lineageIdentifiers);
previousClaimSection = builder.previousClaimSection;
previousClaimContainer = builder.previousClaimContainer;
previousClaimIdentifier = builder.previousClaimIdentifier;
previousClaimOffset = builder.previousClaimOffset;
previousSize = builder.previousSize;
contentClaimSection = builder.contentClaimSection;
contentClaimContainer = builder.contentClaimContainer;
contentClaimIdentifier = builder.contentClaimIdentifier;
contentClaimOffset = builder.contentClaimOffset;
contentSize = builder.contentSize;
previousAttributes = builder.previousAttributes == null ? Collections.<String, String>emptyMap() : Collections.unmodifiableMap(builder.previousAttributes);
updatedAttributes = builder.updatedAttributes == null ? Collections.<String, String>emptyMap() : Collections.unmodifiableMap(builder.updatedAttributes);
sourceQueueIdentifier = builder.sourceQueueIdentifier;
}
public String getStorageFilename() {
return storageFilename;
}
public long getStorageByteOffset() {
return storageByteOffset;
}
void setEventId(final long eventId) {
this.eventId = eventId;
}
@Override
public long getEventId() {
return eventId;
}
@Override
public long getEventTime() {
return eventTime;
}
@Override
public Set<String> getLineageIdentifiers() {
return lineageIdentifiers;
}
@Override
public long getLineageStartDate() {
return lineageStartDate;
}
@Override
public long getFileSize() {
return contentSize;
}
@Override
public Long getPreviousFileSize() {
return previousSize;
}
@Override
public ProvenanceEventType getEventType() {
return eventType;
}
@Override
public String getAttribute(final String attributeName) {
if ( updatedAttributes.containsKey(attributeName) ) {
return updatedAttributes.get(attributeName);
}
return previousAttributes.get(attributeName);
}
@Override
public Map<String, String> getAttributes() {
final Map<String, String> allAttrs = new HashMap<>(previousAttributes.size() + updatedAttributes.size());
allAttrs.putAll(previousAttributes);
for (final Map.Entry<String, String> entry : updatedAttributes.entrySet()) {
if (entry.getValue() != null) {
allAttrs.put(entry.getKey(), entry.getValue());
}
}
return allAttrs;
}
@Override
public String getComponentId() {
return componentId;
}
@Override
public String getComponentType() {
return componentType;
}
@Override
public String getTransitUri() {
return transitUri;
}
@Override
public String getSourceSystemFlowFileIdentifier() {
return sourceSystemFlowFileIdentifier;
}
@Override
public String getFlowFileUuid() {
return uuid;
}
@Override
public List<String> getParentUuids() {
return parentUuids == null ? Collections.<String>emptyList() : parentUuids;
}
@Override
public List<String> getChildUuids() {
return childrenUuids == null ? Collections.<String>emptyList() : childrenUuids;
}
@Override
public String getAlternateIdentifierUri() {
return alternateIdentifierUri;
}
@Override
public long getEventDuration() {
return eventDuration;
}
@Override
public String getDetails() {
return details;
}
@Override
public String getRelationship() {
return relationship;
}
@Override
public long getFlowFileEntryDate() {
return entryDate;
}
@Override
public String getContentClaimSection() {
return contentClaimSection;
}
@Override
public String getContentClaimContainer() {
return contentClaimContainer;
}
@Override
public String getContentClaimIdentifier() {
return contentClaimIdentifier;
}
@Override
public Long getContentClaimOffset() {
return contentClaimOffset;
}
@Override
public String getSourceQueueIdentifier() {
return sourceQueueIdentifier;
}
@Override
public Map<String, String> getPreviousAttributes() {
return previousAttributes;
}
@Override
public String getPreviousContentClaimContainer() {
return previousClaimContainer;
}
@Override
public String getPreviousContentClaimIdentifier() {
return previousClaimIdentifier;
}
@Override
public Long getPreviousContentClaimOffset() {
return previousClaimOffset;
}
@Override
public String getPreviousContentClaimSection() {
return previousClaimSection;
}
@Override
public Map<String, String> getUpdatedAttributes() {
return updatedAttributes;
}
@Override
public int hashCode() {
final int eventTypeCode;
if (eventType == ProvenanceEventType.CLONE || eventType == ProvenanceEventType.JOIN || eventType == ProvenanceEventType.FORK) {
eventTypeCode = 1472;
} else if (eventType == ProvenanceEventType.REPLAY) {
eventTypeCode = 21479 + (int) (0x7FFFFFFF & eventTime); // use lower bits of event time.
} else {
eventTypeCode = 4812 + eventType.hashCode() + 4 * uuid.hashCode();
}
return -37423 + 3 * componentId.hashCode() + (transitUri == null ? 0 : 41 * transitUri.hashCode())
+ (relationship == null ? 0 : 47 * relationship.hashCode()) + 44 * eventTypeCode;
}
@Override
public boolean equals(final Object obj) {
if (obj == null) {
return false;
}
if (obj == this) {
return true;
}
if (!(obj instanceof StandardProvenanceEventRecord)) {
return false;
}
final StandardProvenanceEventRecord other = (StandardProvenanceEventRecord) obj;
// If event ID's are populated and not equal, return false. If they have not yet been populated, do not
// use them in the comparison.
if (eventId > 0L && other.getEventId() > 0L && eventId != other.getEventId()) {
return false;
}
if (eventType != other.eventType) {
return false;
}
if (!componentId.equals(other.componentId)) {
return false;
}
if (different(parentUuids, other.parentUuids)) {
return false;
}
if (different(childrenUuids, other.childrenUuids)) {
return false;
}
// SPAWN had issues indicating which should be the event's FlowFileUUID in the case that there is 1 parent and 1 child.
if (!uuid.equals(other.uuid)) {
return false;
}
if (different(transitUri, other.transitUri)) {
return false;
}
if (different(relationship, other.relationship)) {
return false;
}
return !(eventType == ProvenanceEventType.REPLAY && eventTime != other.getEventTime());
}
private boolean different(final Object a, final Object b) {
if (a == null && b == null) {
return false;
}
if (a == null || b == null) {
return true;
}
return !a.equals(b);
}
private boolean different(final List<String> a, final List<String> b) {
if (a == null && b == null) {
return false;
}
if (a == null && b != null) {
return true;
}
if (a != null && b == null) {
return true;
}
if (a.size() != b.size()) {
return true;
}
final List<String> sortedA = new ArrayList<>(a);
final List<String> sortedB = new ArrayList<>(b);
Collections.sort(sortedA);
Collections.sort(sortedB);
for (int i = 0; i < sortedA.size(); i++) {
if (!sortedA.get(i).equals(sortedB.get(i))) {
return true;
}
}
return false;
}
@Override
public String toString() {
return "ProvenanceEventRecord ["
+ "eventId=" + eventId
+ ", eventType=" + eventType
+ ", eventTime=" + new Date(eventTime)
+ ", uuid=" + uuid
+ ", fileSize=" + contentSize
+ ", componentId=" + componentId
+ ", transitUri=" + transitUri
+ ", sourceSystemFlowFileIdentifier=" + sourceSystemFlowFileIdentifier
+ ", parentUuids=" + parentUuids
+ ", alternateIdentifierUri=" + alternateIdentifierUri + "]";
}
public static class Builder implements ProvenanceEventBuilder {
private long eventTime = System.currentTimeMillis();
private long entryDate;
private long lineageStartDate;
private Set<String> lineageIdentifiers = new HashSet<>();
private ProvenanceEventType eventType = null;
private String componentId = null;
private String componentType = null;
private String sourceSystemFlowFileIdentifier = null;
private String transitUri = null;
private String uuid = null;
private List<String> parentUuids = null;
private List<String> childrenUuids = null;
private String alternateIdentifierUri = null;
private String details = null;
private String relationship = null;
private long storageByteOffset = -1L;
private long eventDuration = -1L;
private String storageFilename;
private String contentClaimSection;
private String contentClaimContainer;
private String contentClaimIdentifier;
private Long contentClaimOffset;
private Long contentSize;
private String previousClaimSection;
private String previousClaimContainer;
private String previousClaimIdentifier;
private Long previousClaimOffset;
private Long previousSize;
private String sourceQueueIdentifier;
private Map<String, String> previousAttributes;
private Map<String, String> updatedAttributes;
@Override
public Builder fromEvent(final ProvenanceEventRecord event) {
eventTime = event.getEventTime();
entryDate = event.getFlowFileEntryDate();
lineageStartDate = event.getLineageStartDate();
lineageIdentifiers = event.getLineageIdentifiers();
eventType = event.getEventType();
componentId = event.getComponentId();
componentType = event.getComponentType();
transitUri = event.getTransitUri();
sourceSystemFlowFileIdentifier = event.getSourceSystemFlowFileIdentifier();
uuid = event.getFlowFileUuid();
parentUuids = event.getParentUuids();
childrenUuids = event.getChildUuids();
alternateIdentifierUri = event.getAlternateIdentifierUri();
eventDuration = event.getEventDuration();
previousAttributes = event.getPreviousAttributes();
updatedAttributes = event.getUpdatedAttributes();
details = event.getDetails();
relationship = event.getRelationship();
contentClaimSection = event.getContentClaimSection();
contentClaimContainer = event.getContentClaimContainer();
contentClaimIdentifier = event.getContentClaimIdentifier();
contentClaimOffset = event.getContentClaimOffset();
contentSize = event.getFileSize();
previousClaimSection = event.getPreviousContentClaimSection();
previousClaimContainer = event.getPreviousContentClaimContainer();
previousClaimIdentifier = event.getPreviousContentClaimIdentifier();
previousClaimOffset = event.getPreviousContentClaimOffset();
previousSize = event.getPreviousFileSize();
sourceQueueIdentifier = event.getSourceQueueIdentifier();
if (event instanceof StandardProvenanceEventRecord) {
final StandardProvenanceEventRecord standardProvEvent = (StandardProvenanceEventRecord) event;
storageByteOffset = standardProvEvent.storageByteOffset;
storageFilename = standardProvEvent.storageFilename;
}
return this;
}
@Override
public Builder setFlowFileEntryDate(final long entryDate) {
this.entryDate = entryDate;
return this;
}
@Override
public Builder setLineageIdentifiers(final Set<String> lineageIdentifiers) {
this.lineageIdentifiers = lineageIdentifiers;
return this;
}
@Override
public Builder setAttributes(final Map<String, String> previousAttributes, final Map<String, String> updatedAttributes) {
this.previousAttributes = previousAttributes;
this.updatedAttributes = updatedAttributes;
return this;
}
@Override
public Builder setFlowFileUUID(final String uuid) {
this.uuid = uuid;
return this;
}
public Builder setStorageLocation(final String filename, final long offset) {
this.storageFilename = filename;
this.storageByteOffset = offset;
return this;
}
@Override
public Builder setEventTime(long eventTime) {
this.eventTime = eventTime;
return this;
}
@Override
public Builder setEventDuration(final long millis) {
this.eventDuration = millis;
return this;
}
@Override
public Builder setLineageStartDate(final long startDate) {
this.lineageStartDate = startDate;
return this;
}
public Builder addLineageIdentifier(final String lineageIdentifier) {
this.lineageIdentifiers.add(lineageIdentifier);
return this;
}
@Override
public Builder setEventType(ProvenanceEventType eventType) {
this.eventType = eventType;
return this;
}
@Override
public Builder setComponentId(String componentId) {
this.componentId = componentId;
return this;
}
@Override
public Builder setComponentType(String componentType) {
this.componentType = componentType;
return this;
}
@Override
public Builder setSourceSystemFlowFileIdentifier(String sourceSystemFlowFileIdentifier) {
this.sourceSystemFlowFileIdentifier = sourceSystemFlowFileIdentifier;
return this;
}
@Override
public Builder setTransitUri(String transitUri) {
this.transitUri = transitUri;
return this;
}
@Override
public Builder addParentFlowFile(final FlowFile parentFlowFile) {
if (this.parentUuids == null) {
this.parentUuids = new ArrayList<>();
}
this.parentUuids.add(parentFlowFile.getAttribute(CoreAttributes.UUID.key()));
return this;
}
public Builder addParentUuid(final String uuid) {
if (this.parentUuids == null) {
this.parentUuids = new ArrayList<>();
}
this.parentUuids.add(uuid);
return this;
}
@Override
public Builder removeParentFlowFile(final FlowFile parentFlowFile) {
if (this.parentUuids == null) {
return this;
}
parentUuids.remove(parentFlowFile.getAttribute(CoreAttributes.UUID.key()));
return this;
}
@Override
public Builder addChildFlowFile(final FlowFile childFlowFile) {
if (this.childrenUuids == null) {
this.childrenUuids = new ArrayList<>();
}
this.childrenUuids.add(childFlowFile.getAttribute(CoreAttributes.UUID.key()));
return this;
}
public Builder addChildUuid(final String uuid) {
if (this.childrenUuids == null) {
this.childrenUuids = new ArrayList<>();
}
this.childrenUuids.add(uuid);
return this;
}
@Override
public Builder removeChildFlowFile(final FlowFile childFlowFile) {
if (this.childrenUuids == null) {
return this;
}
childrenUuids.remove(childFlowFile.getAttribute(CoreAttributes.UUID.key()));
return this;
}
@Override
public Builder setAlternateIdentifierUri(String alternateIdentifierUri) {
this.alternateIdentifierUri = alternateIdentifierUri;
return this;
}
@Override
public Builder setDetails(String details) {
this.details = details;
return this;
}
@Override
public Builder setRelationship(Relationship relationship) {
this.relationship = relationship.getName();
return this;
}
public Builder setRelationship(final String relationship) {
this.relationship = relationship;
return this;
}
@Override
public ProvenanceEventBuilder fromFlowFile(final FlowFile flowFile) {
setFlowFileEntryDate(flowFile.getEntryDate());
setLineageIdentifiers(flowFile.getLineageIdentifiers());
setLineageStartDate(flowFile.getLineageStartDate());
setAttributes(Collections.<String, String>emptyMap(), flowFile.getAttributes());
uuid = flowFile.getAttribute(CoreAttributes.UUID.key());
this.contentSize = flowFile.getSize();
return this;
}
@Override
public Builder setPreviousContentClaim(final String container, final String section, final String identifier, final Long offset, final long size) {
previousClaimSection = section;
previousClaimContainer = container;
previousClaimIdentifier = identifier;
previousClaimOffset = offset;
previousSize = size;
return this;
}
@Override
public Builder setCurrentContentClaim(final String container, final String section, final String identifier, final Long offset, final long size) {
contentClaimSection = section;
contentClaimContainer = container;
contentClaimIdentifier = identifier;
contentClaimOffset = offset;
contentSize = size;
return this;
}
@Override
public Builder setSourceQueueIdentifier(final String identifier) {
sourceQueueIdentifier = identifier;
return this;
}
private void assertSet(final Object value, final String name) {
if (value == null) {
throw new IllegalStateException("Cannot create Provenance Event Record because " + name + " is not set");
}
}
public ProvenanceEventType getEventType() {
return eventType;
}
public List<String> getChildUuids() {
return Collections.unmodifiableList(childrenUuids);
}
public List<String> getParentUuids() {
return Collections.unmodifiableList(parentUuids);
}
@Override
public StandardProvenanceEventRecord build() {
assertSet(eventType, "Event Type");
assertSet(componentId, "Component ID");
assertSet(componentType, "Component Type");
assertSet(uuid, "FlowFile UUID");
assertSet(contentSize, "FlowFile Size");
switch (eventType) {
case ADDINFO:
if (alternateIdentifierUri == null) {
throw new IllegalStateException("Cannot create Provenance Event Record of type " + eventType + " because no alternate identifiers have been set");
}
break;
case RECEIVE:
case SEND:
assertSet(transitUri, "Transit URI");
break;
case ROUTE:
assertSet(relationship, "Relationship");
break;
case CLONE:
case FORK:
case JOIN:
if ((parentUuids == null || parentUuids.isEmpty()) && (childrenUuids == null || childrenUuids.isEmpty())) {
throw new IllegalStateException("Cannot create Provenance Event Record of type " + eventType + " because no Parent UUIDs or Children UUIDs have been set");
}
break;
default:
break;
}
return new StandardProvenanceEventRecord(this);
}
}
}