| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.nifi.controller.repository; |
| |
| import org.apache.nifi.controller.repository.claim.ContentClaim; |
| import org.apache.nifi.controller.repository.claim.ResourceClaim; |
| import org.apache.nifi.controller.repository.claim.ResourceClaimManager; |
| import org.apache.nifi.controller.repository.claim.StandardContentClaim; |
| import org.apache.nifi.flowfile.FlowFile; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.wali.SerDe; |
| import org.wali.UpdateType; |
| |
| import java.io.DataInputStream; |
| import java.io.DataOutputStream; |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.nio.charset.StandardCharsets; |
| import java.util.HashMap; |
| import java.util.Map; |
| |
/**
 * A write-ahead-log {@link SerDe} that serializes {@link SerializedRepositoryRecord}s to and from the
 * FlowFile Repository's binary format.
 * <p>
 * Every record begins with a one-byte action code (one of the {@code ACTION_*} constants) followed by the
 * record identifier; the remaining layout depends on the action. The deserialization methods support all
 * historical encoding versions via explicit version guards, while serialization always writes
 * {@link #CURRENT_ENCODING_VERSION}. Any change to the byte layout requires bumping that version and adding
 * a matching guard on the read side.
 */
public class WriteAheadRepositoryRecordSerde extends RepositoryRecordSerde implements SerDe<SerializedRepositoryRecord> {
    private static final Logger logger = LoggerFactory.getLogger(WriteAheadRepositoryRecordSerde.class);

    // Version of the encoding produced by this serde. Older versions remain readable through the
    // "version > N" / "version < N" branches in the deserialize methods below.
    private static final int CURRENT_ENCODING_VERSION = 9;

    // One-byte action codes written as the first byte of every serialized record.
    public static final byte ACTION_CREATE = 0;
    public static final byte ACTION_UPDATE = 1;
    public static final byte ACTION_DELETE = 2;
    public static final byte ACTION_SWAPPED_OUT = 3;
    public static final byte ACTION_SWAPPED_IN = 4;

    // Count of records successfully deserialized so far; included in error messages to help
    // locate the point of corruption within the repository.
    private long recordsRestored = 0L;
    // Used on deserialization to re-create ResourceClaim instances for restored content claims.
    private final ResourceClaimManager claimManager;

    public WriteAheadRepositoryRecordSerde(final ResourceClaimManager claimManager) {
        this.claimManager = claimManager;
    }

    @Override
    public void serializeEdit(final SerializedRepositoryRecord previousRecordState, final SerializedRepositoryRecord record, final DataOutputStream out) throws IOException {
        // Incremental edits never force attribute serialization; attributes are written only if changed.
        serializeEdit(previousRecordState, record, out, false);
    }

