blob: fb537ee0dc6fcd1bc8b6ad57caf6847efdbec541 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance.journaling.io;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.provenance.journaling.IdEnrichedProvenanceEvent;
import org.apache.nifi.stream.io.StreamUtils;
public class StandardEventDeserializer implements Deserializer {
public static final String CODEC_NAME = StandardEventSerializer.CODEC_NAME;
@Override
public String getCodecName() {
return CODEC_NAME;
}
@Override
public ProvenanceEventRecord deserialize(final DataInputStream in, final int serializationVersion) throws IOException {
final StandardProvenanceEventRecord.Builder builder = new StandardProvenanceEventRecord.Builder();
final long eventId = in.readLong();
final String eventTypeName = in.readUTF();
final ProvenanceEventType eventType = ProvenanceEventType.valueOf(eventTypeName);
builder.setEventType(eventType);
builder.setEventTime(in.readLong());
final Long flowFileEntryDate = in.readLong();
builder.setEventDuration(in.readLong());
final Set<String> lineageIdentifiers = new HashSet<>();
final int numLineageIdentifiers = in.readInt();
for (int i = 0; i < numLineageIdentifiers; i++) {
lineageIdentifiers.add(readUUID(in));
}
final long lineageStartDate = in.readLong();
builder.setComponentId(readNullableString(in));
builder.setComponentType(readNullableString(in));
final String uuid = readUUID(in);
builder.setFlowFileUUID(uuid);
builder.setDetails(readNullableString(in));
// Read in the FlowFile Attributes
final Map<String, String> previousAttrs = readAttributes(in, false);
final Map<String, String> attrUpdates = readAttributes(in, true);
builder.setAttributes(previousAttrs, attrUpdates);
final boolean hasContentClaim = in.readBoolean();
if (hasContentClaim) {
builder.setCurrentContentClaim(in.readUTF(), in.readUTF(), in.readUTF(), in.readLong(), in.readLong());
} else {
builder.setCurrentContentClaim(null, null, null, null, 0L);
}
final boolean hasPreviousClaim = in.readBoolean();
if (hasPreviousClaim) {
builder.setPreviousContentClaim(in.readUTF(), in.readUTF(), in.readUTF(), in.readLong(), in.readLong());
}
builder.setSourceQueueIdentifier(readNullableString(in));
// Read Event-Type specific fields.
if (eventType == ProvenanceEventType.FORK || eventType == ProvenanceEventType.JOIN || eventType == ProvenanceEventType.CLONE || eventType == ProvenanceEventType.REPLAY) {
final int numParents = in.readInt();
for (int i = 0; i < numParents; i++) {
builder.addParentUuid(readUUID(in));
}
final int numChildren = in.readInt();
for (int i = 0; i < numChildren; i++) {
builder.addChildUuid(readUUID(in));
}
} else if (eventType == ProvenanceEventType.RECEIVE) {
builder.setTransitUri(readNullableString(in));
builder.setSourceSystemFlowFileIdentifier(readNullableString(in));
} else if (eventType == ProvenanceEventType.SEND) {
builder.setTransitUri(readNullableString(in));
} else if (eventType == ProvenanceEventType.ADDINFO) {
builder.setAlternateIdentifierUri(readNullableString(in));
} else if (eventType == ProvenanceEventType.ROUTE) {
builder.setRelationship(readNullableString(in));
}
builder.setFlowFileEntryDate(flowFileEntryDate);
builder.setLineageIdentifiers(lineageIdentifiers);
builder.setLineageStartDate(lineageStartDate);
final ProvenanceEventRecord event = builder.build();
return new IdEnrichedProvenanceEvent(event, eventId);
}
private static Map<String, String> readAttributes(final DataInputStream dis, final boolean valueNullable) throws IOException {
final int numAttributes = dis.readInt();
final Map<String, String> attrs = new HashMap<>();
for (int i = 0; i < numAttributes; i++) {
final String key = readLongString(dis);
final String value = valueNullable ? readLongNullableString(dis) : readLongString(dis);
attrs.put(key, value);
}
return attrs;
}
private static String readUUID(final DataInputStream in) throws IOException {
final long msb = in.readLong();
final long lsb = in.readLong();
return new UUID(msb, lsb).toString();
}
private static String readNullableString(final DataInputStream in) throws IOException {
final boolean valueExists = in.readBoolean();
if (valueExists) {
return in.readUTF();
} else {
return null;
}
}
private static String readLongNullableString(final DataInputStream in) throws IOException {
final boolean valueExists = in.readBoolean();
if (valueExists) {
return readLongString(in);
} else {
return null;
}
}
private static String readLongString(final DataInputStream in) throws IOException {
final int length = in.readInt();
final byte[] strBytes = new byte[length];
StreamUtils.fillBuffer(in, strBytes);
return new String(strBytes, StandardCharsets.UTF_8);
}
}