/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.queue.clustered.server;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.IllegalClusterStateException;
import org.apache.nifi.controller.queue.LoadBalanceCompression;
import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.RepositoryRecord;
import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import org.apache.nifi.controller.repository.StandardRepositoryRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.provenance.ProvenanceEventBuilder;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.stream.io.ByteCountingInputStream;
import org.apache.nifi.stream.io.LimitingInputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLSocket;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.Checksum;
import java.util.zip.GZIPInputStream;
import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.ABORT_PROTOCOL_NEGOTIATION;
import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.ABORT_TRANSACTION;
import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.CHECK_SPACE;
import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.COMPLETE_TRANSACTION;
import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.CONFIRM_CHECKSUM;
import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.CONFIRM_COMPLETE_TRANSACTION;
import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.DATA_FRAME_FOLLOWS;
import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.MORE_FLOWFILES;
import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.NO_DATA_FRAME;
import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.NO_MORE_FLOWFILES;
import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.QUEUE_FULL;
import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.REJECT_CHECKSUM;
import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.REQEUST_DIFFERENT_VERSION;
import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.SKIP_SPACE_CHECK;
import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.SPACE_AVAILABLE;
import static org.apache.nifi.controller.queue.clustered.protocol.LoadBalanceProtocolConstants.VERSION_ACCEPTED;
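/**
 * Server-side implementation of the Load Balance Protocol. For each incoming connection, a protocol
 * version is negotiated with the peer and a single transaction is then received: a Connection ID that
 * identifies the target queue, an optional space check, a sequence of FlowFiles, a CRC32 checksum
 * covering all bytes received, and a completion indicator. Received FlowFiles are recorded in the
 * content, FlowFile, and provenance repositories and transferred to the target load-balanced queue.
 */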
public class StandardLoadBalanceProtocol implements LoadBalanceProtocol {
private static final Logger logger = LoggerFactory.getLogger(StandardLoadBalanceProtocol.class);
private static final int SOCKET_CLOSED = -1;
private static final int NO_DATA_AVAILABLE = 0;
private final FlowFileRepository flowFileRepository;
private final ContentRepository contentRepository;
private final ProvenanceRepository provenanceRepository;
private final FlowController flowController;
private final LoadBalanceAuthorizer authorizer;
private final ThreadLocal<byte[]> dataBuffer = new ThreadLocal<>();
private final AtomicLong lineageStartIndex = new AtomicLong(0L);
public StandardLoadBalanceProtocol(final FlowFileRepository flowFileRepository, final ContentRepository contentRepository, final ProvenanceRepository provenanceRepository,
final FlowController flowController, final LoadBalanceAuthorizer authorizer) {
this.flowFileRepository = flowFileRepository;
this.contentRepository = contentRepository;
this.provenanceRepository = provenanceRepository;
this.flowController = flowController;
this.authorizer = authorizer;
}
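/**
 * Entry point for an incoming connection. For TLS connections, the peer's client identities are first
 * authorized to load balance data. A protocol version is then negotiated; if the peer has closed the
 * connection or no data is available, the method returns without receiving any FlowFiles.
 */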
@Override
public void receiveFlowFiles(final Socket socket, final InputStream in, final OutputStream out) throws IOException {
String peerDescription = socket.getInetAddress().getHostName();
if (socket instanceof SSLSocket) {
logger.debug("Connection received from peer {}", peerDescription);
peerDescription = authorizer.authorize((SSLSocket) socket);
logger.debug("Client Identities are authorized to load balance data for peer {}", peerDescription);
}
final int version = negotiateProtocolVersion(in, out, peerDescription);
if (version == SOCKET_CLOSED) {
socket.close();
return;
}
if (version == NO_DATA_AVAILABLE) {
logger.debug("No data is available from {}", socket.getRemoteSocketAddress());
return;
}
receiveFlowFiles(in, out, peerDescription, version);
}
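/**
 * Performs the version-negotiation handshake. The peer sends a 1-byte requested version; if it is
 * supported, VERSION_ACCEPTED is written back and that version is returned. Otherwise the preferred
 * version is offered via REQEUST_DIFFERENT_VERSION, and negotiation is aborted if no version can be
 * agreed upon. Returns SOCKET_CLOSED if the peer closed the connection, or NO_DATA_AVAILABLE if the
 * socket timed out before the first byte arrived.
 */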
protected int negotiateProtocolVersion(final InputStream in, final OutputStream out, final String peerDescription) throws IOException {
final VersionNegotiator negotiator = new StandardVersionNegotiator(1);
for (int i = 0;; i++) {
final int requestedVersion;
try {
requestedVersion = in.read();
} catch (final SocketTimeoutException ste) {
// If first iteration, then just consider this to indicate "no data available". Otherwise, we were truly expecting data.
if (i == 0) {
logger.debug("SocketTimeoutException thrown when trying to negotiate Protocol Version");
return NO_DATA_AVAILABLE;
}
throw ste;
}
if (requestedVersion < 0) {
logger.debug("Encountered End-of-File when receiving the recommended Protocol Version. Returning SOCKET_CLOSED for the protocol version");
return SOCKET_CLOSED;
}
final boolean supported = negotiator.isVersionSupported(requestedVersion);
if (supported) {
logger.debug("Peer {} requested version {} of the Load Balance Protocol. Accepting version.", peerDescription, requestedVersion);
out.write(VERSION_ACCEPTED);
out.flush();
return requestedVersion;
}
final Integer preferredVersion = negotiator.getPreferredVersion(requestedVersion);
if (preferredVersion == null) {
logger.debug("Peer {} requested version {} of the Load Balance Protocol. This version is not acceptable. Aborting communications.", peerDescription, requestedVersion);
out.write(ABORT_PROTOCOL_NEGOTIATION);
out.flush();
throw new IOException("Peer " + peerDescription + " requested that we use version " + requestedVersion
+ " of the Load Balance Protocol, but this version is unacceptable. Aborted communications.");
}
logger.debug("Peer {} requested version {} of the Load Balance Protocol. Requesting that peer change to version {} instead.", peerDescription, requestedVersion, preferredVersion);
out.write(REQEUST_DIFFERENT_VERSION);
out.write(preferredVersion);
out.flush();
}
}
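/**
 * Receives a single transaction from the peer: a Connection ID, an optional space check, the FlowFiles
 * themselves, a checksum over everything received, and a completion indicator. All content is written to
 * one Content Claim, whose claimant count starts at 1 on creation and is incremented once per FlowFile;
 * the extra initial count is decremented at the end, and the claim is removed if nothing references it.
 * On any failure, every received FlowFile's claimant count is decremented and the claim removed.
 */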
protected void receiveFlowFiles(final InputStream in, final OutputStream out, final String peerDescription, final int protocolVersion) throws IOException {
logger.debug("Receiving FlowFiles from {}", peerDescription);
final long startTimestamp = System.currentTimeMillis();
final Checksum checksum = new CRC32();
final InputStream checkedInput = new CheckedInputStream(in, checksum);
final DataInputStream dataIn = new DataInputStream(checkedInput);
final String connectionId = getConnectionID(dataIn, peerDescription);
if (connectionId == null) {
logger.debug("Received no Connection ID from Peer {}. Will consider receipt of FlowFiles complete", peerDescription);
return;
}
final Connection connection = flowController.getFlowManager().getConnection(connectionId);
if (connection == null) {
logger.error("Attempted to receive FlowFiles from Peer {} for Connection with ID {} but no connection exists with that ID", peerDescription, connectionId);
throw new TransactionAbortedException("Attempted to receive FlowFiles from Peer " + peerDescription + " for Connection with ID " + connectionId + " but no Connection exists with that ID");
}
final FlowFileQueue flowFileQueue = connection.getFlowFileQueue();
if (!(flowFileQueue instanceof LoadBalancedFlowFileQueue)) {
throw new TransactionAbortedException("Attempted to receive FlowFiles from Peer " + peerDescription + " for Connection with ID " + connectionId + " but the Connection with that ID is " +
"not configured to allow for Load Balancing");
}
final LoadBalancedFlowFileQueue loadBalancedFlowFileQueue = (LoadBalancedFlowFileQueue) flowFileQueue;
final int spaceCheck = dataIn.read();
if (spaceCheck < 0) {
throw new EOFException("Expected to receive a request to determine whether or not space was available for Connection with ID " + connectionId + " from Peer " + peerDescription);
}
if (spaceCheck == CHECK_SPACE) {
if (loadBalancedFlowFileQueue.isLocalPartitionFull()) {
logger.debug("Received a 'Check Space' request from Peer {} for Connection with ID {}; responding with QUEUE_FULL", peerDescription, connectionId);
out.write(QUEUE_FULL);
out.flush();
return; // we're finished receiving flowfiles for now, and we'll restart the communication process.
} else {
logger.debug("Received a 'Check Space' request from Peer {} for Connection with ID {}; responding with SPACE_AVAILABLE", peerDescription, connectionId);
out.write(SPACE_AVAILABLE);
out.flush();
}
} else if (spaceCheck != SKIP_SPACE_CHECK) {
throw new TransactionAbortedException("Expected to receive a request to determine whether or not space was available for Connection with ID "
+ connectionId + " from Peer " + peerDescription + " but instead received value " + spaceCheck);
}
final LoadBalanceCompression compression = connection.getFlowFileQueue().getLoadBalanceCompression();
logger.debug("Receiving FlowFiles from Peer {} for Connection {}; Compression = {}", peerDescription, connectionId, compression);
ContentClaim contentClaim = null;
final List<RemoteFlowFileRecord> flowFilesReceived = new ArrayList<>();
OutputStream contentClaimOut = null;
long claimOffset = 0L;
try {
try {
while (isMoreFlowFiles(dataIn, protocolVersion)) {
if (contentClaim == null) {
contentClaim = contentRepository.create(false);
contentClaimOut = contentRepository.write(contentClaim);
}
final RemoteFlowFileRecord flowFile = receiveFlowFile(dataIn, contentClaimOut, contentClaim, claimOffset, protocolVersion, peerDescription, compression);
// The FlowFile's Content Claim will either be null or equal to the provided Content Claim.
// Incrementing the claimant count for the FlowFile's Content Claim will therefore increment the count for
// the provided Content Claim, if it was assigned to the FlowFile, or call incrementClaimaintCount with an
// argument of null, which does nothing.
contentRepository.incrementClaimaintCount(flowFile.getFlowFile().getContentClaim());
flowFilesReceived.add(flowFile);
claimOffset += flowFile.getFlowFile().getSize();
}
} finally {
if (contentClaimOut != null) {
contentClaimOut.close();
}
}
// When the Content Claim is created initially, it has a Claimant Count of 1. We then increment the Claimant Count for each FlowFile that we add to the Content Claim,
// which means that the claimant count is currently 1 larger than it needs to be. So we decrement the claimant count now. If that results in a count of 0, then
// we can go ahead and remove the Content Claim, since we know it's not being referenced.
final int count = contentRepository.decrementClaimantCount(contentClaim);
verifyChecksum(checksum, in, out, peerDescription, flowFilesReceived.size());
completeTransaction(in, out, peerDescription, flowFilesReceived, connectionId, startTimestamp, loadBalancedFlowFileQueue);
if (count == 0) {
contentRepository.remove(contentClaim);
}
} catch (final Exception e) {
// If any Exception occurs, we need to decrement the claimant counts for the Content Claims that we wrote to because
// they are no longer needed.
for (final RemoteFlowFileRecord remoteFlowFile : flowFilesReceived) {
contentRepository.decrementClaimantCount(remoteFlowFile.getFlowFile().getContentClaim());
}
contentRepository.remove(contentClaim);
throw e;
}
logger.debug("Successfully received {} FlowFiles from Peer {} to Load Balance for Connection {}", flowFilesReceived.size(), peerDescription, connectionId);
}
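/**
 * Handles the Transaction Completion Indicator. On COMPLETE_TRANSACTION, RECEIVE provenance events are
 * registered, the FlowFile Repository is updated, and the FlowFiles are transferred to the queue before
 * CONFIRM_COMPLETE_TRANSACTION is sent back. If the transfer is rejected due to the cluster state, the
 * repositories are instead updated to drop the data and ABORT_TRANSACTION is sent; no Exception is
 * re-thrown, because the caller's cleanup would then decrement the Content Claims a second time.
 */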
private void completeTransaction(final InputStream in, final OutputStream out, final String peerDescription, final List<RemoteFlowFileRecord> flowFilesReceived,
final String connectionId, final long startTimestamp, final LoadBalancedFlowFileQueue flowFileQueue) throws IOException {
final int completionIndicator = in.read();
if (completionIndicator < 0) {
throw new EOFException("Expected to receive a Transaction Completion Indicator from Peer " + peerDescription + " but encountered EOF");
}
if (completionIndicator == ABORT_TRANSACTION) {
throw new TransactionAbortedException("Peer " + peerDescription + " chose to Abort Load Balance Transaction");
}
if (completionIndicator != COMPLETE_TRANSACTION) {
logger.debug("Expected to receive Transaction Completion Indicator from Peer " + peerDescription + " but instead received a value of " + completionIndicator + ". Sending back an Abort " +
"Transaction Flag.");
out.write(ABORT_TRANSACTION);
out.flush();
throw new IOException("Expected to receive Transaction Completion Indicator from Peer " + peerDescription + " but instead received a value of " + completionIndicator);
}
logger.debug("Received Complete Transaction indicator from Peer {}", peerDescription);
registerReceiveProvenanceEvents(flowFilesReceived, peerDescription, connectionId, startTimestamp);
updateFlowFileRepository(flowFilesReceived, flowFileQueue);
try {
transferFlowFilesToQueue(flowFilesReceived, flowFileQueue);
} catch (final IllegalClusterStateException e) {
logger.error("Failed to transferred received data into FlowFile Queue {}", flowFileQueue, e);
out.write(ABORT_TRANSACTION);
out.flush();
try {
cleanupRepositoriesOnTransferFailure(flowFilesReceived, flowFileQueue, "Rejected transfer due to " + e.getMessage());
} catch (final Exception e1) {
logger.error("Failed to update FlowFile/Provenance Repositories to denote that the data that could not be received should no longer be present on this node", e1);
}
// We log the error here and cleanup. We do not throw an Exception. If we did throw an Exception,
// the caller of this method would catch the Exception and decrement the Content Claims, etc. However,
// since we have already updated the FlowFile Repository to DROP the data, that would decrement the claims
// twice, which could lead to data loss.
return;
}
out.write(CONFIRM_COMPLETE_TRANSACTION);
out.flush();
}
private void cleanupRepositoriesOnTransferFailure(final List<RemoteFlowFileRecord> flowFilesReceived, final FlowFileQueue flowFileQueue, final String details) throws IOException {
dropFlowFilesFromRepository(flowFilesReceived, flowFileQueue);
reportDropEvents(flowFilesReceived, flowFileQueue.getIdentifier(), details);
}
private void dropFlowFilesFromRepository(final List<RemoteFlowFileRecord> flowFiles, final FlowFileQueue flowFileQueue) throws IOException {
final List<RepositoryRecord> repoRecords = flowFiles.stream()
.map(remoteFlowFile -> {
final StandardRepositoryRecord record = new StandardRepositoryRecord(flowFileQueue, remoteFlowFile.getFlowFile());
record.setDestination(flowFileQueue);
record.markForDelete();
return record;
})
.collect(Collectors.toList());
flowFileRepository.updateRepository(repoRecords);
logger.debug("Updated FlowFile Repository to note that {} FlowFiles were dropped from the system because the data received from the other node could not be transferred to the FlowFile Queue",
repoRecords);
}
private void reportDropEvents(final List<RemoteFlowFileRecord> flowFilesReceived, final String connectionId, final String details) {
final List<ProvenanceEventRecord> events = new ArrayList<>(flowFilesReceived.size());
for (final RemoteFlowFileRecord remoteFlowFile : flowFilesReceived) {
final FlowFileRecord flowFileRecord = remoteFlowFile.getFlowFile();
final ProvenanceEventBuilder provenanceEventBuilder = new StandardProvenanceEventRecord.Builder()
.fromFlowFile(flowFileRecord)
.setEventType(ProvenanceEventType.DROP)
.setComponentId(connectionId)
.setComponentType("Load Balanced Connection")
.setDetails(details);
final ContentClaim contentClaim = flowFileRecord.getContentClaim();
if (contentClaim != null) {
final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
provenanceEventBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(),
contentClaim.getOffset() + flowFileRecord.getContentClaimOffset(), flowFileRecord.getSize());
}
final ProvenanceEventRecord provenanceEvent = provenanceEventBuilder.build();
events.add(provenanceEvent);
}
logger.debug("Updated Provenance Repository to note that {} FlowFiles were dropped from the system because the data received from the other node could not be transferred to the FlowFile " +
"Queue", events.size());
provenanceRepository.registerEvents(events);
}
private void registerReceiveProvenanceEvents(final List<RemoteFlowFileRecord> flowFiles, final String nodeName, final String connectionId, final long startTimestamp) {
final long duration = System.currentTimeMillis() - startTimestamp;
final List<ProvenanceEventRecord> events = new ArrayList<>(flowFiles.size());
for (final RemoteFlowFileRecord remoteFlowFile : flowFiles) {
final FlowFileRecord flowFileRecord = remoteFlowFile.getFlowFile();
final ProvenanceEventBuilder provenanceEventBuilder = new StandardProvenanceEventRecord.Builder()
.fromFlowFile(flowFileRecord)
.setEventType(ProvenanceEventType.RECEIVE)
.setTransitUri("nifi://" + nodeName + "/loadbalance/" + connectionId)
.setSourceSystemFlowFileIdentifier(remoteFlowFile.getRemoteUuid())
.setEventDuration(duration)
.setComponentId(connectionId)
.setComponentType("Load Balanced Connection");
final ContentClaim contentClaim = flowFileRecord.getContentClaim();
if (contentClaim != null) {
final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
provenanceEventBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(),
contentClaim.getOffset() + flowFileRecord.getContentClaimOffset(), flowFileRecord.getSize());
}
final ProvenanceEventRecord provenanceEvent = provenanceEventBuilder.build();
events.add(provenanceEvent);
}
provenanceRepository.registerEvents(events);
}
private void updateFlowFileRepository(final List<RemoteFlowFileRecord> flowFiles, final FlowFileQueue flowFileQueue) throws IOException {
final List<RepositoryRecord> repoRecords = flowFiles.stream()
.map(remoteFlowFile -> {
final StandardRepositoryRecord record = new StandardRepositoryRecord(flowFileQueue, remoteFlowFile.getFlowFile());
record.setDestination(flowFileQueue);
return record;
})
.collect(Collectors.toList());
flowFileRepository.updateRepository(repoRecords);
}
private void transferFlowFilesToQueue(final List<RemoteFlowFileRecord> remoteFlowFiles, final LoadBalancedFlowFileQueue flowFileQueue) throws IllegalClusterStateException {
final List<FlowFileRecord> flowFiles = remoteFlowFiles.stream().map(RemoteFlowFileRecord::getFlowFile).collect(Collectors.toList());
flowFileQueue.receiveFromPeer(flowFiles);
}
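/**
 * Compares the CRC32 checksum that was calculated over all data received from the peer against the
 * checksum that the peer reports. Responds with CONFIRM_CHECKSUM on a match, or with REJECT_CHECKSUM
 * and a TransactionAbortedException on a mismatch.
 */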
private void verifyChecksum(final Checksum checksum, final InputStream in, final OutputStream out, final String peerDescription, final int flowFileCount) throws IOException {
final long expectedChecksum = readChecksum(in);
if (checksum.getValue() == expectedChecksum) {
logger.debug("Checksum from Peer {} matched the checksum that was calculated. Writing confirmation.", peerDescription);
out.write(CONFIRM_CHECKSUM);
out.flush();
} else {
logger.error("Received {} FlowFiles from peer {} but the Checksum reported by the peer ({}) did not match the checksum that was calculated ({}). Will reject the transaction.",
flowFileCount, peerDescription, expectedChecksum, checksum.getValue());
out.write(REJECT_CHECKSUM);
out.flush();
throw new TransactionAbortedException("Transaction with Peer " + peerDescription + " was aborted because the calculated checksum did not match the checksum provided by peer.");
}
}
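/**
 * Reads the peer-reported checksum, transmitted as an 8-byte big-endian long.
 */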
private long readChecksum(final InputStream in) throws IOException {
final byte[] buffer = getDataBuffer();
StreamUtils.read(in, buffer, 8);
return ByteBuffer.wrap(buffer, 0, 8).getLong();
}
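// Lazily creates a reusable, per-thread buffer that is shared by the data-frame and checksum reads.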
private byte[] getDataBuffer() {
byte[] buffer = dataBuffer.get();
if (buffer == null) {
buffer = new byte[65536 + 4096];
dataBuffer.set(buffer);
}
return buffer;
}
private String getConnectionID(final DataInputStream in, final String peerDescription) throws IOException {
try {
return in.readUTF();
} catch (final EOFException eof) {
logger.debug("Encountered EOFException when trying to receive Connection ID from Peer {}. Returning null for Connection ID", peerDescription);
return null;
}
}
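/**
 * Reads the 1-byte indicator specifying whether another FlowFile follows in this transaction.
 */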
private boolean isMoreFlowFiles(final DataInputStream in, final int protocolVersion) throws IOException {
final int indicator = in.read();
if (indicator < 0) {
throw new EOFException();
}
if (indicator == MORE_FLOWFILES) {
logger.debug("Peer indicates that there is another FlowFile in transaction");
return true;
}
if (indicator == NO_MORE_FLOWFILES) {
logger.debug("Peer indicates that there are no more FlowFiles in transaction");
return false;
}
throw new IOException("Expected to receive 'More FlowFiles' indicator (" + MORE_FLOWFILES
+ ") or 'No More FlowFiles' indicator (" + NO_MORE_FLOWFILES + ") but received invalid value of " + indicator);
}
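/**
 * Receives a single FlowFile: a 4-byte metadata length, then the metadata block (attributes followed by
 * the lineage start date, entry date, and penalty expiration time; GZIP-compressed unless compression is
 * disabled), then the content data frames. The FlowFile is assigned a new local UUID, while the peer's
 * UUID is preserved so that the RECEIVE provenance event can reference the source system's identifier.
 */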
private RemoteFlowFileRecord receiveFlowFile(final DataInputStream dis, final OutputStream out, final ContentClaim contentClaim, final long claimOffset, final int protocolVersion,
final String peerDescription, final LoadBalanceCompression compression) throws IOException {
final int metadataLength = dis.readInt();
DataInputStream metadataIn = new DataInputStream(new LimitingInputStream(dis, metadataLength));
if (compression != LoadBalanceCompression.DO_NOT_COMPRESS) {
metadataIn = new DataInputStream(new GZIPInputStream(metadataIn));
}
final Map<String, String> attributes = readAttributes(metadataIn);
final String sourceSystemUuid = attributes.get(CoreAttributes.UUID.key());
logger.debug("Received Attributes {} from Peer {}", attributes, peerDescription);
final long lineageStartDate = metadataIn.readLong();
final long entryDate = metadataIn.readLong();
final long penaltyExpirationMillis = metadataIn.readLong();
final ContentClaimTriple contentClaimTriple = consumeContent(dis, out, contentClaim, claimOffset, peerDescription, compression == LoadBalanceCompression.COMPRESS_ATTRIBUTES_AND_CONTENT);
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
.id(flowFileRepository.getNextFlowFileSequence())
.addAttributes(attributes)
.addAttribute(CoreAttributes.UUID.key(), UUID.randomUUID().toString())
.contentClaim(contentClaimTriple.getContentClaim())
.contentClaimOffset(contentClaimTriple.getClaimOffset())
.size(contentClaimTriple.getContentLength())
.entryDate(entryDate)
.lineageStart(lineageStartDate, lineageStartIndex.getAndIncrement())
.penaltyExpirationTime(penaltyExpirationMillis)
.build();
logger.debug("Received FlowFile {} with {} attributes and {} bytes of content", flowFileRecord, attributes.size(), contentClaimTriple.getContentLength());
return new RemoteFlowFileRecord(sourceSystemUuid, flowFileRecord);
}
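/**
 * Reads the attribute map from the metadata block: a 4-byte attribute count followed by each key and
 * value, each encoded as a 4-byte length and that many UTF-8 bytes.
 */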
private Map<String, String> readAttributes(final DataInputStream in) throws IOException {
final int attributeCount = in.readInt();
final Map<String, String> attributes = new HashMap<>();
for (int i = 0; i < attributeCount; i++) {
final String key = readLongString(in);
final String value = readLongString(in);
logger.trace("Received attribute '{}' = '{}'", key, value);
attributes.put(key, value);
}
return attributes;
}
private String readLongString(final DataInputStream in) throws IOException {
final int stringLength = in.readInt();
final byte[] bytes = new byte[stringLength];
StreamUtils.fillBuffer(in, bytes);
return new String(bytes, StandardCharsets.UTF_8);
}
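/**
 * Consumes the FlowFile's content, which arrives as a sequence of data frames. Each frame is preceded by
 * a 1-byte indicator (DATA_FRAME_FOLLOWS, NO_DATA_FRAME to end the content, or ABORT_TRANSACTION) and,
 * when a frame follows, a 4-byte frame length. Frames are decompressed if required and appended to the
 * shared Content Claim. Returns the claim, the FlowFile's offset into it, and the number of bytes written.
 */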
private ContentClaimTriple consumeContent(final DataInputStream in, final OutputStream out, final ContentClaim contentClaim, final long claimOffset,
final String peerDescription, final boolean compressed) throws IOException {
logger.debug("Consuming content from Peer {}", peerDescription);
int dataFrameIndicator = in.read();
if (dataFrameIndicator < 0) {
throw new EOFException("Encountered End-of-File when expecting to read Data Frame Indicator from Peer " + peerDescription);
}
if (dataFrameIndicator == NO_DATA_FRAME) {
logger.debug("Peer {} indicates that there is no Data Frame for the FlowFile", peerDescription);
return new ContentClaimTriple(null, 0L, 0L);
}
if (dataFrameIndicator == ABORT_TRANSACTION) {
throw new TransactionAbortedException("Peer " + peerDescription + " requested that transaction be aborted");
}
if (dataFrameIndicator != DATA_FRAME_FOLLOWS) {
throw new IOException("Expected a Data Frame Indicator from Peer " + peerDescription + " but received a value of " + dataFrameIndicator);
}
int dataFrameLength = in.readInt();
logger.trace("Received Data Frame Length of {} for {}", dataFrameLength, peerDescription);
byte[] buffer = getDataBuffer();
long claimLength = 0;
while (true) {
final InputStream limitedIn = new LimitedInputStream(in, dataFrameLength);
final ByteCountingInputStream bcis = new ByteCountingInputStream(limitedIn);
final InputStream contentIn = compressed ? new GZIPInputStream(bcis) : bcis;
final int decompressedSize = StreamUtils.fillBuffer(contentIn, buffer, false);
if (bcis.getBytesRead() < dataFrameLength) {
throw new EOFException("Expected to receive a Data Frame of length " + dataFrameLength + " bytes but received only " + bcis.getBytesRead() + " bytes");
}
out.write(buffer, 0, decompressedSize);
claimLength += decompressedSize;
dataFrameIndicator = in.read();
if (dataFrameIndicator < 0) {
throw new EOFException("Encountered End-of-File when expecting to receive a Data Frame Indicator");
}
if (dataFrameIndicator == NO_DATA_FRAME) {
logger.debug("Peer {} indicated that no more data frames are available", peerDescription);
break;
}
if (dataFrameIndicator == ABORT_TRANSACTION) {
logger.debug("Peer {} requested that transaction be aborted by sending Data Frame Length of {}", peerDescription, dataFrameLength);
throw new TransactionAbortedException("Peer " + peerDescription + " requested that transaction be aborted");
}
if (dataFrameIndicator != DATA_FRAME_FOLLOWS) {
throw new IOException("Expected a Data Frame Indicator from Peer " + peerDescription + " but received a value of " + dataFrameIndicator);
}
dataFrameLength = in.readInt();
logger.trace("Received Data Frame Length of {} for {}", dataFrameLength, peerDescription);
}
return new ContentClaimTriple(contentClaim, claimOffset, claimLength);
}
private static class ContentClaimTriple {
private final ContentClaim contentClaim;
private final long claimOffset;
private final long contentLength;
public ContentClaimTriple(ContentClaim contentClaim, long claimOffset, long contentLength) {
this.contentClaim = contentClaim;
this.claimOffset = claimOffset;
this.contentLength = contentLength;
}
public ContentClaim getContentClaim() {
return contentClaim;
}
public long getClaimOffset() {
return claimOffset;
}
public long getContentLength() {
return contentLength;
}
}
private static class RemoteFlowFileRecord {
private final String remoteUuid;
private final FlowFileRecord flowFile;
public RemoteFlowFileRecord(final String remoteUuid, final FlowFileRecord flowFile) {
this.remoteUuid = remoteUuid;
this.flowFile = flowFile;
}
public String getRemoteUuid() {
return remoteUuid;
}
public FlowFileRecord getFlowFile() {
return flowFile;
}
}
}