    /**
     * Serializes the given record to the stream.
     *
     * @param previousRecordState the prior state of the record, if any (currently unused here but part of
     *            the {@link SerDe} edit contract)
     * @param record the record to serialize
     * @param out the stream to write to
     * @param forceAttributesWritten if {@code true}, the FlowFile's attributes are always written even when
     *            unchanged; used by {@link #serializeRecord} so that a full snapshot is self-contained
     * @throws IOException if unable to write to the stream
     */
    public void serializeEdit(final SerializedRepositoryRecord previousRecordState, final SerializedRepositoryRecord record, final DataOutputStream out, final boolean forceAttributesWritten)
        throws IOException {
        // An aborted record is persisted as a DELETE so that it is not restored on restart.
        if (record.isMarkedForAbort()) {
            logger.warn("Repository Record {} is marked to be aborted; it will be persisted in the FlowFileRepository as a DELETE record", record);
            out.write(ACTION_DELETE);
            out.writeLong(getRecordIdentifier(record));
            serializeContentClaim(record.getContentClaim(), record.getClaimOffset(), out);
            return;
        }

        final UpdateType updateType = getUpdateType(record);

        // DELETE records carry only the id and the content claim (so the claim can be reconciled on restore).
        if (updateType.equals(UpdateType.DELETE)) {
            out.write(ACTION_DELETE);
            out.writeLong(getRecordIdentifier(record));
            serializeContentClaim(record.getContentClaim(), record.getClaimOffset(), out);
            return;
        }

        // If there's a Destination Connection, that's the one that we want to associate with this record.
        // However, on restart, we will restore the FlowFile and set this connection to its "originalConnection".
        // If we then serialize the FlowFile again before it's transferred, it's important to allow this to happen,
        // so we use the originalConnection instead
        final String associatedQueueId = record.getQueueIdentifier();

        // SWAP_OUT records carry only the queue id and the swap-file location; the FlowFile content
        // itself lives in the swap file.
        if (updateType.equals(UpdateType.SWAP_OUT)) {
            out.write(ACTION_SWAPPED_OUT);
            out.writeLong(getRecordIdentifier(record));
            out.writeUTF(associatedQueueId);
            out.writeUTF(getLocation(record));
            return;
        }

        final FlowFile flowFile = record.getFlowFileRecord();
        final ContentClaim claim = record.getContentClaim();

        // CREATE, UPDATE, and SWAP_IN all share the full record layout below, differing only in action code
        // (and SWAP_IN appending its swap location at the end).
        switch (updateType) {
            case UPDATE:
                out.write(ACTION_UPDATE);
                break;
            case CREATE:
                out.write(ACTION_CREATE);
                break;
            case SWAP_IN:
                out.write(ACTION_SWAPPED_IN);
                break;
            default:
                // getUpdateType returned a type not handled above - indicates a programming error.
                throw new AssertionError();
        }

        out.writeLong(getRecordIdentifier(record));
        out.writeLong(flowFile.getEntryDate());
        out.writeLong(flowFile.getLineageStartDate());
        out.writeLong(flowFile.getLineageStartIndex());

        // If the FlowFile has never been queued, fall back to "now" so that a valid timestamp is persisted.
        final Long queueDate = flowFile.getLastQueueDate();
        out.writeLong(queueDate == null ? System.currentTimeMillis() : queueDate);
        out.writeLong(flowFile.getQueueDateIndex());
        out.writeLong(flowFile.getSize());

        // A record with no queue is written with an empty queue id; on restore it will have no
        // destination and will be dropped.
        if (associatedQueueId == null) {
            logger.warn("{} Repository Record {} has no Connection associated with it; it will be destroyed on restart",
                new Object[] {this, record});
            writeString("", out);
        } else {
            writeString(associatedQueueId, out);
        }

        serializeContentClaim(claim, record.getClaimOffset(), out);

        // Attributes are written only when needed: on explicit request (snapshots), when they changed,
        // or for CREATE/SWAP_IN records, which must be self-contained since no prior state exists in the log.
        if (forceAttributesWritten || record.isAttributesChanged() || updateType == UpdateType.CREATE || updateType == UpdateType.SWAP_IN) {
            out.write(1); // indicate attributes changed
            final Map<String, String> attributes = flowFile.getAttributes();
            out.writeInt(attributes.size());
            for (final Map.Entry<String, String> entry : attributes.entrySet()) {
                writeString(entry.getKey(), out);
                writeString(entry.getValue(), out);
            }
        } else {
            out.write(0); // indicate attributes did not change
        }

        // SWAP_IN additionally records where the FlowFile was swapped in from.
        if (updateType == UpdateType.SWAP_IN) {
            out.writeUTF(record.getSwapLocation());
        }
    }

    /**
     * Deserializes a single edit record written by {@link #serializeEdit}, merging it with any prior
     * state for the same record id found in {@code currentRecordStates}.
     *
     * @param in the stream to read from
     * @param currentRecordStates previously restored record states, keyed by record id; used so that an
     *            UPDATE edit without attribute changes can inherit the prior FlowFile state
     * @param version the encoding version the stream was written with
     * @return the restored record
     * @throws IOException if the stream cannot be read or is corrupt
     */
    @Override
    public SerializedRepositoryRecord deserializeEdit(final DataInputStream in, final Map<Object, SerializedRepositoryRecord> currentRecordStates, final int version) throws IOException {
        final int action = in.read();
        final long recordId = in.readLong();
        if (action == ACTION_DELETE) {
            final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId);

            // Content claims were not written with DELETE records before version 5.
            if (version > 4) {
                deserializeClaim(in, version, ffBuilder);
            }

            final FlowFileRecord flowFileRecord = ffBuilder.build();
            final SerializedRepositoryRecord record = new ReconstitutedSerializedRepositoryRecord.Builder()
                .type(RepositoryRecordType.DELETE)
                .flowFileRecord(flowFileRecord)
                .build();

            return record;
        }

        // SWAP_OUT records carry only the queue id and swap-file location (see serializeEdit).
        if (action == ACTION_SWAPPED_OUT) {
            final String queueId = in.readUTF();
            final String location = in.readUTF();

            final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
                .id(recordId)
                .build();


            final SerializedRepositoryRecord record = new ReconstitutedSerializedRepositoryRecord.Builder()
                .type(RepositoryRecordType.SWAP_OUT)
                .queueIdentifier(queueId)
                .swapLocation(location)
                .flowFileRecord(flowFileRecord)
                .build();

            return record;
        }

