blob: fae427ed8e337f3bbd01e421d02fedc5e9b8658e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.provenance.journaling.io;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
public class StandardEventSerializer implements Serializer {
public static final String CODEC_NAME = "StandardProvCodec";
@Override
public int getVersion() {
return 1;
}
@Override
public String getCodecName() {
return CODEC_NAME;
}
@Override
public void serialize(final ProvenanceEventRecord event, final DataOutputStream out) throws IOException {
final ProvenanceEventType recordType = event.getEventType();
out.writeUTF(event.getEventType().name());
out.writeLong(event.getEventTime());
out.writeLong(event.getFlowFileEntryDate());
out.writeLong(event.getEventDuration());
writeUUIDs(out, event.getLineageIdentifiers());
out.writeLong(event.getLineageStartDate());
writeNullableString(out, event.getComponentId());
writeNullableString(out, event.getComponentType());
writeUUID(out, event.getFlowFileUuid());
writeNullableString(out, event.getDetails());
// Write FlowFile attributes
final Map<String, String> attrs = event.getPreviousAttributes();
out.writeInt(attrs.size());
for (final Map.Entry<String, String> entry : attrs.entrySet()) {
writeLongString(out, entry.getKey());
writeLongString(out, entry.getValue());
}
final Map<String, String> attrUpdates = event.getUpdatedAttributes();
out.writeInt(attrUpdates.size());
for (final Map.Entry<String, String> entry : attrUpdates.entrySet()) {
writeLongString(out, entry.getKey());
writeLongNullableString(out, entry.getValue());
}
// If Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'.
if (event.getContentClaimSection() != null && event.getContentClaimContainer() != null && event.getContentClaimIdentifier() != null) {
out.writeBoolean(true);
out.writeUTF(event.getContentClaimContainer());
out.writeUTF(event.getContentClaimSection());
out.writeUTF(event.getContentClaimIdentifier());
if (event.getContentClaimOffset() == null) {
out.writeLong(0L);
} else {
out.writeLong(event.getContentClaimOffset());
}
out.writeLong(event.getFileSize());
} else {
out.writeBoolean(false);
}
// If Previous Content Claim Info is present, write out a 'TRUE' followed by claim info. Else, write out 'false'.
if (event.getPreviousContentClaimSection() != null && event.getPreviousContentClaimContainer() != null && event.getPreviousContentClaimIdentifier() != null) {
out.writeBoolean(true);
out.writeUTF(event.getPreviousContentClaimContainer());
out.writeUTF(event.getPreviousContentClaimSection());
out.writeUTF(event.getPreviousContentClaimIdentifier());
if (event.getPreviousContentClaimOffset() == null) {
out.writeLong(0L);
} else {
out.writeLong(event.getPreviousContentClaimOffset());
}
if (event.getPreviousFileSize() == null) {
out.writeLong(0L);
} else {
out.writeLong(event.getPreviousFileSize());
}
} else {
out.writeBoolean(false);
}
// write out the identifier of the destination queue.
writeNullableString(out, event.getSourceQueueIdentifier());
// Write type-specific info
if (recordType == ProvenanceEventType.FORK || recordType == ProvenanceEventType.JOIN || recordType == ProvenanceEventType.CLONE || recordType == ProvenanceEventType.REPLAY) {
writeUUIDs(out, event.getParentUuids());
writeUUIDs(out, event.getChildUuids());
} else if (recordType == ProvenanceEventType.RECEIVE) {
writeNullableString(out, event.getTransitUri());
writeNullableString(out, event.getSourceSystemFlowFileIdentifier());
} else if (recordType == ProvenanceEventType.SEND) {
writeNullableString(out, event.getTransitUri());
} else if (recordType == ProvenanceEventType.ADDINFO) {
writeNullableString(out, event.getAlternateIdentifierUri());
} else if (recordType == ProvenanceEventType.ROUTE) {
writeNullableString(out, event.getRelationship());
}
}
private void writeNullableString(final DataOutputStream out, final String toWrite) throws IOException {
if (toWrite == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeUTF(toWrite);
}
}
private void writeLongNullableString(final DataOutputStream out, final String toWrite) throws IOException {
if (toWrite == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
writeLongString(out, toWrite);
}
}
private void writeLongString(final DataOutputStream out, final String value) throws IOException {
final byte[] bytes = value.getBytes("UTF-8");
out.writeInt(bytes.length);
out.write(bytes);
}
static void writeUUID(final DataOutputStream out, final String uuid) throws IOException {
final UUID uuidObj = UUID.fromString(uuid);
out.writeLong(uuidObj.getMostSignificantBits());
out.writeLong(uuidObj.getLeastSignificantBits());
}
static void writeUUIDs(final DataOutputStream out, final Collection<String> list) throws IOException {
if (list == null) {
out.writeInt(0);
} else {
out.writeInt(list.size());
for (final String value : list) {
writeUUID(out, value);
}
}
}
}