        // CREATE / UPDATE / SWAP_IN: start from the previously restored state (if any) so that fields
        // not present in this edit - notably unchanged attributes - carry over.
        final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
        final SerializedRepositoryRecord record = currentRecordStates.get(recordId);
        ffBuilder.id(recordId);
        if (record != null) {
            ffBuilder.fromFlowFile(record.getFlowFileRecord());
        }
        ffBuilder.entryDate(in.readLong());

        if (version > 1) {
            // read the lineage identifiers and lineage start date, which were added in version 2.
            if (version < 9) {
                // Lineage identifiers were dropped in version 9; skip them when reading older data.
                final int numLineageIds = in.readInt();
                for (int i = 0; i < numLineageIds; i++) {
                    in.readUTF(); //skip identifiers
                }
            }
            final long lineageStartDate = in.readLong();
            final long lineageStartIndex;
            // Lineage start index was added in version 8.
            if (version > 7) {
                lineageStartIndex = in.readLong();
            } else {
                lineageStartIndex = 0L;
            }
            ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);

            // Last-queue date was added in version 6; its index in version 8.
            if (version > 5) {
                final long lastQueueDate = in.readLong();
                final long queueDateIndex;
                if (version > 7) {
                    queueDateIndex = in.readLong();
                } else {
                    queueDateIndex = 0L;
                }

                ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
            }
        }

        ffBuilder.size(in.readLong());
        final String connectionId = readString(in);

        logger.debug("{} -> {}", new Object[] {recordId, connectionId});

        deserializeClaim(in, version, ffBuilder);

        // recover new attributes, if they changed
        final int attributesChanged = in.read();
        if (attributesChanged == -1) {
            throw new EOFException();
        } else if (attributesChanged == 1) {
            final int numAttributes = in.readInt();
            final Map<String, String> attributes = new HashMap<>();
            for (int i = 0; i < numAttributes; i++) {
                final String key = readString(in);
                final String value = readString(in);
                attributes.put(key, value);
            }

            ffBuilder.addAttributes(attributes);
        } else if (attributesChanged != 0) {
            // The marker byte must be 0 or 1; anything else means the stream is out of sync / corrupt.
            throw new IOException("Attribute Change Qualifier not found in stream; found value: "
                + attributesChanged + " after successfully restoring " + recordsRestored + " records. The FlowFile Repository appears to be corrupt!");
        }

        final FlowFileRecord flowFile = ffBuilder.build();
        String swapLocation = null;
        if (action == ACTION_SWAPPED_IN) {
            swapLocation = in.readUTF();
        }

        final RepositoryRecordType recordType = getRecordType(action);

        final SerializedRepositoryRecord repositoryRecord = new ReconstitutedSerializedRepositoryRecord.Builder()
            .flowFileRecord(flowFile)
            .queueIdentifier(connectionId)
            .swapLocation(swapLocation)
            .type(recordType)
            .build();

        recordsRestored++;
        return repositoryRecord;
    }

    /**
     * Deserializes a full record snapshot written by {@link #serializeRecord}. Unlike
     * {@link #deserializeEdit} there is no prior state to merge with; each snapshot is self-contained.
     *
     * @param in the stream to read from
     * @param version the encoding version the stream was written with
     * @return the restored record, or {@code null} if the end of the stream has been reached
     * @throws IOException if the stream cannot be read or is corrupt
     */
    @Override
    public SerializedRepositoryRecord deserializeRecord(final DataInputStream in, final int version) throws IOException {
        final int action = in.read();
        // A clean end-of-stream before the action byte means there are no more records.
        if (action == -1) {
            return null;
        }

        final long recordId = in.readLong();
        if (action == ACTION_DELETE) {
            final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder().id(recordId);

            // Content claims were not written with DELETE records before version 5.
            if (version > 4) {
                deserializeClaim(in, version, ffBuilder);
            }

            final FlowFileRecord flowFileRecord = ffBuilder.build();
            final SerializedRepositoryRecord record = new ReconstitutedSerializedRepositoryRecord.Builder()
                .type(RepositoryRecordType.DELETE)
                .flowFileRecord(flowFileRecord)
                .build();

            return record;
        }

        // if action was not delete, it must be create/swap in
        final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder();
        final long entryDate = in.readLong();

        if (version > 1) {
            // read the lineage identifiers and lineage start date, which were added in version 2.
            if (version < 9) {
                // Lineage identifiers were dropped in version 9; skip them when reading older data.
                final int numLineageIds = in.readInt();
                for (int i = 0; i < numLineageIds; i++) {
                    in.readUTF(); //skip identifiers
                }
            }

            final long lineageStartDate = in.readLong();
            final long lineageStartIndex;
            // Lineage start index was added in version 8.
            if (version > 7) {
                lineageStartIndex = in.readLong();
            } else {
                lineageStartIndex = 0L;
            }
            ffBuilder.lineageStart(lineageStartDate, lineageStartIndex);

            // Last-queue date was added in version 6; its index in version 8.
            if (version > 5) {
                final long lastQueueDate = in.readLong();
                final long queueDateIndex;
                if (version > 7) {
                    queueDateIndex = in.readLong();
                } else {
                    queueDateIndex = 0L;
                }

                ffBuilder.lastQueued(lastQueueDate, queueDateIndex);
            }
        }

        final long size = in.readLong();
        final String connectionId = readString(in);

        logger.debug("{} -> {}", new Object[] {recordId, connectionId});

        ffBuilder.id(recordId);
        ffBuilder.entryDate(entryDate);
        ffBuilder.size(size);

        deserializeClaim(in, version, ffBuilder);

        // Marker byte: 1 = attribute map follows, 0 = no attributes written, anything else = corruption.
        final int attributesChanged = in.read();
        if (attributesChanged == 1) {
            final int numAttributes = in.readInt();
            final Map<String, String> attributes = new HashMap<>();
            for (int i = 0; i < numAttributes; i++) {
                final String key = readString(in);
                final String value = readString(in);
                attributes.put(key, value);
            }

            ffBuilder.addAttributes(attributes);
        } else if (attributesChanged == -1) {
            throw new EOFException();
        } else if (attributesChanged != 0) {
            throw new IOException("Attribute Change Qualifier not found in stream; found value: "
                + attributesChanged + " after successfully restoring " + recordsRestored + " records");
        }

        final FlowFileRecord flowFile = ffBuilder.build();
        String swapLocation = null;
        if (action == ACTION_SWAPPED_IN) {
            swapLocation = in.readUTF();
        }

        final SerializedRepositoryRecord record = new ReconstitutedSerializedRepositoryRecord.Builder()
            .queueIdentifier(connectionId)
            .flowFileRecord(flowFile)
            .swapLocation(swapLocation)
            .type(getRecordType(action))
            .build();

        recordsRestored++;
        return record;
    }

    /**
     * Maps a serialized action byte back to its {@link RepositoryRecordType}. Unknown values fall
     * through to UPDATE.
     */
    private RepositoryRecordType getRecordType(final int serializedUpdateType) {
        switch (serializedUpdateType) {
            case ACTION_CREATE:
                return RepositoryRecordType.CREATE;
            case ACTION_SWAPPED_IN:
                return RepositoryRecordType.SWAP_IN;
            case ACTION_SWAPPED_OUT:
                return RepositoryRecordType.SWAP_OUT;
            case ACTION_UPDATE:
            default:
                return RepositoryRecordType.UPDATE;
        }
    }

    @Override
    public void serializeRecord(final SerializedRepositoryRecord record, final DataOutputStream out) throws IOException {
        // A snapshot record must be self-contained, so force attributes to be written.
        serializeEdit(null, record, out, true);
    }

    /**
     * Writes the given content claim to the stream, preceded by a one-byte existence marker
     * (0 = no claim, 1 = claim follows).
     *
     * @param claim the content claim, or {@code null} if the record has none
     * @param offset the record's offset into the content claim
     * @param out the stream to write to
     * @throws IOException if unable to write to the stream
     */
    private void serializeContentClaim(final ContentClaim claim, final long offset, final DataOutputStream out) throws IOException {
        if (claim == null) {
            out.write(0);
        } else {
            out.write(1);

            final ResourceClaim resourceClaim = claim.getResourceClaim();
            writeString(resourceClaim.getId(), out);
            writeString(resourceClaim.getContainer(), out);
            writeString(resourceClaim.getSection(), out);
            out.writeLong(claim.getOffset());
            out.writeLong(claim.getLength());

            out.writeLong(offset);
            out.writeBoolean(resourceClaim.isLossTolerant());
        }
    }

    /**
     * Reads a content claim written by {@link #serializeContentClaim} and, if present, applies it to the
     * given builder. Fields are version-gated: string claim ids since version 4, content-claim
     * offset/length since version 7, loss tolerance since version 3.
     *
     * @throws EOFException if the stream ends before the existence marker
     * @throws IOException if the existence marker is neither 0 nor 1, indicating corruption
     */
    private void deserializeClaim(final DataInputStream in, final int serializationVersion, final StandardFlowFileRecord.Builder ffBuilder) throws IOException {
        // determine current Content Claim.
        final int claimExists = in.read();
        if (claimExists == 1) {
            final String claimId;
            // Claim ids were numeric before version 4; they are stored as strings for uniformity.
            if (serializationVersion < 4) {
                claimId = String.valueOf(in.readLong());
            } else {
                claimId = readString(in);
            }

            final String container = readString(in);
            final String section = readString(in);

            final long resourceOffset;
            final long resourceLength;
            if (serializationVersion < 7) {
                // Offset/length of the content claim within its resource claim were added in version 7;
                // -1 length indicates "unknown" for older data.
                resourceOffset = 0L;
                resourceLength = -1L;
            } else {
                resourceOffset = in.readLong();
                resourceLength = in.readLong();
            }

            final long claimOffset = in.readLong();

            final boolean lossTolerant;
            if (serializationVersion >= 3) {
                lossTolerant = in.readBoolean();
            } else {
                lossTolerant = false;
            }

            final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant, false);
            final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, resourceOffset);
            contentClaim.setLength(resourceLength);

            ffBuilder.contentClaim(contentClaim);
            ffBuilder.contentClaimOffset(claimOffset);
        } else if (claimExists == -1) {
            throw new EOFException();
        } else if (claimExists != 0) {
            throw new IOException("Claim Existence Qualifier not found in stream; found value: "
                + claimExists + " after successfully restoring " + recordsRestored + " records");
        }
    }

    /**
     * Writes a length-prefixed UTF-8 string. Lengths below 65535 use a two-byte big-endian prefix;
     * longer strings use the sentinel 0xFFFF followed by a four-byte length, avoiding the 64 KB limit
     * of {@link DataOutputStream#writeUTF}. Must be read back with {@link #readString}.
     */
    private void writeString(final String toWrite, final OutputStream out) throws IOException {
        final byte[] bytes = toWrite.getBytes(StandardCharsets.UTF_8);
        final int utflen = bytes.length;

        if (utflen < 65535) {
            out.write(utflen >>> 8);
            out.write(utflen);
            out.write(bytes);
        } else {
            // Sentinel 0xFFFF signals that a four-byte length follows.
            out.write(255);
            out.write(255);
            out.write(utflen >>> 24);
            out.write(utflen >>> 16);
            out.write(utflen >>> 8);
            out.write(utflen);
            out.write(bytes);
        }
    }

    /**
     * Reads a string written by {@link #writeString}.
     *
     * @throws EOFException if the stream ends before the full string is read
     */
    private String readString(final InputStream in) throws IOException {
        final Integer numBytes = readFieldLength(in);
        if (numBytes == null) {
            throw new EOFException();
        }
        final byte[] bytes = new byte[numBytes];
        fillBuffer(in, bytes, numBytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /**
     * Reads the length prefix written by {@link #writeString}: either a two-byte value or, when both
     * prefix bytes are 0xFF, a four-byte value.
     *
     * @return the field length, or {@code null} on a clean end-of-stream before any byte was read
     * @throws EOFException if the stream ends partway through the prefix
     */
    private Integer readFieldLength(final InputStream in) throws IOException {
        final int firstValue = in.read();
        final int secondValue = in.read();
        if (firstValue < 0) {
            return null;
        }
        if (secondValue < 0) {
            throw new EOFException();
        }
        if (firstValue == 0xff && secondValue == 0xff) {
            final int ch1 = in.read();
            final int ch2 = in.read();
            final int ch3 = in.read();
            final int ch4 = in.read();
            if ((ch1 | ch2 | ch3 | ch4) < 0) {
                throw new EOFException();
            }
            return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4;
        } else {
            return (firstValue << 8) + secondValue;
        }
    }

    /**
     * Reads exactly {@code length} bytes into {@code buffer}, looping because {@link InputStream#read}
     * may return fewer bytes than requested.
     *
     * @throws EOFException if the stream ends before {@code length} bytes are read
     */
    private void fillBuffer(final InputStream in, final byte[] buffer, final int length) throws IOException {
        int bytesRead;
        int totalBytesRead = 0;
        while ((bytesRead = in.read(buffer, totalBytesRead, length - totalBytesRead)) > 0) {
            totalBytesRead += bytesRead;
        }
        if (totalBytesRead != length) {
            throw new EOFException();
        }
    }

    @Override
    public int getVersion() {
        return CURRENT_ENCODING_VERSION;
    }
}