| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.nifi.controller.queue; |
| |
| import org.apache.nifi.controller.repository.FlowFileRecord; |
| import org.apache.nifi.controller.repository.FlowFileSwapManager; |
| import org.apache.nifi.controller.repository.IncompleteSwapFileException; |
| import org.apache.nifi.controller.repository.SwapContents; |
| import org.apache.nifi.controller.repository.SwapSummary; |
| import org.apache.nifi.controller.repository.claim.ResourceClaim; |
| import org.apache.nifi.controller.swap.StandardSwapSummary; |
| import org.apache.nifi.events.EventReporter; |
| import org.apache.nifi.flowfile.FlowFile; |
| import org.apache.nifi.flowfile.FlowFilePrioritizer; |
| import org.apache.nifi.flowfile.attributes.CoreAttributes; |
| import org.apache.nifi.processor.FlowFileFilter; |
| import org.apache.nifi.processor.FlowFileFilter.FlowFileFilterResult; |
| import org.apache.nifi.reporting.Severity; |
| import org.apache.nifi.util.concurrency.TimedLock; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.PriorityQueue; |
| import java.util.Queue; |
| import java.util.Set; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| |
| public class SwappablePriorityQueue { |
| private static final Logger logger = LoggerFactory.getLogger(SwappablePriorityQueue.class); |
| private static final int SWAP_RECORD_POLL_SIZE = 10_000; |
| private static final int MAX_EXPIRED_RECORDS_PER_ITERATION = 10_000; |
| |
| private final int swapThreshold; |
| private final FlowFileSwapManager swapManager; |
| private final EventReporter eventReporter; |
| private final FlowFileQueue flowFileQueue; |
| private final DropFlowFileAction dropAction; |
| private final List<FlowFilePrioritizer> priorities = new ArrayList<>(); |
| private final String swapPartitionName; |
| |
| private final List<String> swapLocations = new ArrayList<>(); |
| private final AtomicReference<FlowFileQueueSize> size = new AtomicReference<>(new FlowFileQueueSize(0, 0L, 0, 0L, 0, 0, 0L)); |
| private final TimedLock readLock; |
| private final TimedLock writeLock; |
| |
| // We keep an "active queue" and a "swap queue" that both are able to hold records in heap. When |
| // FlowFiles are added to this FlowFileQueue, we first check if we are in "swap mode" and if so |
| // we add to the 'swap queue' instead of the 'active queue'. The code would be much simpler if we |
| // eliminated the 'swap queue' and instead just used the active queue and swapped out the 10,000 |
| // lowest priority FlowFiles from that. However, doing that would cause problems with the ordering |
| // of FlowFiles. If we swap out some FlowFiles, and then allow a new FlowFile to be written to the |
| // active queue, then we would end up processing the newer FlowFile before the swapped FlowFile. By |
| // keeping these separate, we are able to guarantee that FlowFiles are swapped in in the same order |
| // that they are swapped out. |
| // Guarded by lock. |
| private PriorityQueue<FlowFileRecord> activeQueue; |
| private ArrayList<FlowFileRecord> swapQueue; |
| private boolean swapMode = false; |
| |
| // The following members are used to keep metrics in memory for reporting purposes so that we don't have to constantly |
| // read these values from swap files on disk. |
| private final Map<String, Long> minQueueDateInSwapLocation = new HashMap<>(); |
| private final Map<String, Long> totalQueueDateInSwapLocation = new HashMap<>(); |
| |
| public SwappablePriorityQueue(final FlowFileSwapManager swapManager, final int swapThreshold, final EventReporter eventReporter, final FlowFileQueue flowFileQueue, |
| final DropFlowFileAction dropAction, final String swapPartitionName) { |
| this.swapManager = swapManager; |
| this.swapThreshold = swapThreshold; |
| |
| this.activeQueue = new PriorityQueue<>(20, new QueuePrioritizer(Collections.emptyList())); |
| this.swapQueue = new ArrayList<>(); |
| this.eventReporter = eventReporter; |
| this.flowFileQueue = flowFileQueue; |
| this.dropAction = dropAction; |
| this.swapPartitionName = swapPartitionName; |
| |
| final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); |
| readLock = new TimedLock(lock.readLock(), flowFileQueue.getIdentifier() + " Read Lock", 100); |
| writeLock = new TimedLock(lock.writeLock(), flowFileQueue.getIdentifier() + " Write Lock", 100); |
| } |
| |
| private String getQueueIdentifier() { |
| return flowFileQueue.getIdentifier(); |
| } |
| |
| public List<FlowFilePrioritizer> getPriorities() { |
| readLock.lock(); |
| try { |
| return Collections.unmodifiableList(priorities); |
| } finally { |
| readLock.unlock("getPriorities"); |
| } |
| } |
| |
| public void setPriorities(final List<FlowFilePrioritizer> newPriorities) { |
| writeLock.lock(); |
| try { |
| priorities.clear(); |
| priorities.addAll(newPriorities); |
| |
| final PriorityQueue<FlowFileRecord> newQueue = new PriorityQueue<>(Math.max(20, activeQueue.size()), new QueuePrioritizer(newPriorities)); |
| newQueue.addAll(activeQueue); |
| activeQueue = newQueue; |
| } finally { |
| writeLock.unlock("setPriorities"); |
| } |
| } |
| |
| |
| public LocalQueuePartitionDiagnostics getQueueDiagnostics() { |
| readLock.lock(); |
| try { |
| final boolean anyPenalized = !activeQueue.isEmpty() && activeQueue.peek().isPenalized(); |
| final boolean allPenalized = anyPenalized && activeQueue.stream().anyMatch(FlowFileRecord::isPenalized); |
| |
| return new StandardLocalQueuePartitionDiagnostics(getFlowFileQueueSize(), anyPenalized, allPenalized); |
| } finally { |
| readLock.unlock("getQueueDiagnostics"); |
| } |
| } |
| |
| public List<FlowFileRecord> getActiveFlowFiles() { |
| readLock.lock(); |
| try { |
| return new ArrayList<>(activeQueue); |
| } finally { |
| readLock.unlock("getActiveFlowFiles"); |
| } |
| } |
| |
| public boolean isUnacknowledgedFlowFile() { |
| return getFlowFileQueueSize().getUnacknowledgedCount() > 0; |
| } |
| |
| /** |
| * This method MUST be called with the write lock held |
| */ |
| private void writeSwapFilesIfNecessary() { |
| if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) { |
| return; |
| } |
| |
| migrateSwapToActive(); |
| if (swapQueue.size() < SWAP_RECORD_POLL_SIZE) { |
| return; |
| } |
| |
| final int numSwapFiles = swapQueue.size() / SWAP_RECORD_POLL_SIZE; |
| |
| int originalSwapQueueCount = swapQueue.size(); |
| long originalSwapQueueBytes = 0L; |
| for (final FlowFileRecord flowFile : swapQueue) { |
| originalSwapQueueBytes += flowFile.getSize(); |
| } |
| |
| // Create a new Priority queue with the same prioritizers that are set for this queue. We want to swap out the highest priority data first, because |
| // whatever data we don't write out to a swap file (because there isn't enough to fill a swap file) will be added back to the swap queue. |
| // Since the swap queue cannot be processed until all swap files, we want to ensure that only the lowest priority data goes back onto it. Which means |
| // that we must swap out the highest priority data that is currently on the swap queue. |
| final PriorityQueue<FlowFileRecord> tempQueue = new PriorityQueue<>(swapQueue.size(), new QueuePrioritizer(getPriorities())); |
| tempQueue.addAll(swapQueue); |
| |
| long bytesSwappedOut = 0L; |
| int flowFilesSwappedOut = 0; |
| final List<String> swapLocations = new ArrayList<>(numSwapFiles); |
| for (int i = 0; i < numSwapFiles; i++) { |
| long bytesSwappedThisIteration = 0L; |
| long totalSwapQueueDatesThisIteration = 0L; |
| long minQueueDateThisIteration = Long.MAX_VALUE; |
| |
| // Create a new swap file for the next SWAP_RECORD_POLL_SIZE records |
| final List<FlowFileRecord> toSwap = new ArrayList<>(SWAP_RECORD_POLL_SIZE); |
| for (int j = 0; j < SWAP_RECORD_POLL_SIZE; j++) { |
| final FlowFileRecord flowFile = tempQueue.poll(); |
| toSwap.add(flowFile); |
| bytesSwappedThisIteration += flowFile.getSize(); |
| totalSwapQueueDatesThisIteration += flowFile.getLastQueueDate(); |
| minQueueDateThisIteration = minQueueDateThisIteration < flowFile.getLastQueueDate() ? minQueueDateThisIteration : flowFile.getLastQueueDate(); |
| } |
| |
| try { |
| Collections.reverse(toSwap); // currently ordered in reverse priority order based on the ordering of the temp queue. |
| final String swapLocation = swapManager.swapOut(toSwap, flowFileQueue, swapPartitionName); |
| swapLocations.add(swapLocation); |
| |
| logger.debug("Successfully wrote out Swap File {} containing {} FlowFiles ({} bytes)", swapLocation, toSwap.size(), bytesSwappedThisIteration); |
| |
| bytesSwappedOut += bytesSwappedThisIteration; |
| flowFilesSwappedOut += toSwap.size(); |
| minQueueDateInSwapLocation.put(swapLocation, minQueueDateThisIteration); |
| totalQueueDateInSwapLocation.put(swapLocation, totalSwapQueueDatesThisIteration); |
| } catch (final IOException ioe) { |
| tempQueue.addAll(toSwap); // if we failed, we must add the FlowFiles back to the queue. |
| |
| final int objectCount = getFlowFileCount(); |
| logger.error("FlowFile Queue with identifier {} has {} FlowFiles queued up. Attempted to spill FlowFile information over to disk in order to avoid exhausting " |
| + "the Java heap space but failed to write information to disk due to {}", getQueueIdentifier(), objectCount, ioe.toString()); |
| logger.error("", ioe); |
| if (eventReporter != null) { |
| eventReporter.reportEvent(Severity.ERROR, "Failed to Overflow to Disk", "Flowfile Queue with identifier " + getQueueIdentifier() + " has " + objectCount + |
| " queued up. Attempted to spill FlowFile information over to disk in order to avoid exhausting the Java heap space but failed to write information to disk. " |
| + "See logs for more information."); |
| } |
| |
| break; |
| } |
| } |
| |
| // Pull any records off of the temp queue that won't fit back on the active queue, and add those to the |
| // swap queue. Then add the records back to the active queue. |
| swapQueue.clear(); |
| long updatedSwapQueueBytes = 0L; |
| FlowFileRecord record; |
| while ((record = tempQueue.poll()) != null) { |
| swapQueue.add(record); |
| updatedSwapQueueBytes += record.getSize(); |
| } |
| |
| Collections.reverse(swapQueue); // currently ordered in reverse priority order based on the ordering of the temp queue |
| |
| boolean updated = false; |
| while (!updated) { |
| final FlowFileQueueSize originalSize = getFlowFileQueueSize(); |
| |
| final int addedSwapRecords = swapQueue.size() - originalSwapQueueCount; |
| final long addedSwapBytes = updatedSwapQueueBytes - originalSwapQueueBytes; |
| |
| final FlowFileQueueSize newSize = new FlowFileQueueSize(originalSize.getActiveCount(), originalSize.getActiveBytes(), |
| originalSize.getSwappedCount() + addedSwapRecords + flowFilesSwappedOut, |
| originalSize.getSwappedBytes() + addedSwapBytes + bytesSwappedOut, |
| originalSize.getSwapFileCount() + numSwapFiles, |
| originalSize.getUnacknowledgedCount(), originalSize.getUnacknowledgedBytes()); |
| |
| updated = updateSize(originalSize, newSize); |
| if (updated) { |
| logIfNegative(originalSize, newSize, "swap"); |
| } |
| } |
| |
| this.swapLocations.addAll(swapLocations); |
| logger.debug("After writing swap files, setting new set of Swap Locations to {}", this.swapLocations); |
| } |
| |
| private int getFlowFileCount() { |
| final FlowFileQueueSize size = getFlowFileQueueSize(); |
| return size.getActiveCount() + size.getSwappedCount() + size.getUnacknowledgedCount(); |
| } |
| |
| /** |
| * If there are FlowFiles waiting on the swap queue, move them to the active |
| * queue until we meet our threshold. This prevents us from having to swap |
| * them to disk & then back out. |
| * |
| * This method MUST be called with the writeLock held. |
| */ |
| private void migrateSwapToActive() { |
| // Migrate as many FlowFiles as we can from the Swap Queue to the Active Queue, so that we don't |
| // have to swap them out & then swap them back in. |
| // If we don't do this, we could get into a situation where we have potentially thousands of FlowFiles |
| // sitting on the Swap Queue but not getting processed because there aren't enough to be swapped out. |
| // In particular, this can happen if the queue is typically filled with surges. |
| // For example, if the queue has 25,000 FlowFiles come in, it may process 20,000 of them and leave |
| // 5,000 sitting on the Swap Queue. If it then takes an hour for an additional 5,000 FlowFiles to come in, |
| // those FlowFiles sitting on the Swap Queue will sit there for an hour, waiting to be swapped out and |
| // swapped back in again. |
| // Calling this method when records are polled prevents this condition by migrating FlowFiles from the |
| // Swap Queue to the Active Queue. However, we don't do this if there are FlowFiles already swapped out |
| // to disk, because we want them to be swapped back in in the same order that they were swapped out. |
| if (!activeQueue.isEmpty()) { |
| return; |
| } |
| |
| // If there are swap files waiting to be swapped in, swap those in first. We do this in order to ensure that those that |
| // were swapped out first are then swapped back in first. If we instead just immediately migrated the FlowFiles from the |
| // swap queue to the active queue, and we never run out of FlowFiles in the active queue (because destination cannot |
| // keep up with queue), we will end up always processing the new FlowFiles first instead of the FlowFiles that arrived |
| // first. |
| if (!swapLocations.isEmpty()) { |
| swapIn(); |
| return; |
| } |
| |
| // this is the most common condition (nothing is swapped out), so do the check first and avoid the expense |
| // of other checks for 99.999% of the cases. |
| final FlowFileQueueSize size = getFlowFileQueueSize(); |
| if (size.getSwappedCount() == 0 && swapQueue.isEmpty()) { |
| return; |
| } |
| |
| if (size.getSwappedCount() > swapQueue.size()) { |
| // we already have FlowFiles swapped out, so we won't migrate the queue; we will wait for |
| // the files to be swapped back in first |
| return; |
| } |
| |
| // Swap Queue is not currently ordered. We want to migrate the highest priority FlowFiles to the Active Queue, then re-queue the lowest priority items. |
| final PriorityQueue<FlowFileRecord> tempQueue = new PriorityQueue<>(swapQueue.size(), new QueuePrioritizer(getPriorities())); |
| tempQueue.addAll(swapQueue); |
| |
| int recordsMigrated = 0; |
| long bytesMigrated = 0L; |
| while (activeQueue.size() < swapThreshold) { |
| final FlowFileRecord toMigrate = tempQueue.poll(); |
| if (toMigrate == null) { |
| break; |
| } |
| |
| activeQueue.add(toMigrate); |
| bytesMigrated += toMigrate.getSize(); |
| recordsMigrated++; |
| } |
| |
| swapQueue.clear(); |
| FlowFileRecord toRequeue; |
| while ((toRequeue = tempQueue.poll()) != null) { |
| swapQueue.add(toRequeue); |
| } |
| |
| if (recordsMigrated > 0) { |
| incrementActiveQueueSize(recordsMigrated, bytesMigrated); |
| incrementSwapQueueSize(-recordsMigrated, -bytesMigrated, 0); |
| logger.debug("Migrated {} FlowFiles from swap queue to active queue for {}", recordsMigrated, this); |
| } |
| |
| if (size.getSwappedCount() == 0) { |
| swapMode = false; |
| } |
| } |
| |
| private void swapIn() { |
| final String swapLocation = swapLocations.get(0); |
| boolean partialContents = false; |
| SwapContents swapContents; |
| try { |
| logger.debug("Attempting to swap in {}; all swap locations = {}", swapLocation, swapLocations); |
| swapContents = swapManager.swapIn(swapLocation, flowFileQueue); |
| swapLocations.remove(0); |
| minQueueDateInSwapLocation.remove(swapLocation); |
| totalQueueDateInSwapLocation.remove(swapLocation); |
| } catch (final IncompleteSwapFileException isfe) { |
| logger.error("Failed to swap in all FlowFiles from Swap File {}; Swap File ended prematurely. The records that were present will still be swapped in", swapLocation); |
| logger.error("", isfe); |
| swapContents = isfe.getPartialContents(); |
| partialContents = true; |
| swapLocations.remove(0); |
| minQueueDateInSwapLocation.remove(swapLocation); |
| totalQueueDateInSwapLocation.remove(swapLocation); |
| } catch (final FileNotFoundException fnfe) { |
| logger.error("Failed to swap in FlowFiles from Swap File {} because the Swap File can no longer be found", swapLocation); |
| if (eventReporter != null) { |
| eventReporter.reportEvent(Severity.ERROR, "Swap File", "Failed to swap in FlowFiles from Swap File " + swapLocation + " because the Swap File can no longer be found"); |
| } |
| |
| swapLocations.remove(0); |
| minQueueDateInSwapLocation.remove(swapLocation); |
| totalQueueDateInSwapLocation.remove(swapLocation); |
| return; |
| } catch (final IOException ioe) { |
| logger.error("Failed to swap in FlowFiles from Swap File {}; Swap File appears to be corrupt!", swapLocation); |
| logger.error("", ioe); |
| if (eventReporter != null) { |
| eventReporter.reportEvent(Severity.ERROR, "Swap File", "Failed to swap in FlowFiles from Swap File " + |
| swapLocation + "; Swap File appears to be corrupt! Some FlowFiles in the queue may not be accessible. See logs for more information."); |
| } |
| |
| // We do not remove the Swap File from swapLocations because the IOException may be recoverable later. For instance, the file may be on a network |
| // drive and we may have connectivity problems, etc. |
| return; |
| } catch (final Throwable t) { |
| logger.error("Failed to swap in FlowFiles from Swap File {}", swapLocation, t); |
| |
| // We do not remove the Swap File from swapLocations because this is an unexpected failure that may be retry-able. For example, if there were |
| // an OOME, etc. then we don't want to he queue to still reflect that the data is around but never swap it in. By leaving the Swap File |
| // in swapLocations, we will continue to retry. |
| throw t; |
| } |
| |
| final QueueSize swapSize = swapContents.getSummary().getQueueSize(); |
| final long contentSize = swapSize.getByteCount(); |
| final int flowFileCount = swapSize.getObjectCount(); |
| incrementSwapQueueSize(-flowFileCount, -contentSize, -1); |
| |
| if (partialContents) { |
| // if we have partial results, we need to calculate the content size of the flowfiles |
| // actually swapped back in. |
| long contentSizeSwappedIn = 0L; |
| for (final FlowFileRecord swappedIn : swapContents.getFlowFiles()) { |
| contentSizeSwappedIn += swappedIn.getSize(); |
| } |
| |
| incrementActiveQueueSize(swapContents.getFlowFiles().size(), contentSizeSwappedIn); |
| logger.debug("Swapped in partial contents containing {} FlowFiles ({} bytes) from {}", swapContents.getFlowFiles().size(), contentSizeSwappedIn, swapLocation); |
| } else { |
| // we swapped in the whole swap file. We can just use the info that we got from the summary. |
| incrementActiveQueueSize(flowFileCount, contentSize); |
| logger.debug("Successfully swapped in Swap File {} containing {} FlowFiles ({} bytes)", swapLocation, flowFileCount, contentSize); |
| } |
| |
| activeQueue.addAll(swapContents.getFlowFiles()); |
| } |
| |
| public QueueSize size() { |
| return getFlowFileQueueSize().toQueueSize(); |
| } |
| |
| public boolean isEmpty() { |
| return getFlowFileQueueSize().isEmpty(); |
| } |
| |
| public boolean isActiveQueueEmpty() { |
| final FlowFileQueueSize queueSize = getFlowFileQueueSize(); |
| return queueSize.getActiveCount() == 0 && queueSize.getSwappedCount() == 0; |
| } |
| |
| public void acknowledge(final FlowFileRecord flowFile) { |
| logger.trace("{} Acknowledging {}", this, flowFile); |
| incrementUnacknowledgedQueueSize(-1, -flowFile.getSize()); |
| } |
| |
| public void acknowledge(final Collection<FlowFileRecord> flowFiles) { |
| logger.trace("{} Acknowledging {}", this, flowFiles); |
| final long totalSize = flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum(); |
| incrementUnacknowledgedQueueSize(-flowFiles.size(), -totalSize); |
| } |
| |
| |
| public void put(final FlowFileRecord flowFile) { |
| writeLock.lock(); |
| try { |
| if (swapMode || activeQueue.size() >= swapThreshold) { |
| swapQueue.add(flowFile); |
| incrementSwapQueueSize(1, flowFile.getSize(), 0); |
| swapMode = true; |
| writeSwapFilesIfNecessary(); |
| } else { |
| incrementActiveQueueSize(1, flowFile.getSize()); |
| activeQueue.add(flowFile); |
| } |
| |
| logger.trace("{} put to {}", flowFile, this); |
| } finally { |
| writeLock.unlock("put(FlowFileRecord)"); |
| } |
| } |
| |
| public void putAll(final Collection<FlowFileRecord> flowFiles) { |
| final int numFiles = flowFiles.size(); |
| long bytes = 0L; |
| for (final FlowFile flowFile : flowFiles) { |
| bytes += flowFile.getSize(); |
| } |
| |
| writeLock.lock(); |
| try { |
| if (swapMode || activeQueue.size() >= swapThreshold - numFiles) { |
| swapQueue.addAll(flowFiles); |
| incrementSwapQueueSize(numFiles, bytes, 0); |
| swapMode = true; |
| writeSwapFilesIfNecessary(); |
| } else { |
| incrementActiveQueueSize(numFiles, bytes); |
| activeQueue.addAll(flowFiles); |
| } |
| |
| logger.trace("{} put to {}", flowFiles, this); |
| } finally { |
| writeLock.unlock("putAll"); |
| } |
| } |
| |
| public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis) { |
| return poll(expiredRecords, expirationMillis, PollStrategy.UNPENALIZED_FLOWFILES); |
| } |
| |
| public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final PollStrategy pollStrategy) { |
| FlowFileRecord flowFile; |
| |
| // First check if we have any records Pre-Fetched. |
| writeLock.lock(); |
| try { |
| flowFile = doPoll(expiredRecords, expirationMillis, pollStrategy); |
| |
| if (flowFile != null) { |
| logger.trace("{} poll() returning {}", this, flowFile); |
| incrementUnacknowledgedQueueSize(1, flowFile.getSize()); |
| } |
| |
| return flowFile; |
| } finally { |
| writeLock.unlock("poll(Set)"); |
| } |
| } |
| |
| |
| private FlowFileRecord doPoll(final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final PollStrategy pollStrategy) { |
| FlowFileRecord flowFile; |
| boolean isExpired; |
| |
| migrateSwapToActive(); |
| |
| long expiredBytes = 0L; |
| do { |
| flowFile = this.activeQueue.poll(); |
| |
| isExpired = isExpired(flowFile, expirationMillis); |
| if (isExpired) { |
| expiredRecords.add(flowFile); |
| expiredBytes += flowFile.getSize(); |
| flowFile = null; |
| |
| if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) { |
| break; |
| } |
| } else if (flowFile != null && flowFile.isPenalized() && pollStrategy == PollStrategy.UNPENALIZED_FLOWFILES) { |
| this.activeQueue.add(flowFile); |
| flowFile = null; |
| break; |
| } |
| |
| if (flowFile != null) { |
| incrementActiveQueueSize(-1, -flowFile.getSize()); |
| } |
| } while (isExpired); |
| |
| if (!expiredRecords.isEmpty()) { |
| incrementActiveQueueSize(-expiredRecords.size(), -expiredBytes); |
| } |
| |
| return flowFile; |
| } |
| |
| public List<FlowFileRecord> poll(int maxResults, final Set<FlowFileRecord> expiredRecords, final long expirationMillis) { |
| return poll(maxResults, expiredRecords, expirationMillis, PollStrategy.UNPENALIZED_FLOWFILES); |
| } |
| |
| public List<FlowFileRecord> poll(int maxResults, final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final PollStrategy pollStrategy) { |
| final List<FlowFileRecord> records = new ArrayList<>(Math.min(1, maxResults)); |
| |
| // First check if we have any records Pre-Fetched. |
| writeLock.lock(); |
| try { |
| doPoll(records, maxResults, expiredRecords, expirationMillis, pollStrategy); |
| } finally { |
| writeLock.unlock("poll(int, Set)"); |
| } |
| |
| if (!records.isEmpty()) { |
| logger.trace("{} poll() returning {}", this, records); |
| } |
| |
| return records; |
| } |
| |
| public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords, final long expirationMillis) { |
| return poll(filter, expiredRecords, expirationMillis, PollStrategy.UNPENALIZED_FLOWFILES); |
| } |
| |
| public List<FlowFileRecord> poll(final FlowFileFilter filter, final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final PollStrategy pollStrategy) { |
| long bytesPulled = 0L; |
| int flowFilesPulled = 0; |
| |
| writeLock.lock(); |
| try { |
| migrateSwapToActive(); |
| |
| final List<FlowFileRecord> selectedFlowFiles = new ArrayList<>(); |
| final List<FlowFileRecord> unselected = new ArrayList<>(); |
| |
| while (true) { |
| FlowFileRecord flowFile = this.activeQueue.poll(); |
| if (flowFile == null) { |
| break; |
| } |
| |
| final boolean isExpired = isExpired(flowFile, expirationMillis); |
| if (isExpired) { |
| expiredRecords.add(flowFile); |
| bytesPulled += flowFile.getSize(); |
| flowFilesPulled++; |
| |
| if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) { |
| break; |
| } else { |
| continue; |
| } |
| } else if (flowFile.isPenalized() && pollStrategy == PollStrategy.UNPENALIZED_FLOWFILES) { |
| this.activeQueue.add(flowFile); |
| break; // just stop searching because the rest are all penalized. |
| } |
| |
| final FlowFileFilterResult result = filter.filter(flowFile); |
| if (result.isAccept()) { |
| bytesPulled += flowFile.getSize(); |
| flowFilesPulled++; |
| |
| incrementUnacknowledgedQueueSize(1, flowFile.getSize()); |
| selectedFlowFiles.add(flowFile); |
| } else { |
| unselected.add(flowFile); |
| } |
| |
| if (!result.isContinue()) { |
| break; |
| } |
| } |
| |
| this.activeQueue.addAll(unselected); |
| incrementActiveQueueSize(-flowFilesPulled, -bytesPulled); |
| |
| if (!selectedFlowFiles.isEmpty()) { |
| logger.trace("{} poll() returning {}", this, selectedFlowFiles); |
| } |
| |
| return selectedFlowFiles; |
| } finally { |
| writeLock.unlock("poll(Filter, Set)"); |
| } |
| } |
| |
| private void doPoll(final List<FlowFileRecord> records, int maxResults, final Set<FlowFileRecord> expiredRecords, final long expirationMillis, final PollStrategy pollStrategy) { |
| migrateSwapToActive(); |
| |
| final long bytesDrained = drainQueue(activeQueue, records, maxResults, expiredRecords, expirationMillis, pollStrategy); |
| |
| long expiredBytes = 0L; |
| for (final FlowFileRecord record : expiredRecords) { |
| expiredBytes += record.getSize(); |
| } |
| |
| incrementActiveQueueSize(-(expiredRecords.size() + records.size()), -bytesDrained); |
| incrementUnacknowledgedQueueSize(records.size(), bytesDrained - expiredBytes); |
| } |
| |
| |
| protected boolean isExpired(final FlowFile flowFile, final long expirationMillis) { |
| return isLaterThan(getExpirationDate(flowFile, expirationMillis)); |
| } |
| |
| private boolean isLaterThan(final Long maxAge) { |
| if (maxAge == null) { |
| return false; |
| } |
| return maxAge < System.currentTimeMillis(); |
| } |
| |
| private Long getExpirationDate(final FlowFile flowFile, final long expirationMillis) { |
| if (flowFile == null) { |
| return null; |
| } |
| |
| if (expirationMillis <= 0) { |
| return null; |
| } else { |
| final long entryDate = flowFile.getEntryDate(); |
| final long expirationDate = entryDate + expirationMillis; |
| return expirationDate; |
| } |
| } |
| |
| |
| private long drainQueue(final Queue<FlowFileRecord> sourceQueue, final List<FlowFileRecord> destination, |
| int maxResults, final Set<FlowFileRecord> expiredRecords, final long expirationMillis, |
| final PollStrategy pollStrategy) { |
| long drainedSize = 0L; |
| FlowFileRecord pulled; |
| |
| while (destination.size() < maxResults && (pulled = sourceQueue.poll()) != null) { |
| if (isExpired(pulled, expirationMillis)) { |
| expiredRecords.add(pulled); |
| if (expiredRecords.size() >= MAX_EXPIRED_RECORDS_PER_ITERATION) { |
| break; |
| } |
| } else { |
| if (pulled.isPenalized() && pollStrategy == PollStrategy.UNPENALIZED_FLOWFILES) { |
| sourceQueue.add(pulled); |
| break; |
| } |
| destination.add(pulled); |
| } |
| drainedSize += pulled.getSize(); |
| } |
| return drainedSize; |
| } |
| |
| |
| public FlowFileRecord getFlowFile(final String flowFileUuid) { |
| if (flowFileUuid == null) { |
| return null; |
| } |
| |
| readLock.lock(); |
| try { |
| // read through all of the FlowFiles in the queue, looking for the FlowFile with the given ID |
| for (final FlowFileRecord flowFile : activeQueue) { |
| if (flowFileUuid.equals(flowFile.getAttribute(CoreAttributes.UUID.key()))) { |
| return flowFile; |
| } |
| } |
| } finally { |
| readLock.unlock("getFlowFile"); |
| } |
| |
| return null; |
| } |
| |
| |
| public void dropFlowFiles(final DropFlowFileRequest dropRequest, final String requestor) { |
| final String requestIdentifier = dropRequest.getRequestIdentifier(); |
| |
| writeLock.lock(); |
| try { |
| dropRequest.setState(DropFlowFileState.DROPPING_FLOWFILES); |
| logger.debug("For DropFlowFileRequest {}, original size is {}", requestIdentifier, size()); |
| |
| try { |
| final List<FlowFileRecord> activeQueueRecords = new ArrayList<>(activeQueue); |
| |
| QueueSize droppedSize; |
| try { |
| if (dropRequest.getState() == DropFlowFileState.CANCELED) { |
| logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier); |
| return; |
| } |
| |
| droppedSize = dropAction.drop(activeQueueRecords, requestor); |
| logger.debug("For DropFlowFileRequest {}, Dropped {} from active queue", requestIdentifier, droppedSize); |
| } catch (final IOException ioe) { |
| logger.error("Failed to drop the FlowFiles from queue {} due to {}", getQueueIdentifier(), ioe.toString()); |
| logger.error("", ioe); |
| |
| dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + ioe.toString()); |
| return; |
| } |
| |
| activeQueue.clear(); |
| incrementActiveQueueSize(-droppedSize.getObjectCount(), -droppedSize.getByteCount()); |
| dropRequest.setCurrentSize(size()); |
| dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize)); |
| |
| final QueueSize swapSize = getFlowFileQueueSize().swapQueueSize(); |
| logger.debug("For DropFlowFileRequest {}, Swap Queue has {} elements, Swapped Record Count = {}, Swapped Content Size = {}", |
| requestIdentifier, swapQueue.size(), swapSize.getObjectCount(), swapSize.getByteCount()); |
| if (dropRequest.getState() == DropFlowFileState.CANCELED) { |
| logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier); |
| return; |
| } |
| |
| try { |
| droppedSize = dropAction.drop(swapQueue, requestor); |
| } catch (final IOException ioe) { |
| logger.error("Failed to drop the FlowFiles from queue {} due to {}", getQueueIdentifier(), ioe.toString()); |
| logger.error("", ioe); |
| |
| dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + ioe.toString()); |
| return; |
| } |
| |
| swapQueue.clear(); |
| dropRequest.setCurrentSize(size()); |
| dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize)); |
| swapMode = false; |
| incrementSwapQueueSize(-droppedSize.getObjectCount(), -droppedSize.getByteCount(), 0); |
| logger.debug("For DropFlowFileRequest {}, dropped {} from Swap Queue", requestIdentifier, droppedSize); |
| |
| final int swapFileCount = swapLocations.size(); |
| final Iterator<String> swapLocationItr = swapLocations.iterator(); |
| while (swapLocationItr.hasNext()) { |
| final String swapLocation = swapLocationItr.next(); |
| |
| SwapContents swapContents = null; |
| try { |
| if (dropRequest.getState() == DropFlowFileState.CANCELED) { |
| logger.info("Cancel requested for DropFlowFileRequest {}", requestIdentifier); |
| return; |
| } |
| |
| swapContents = swapManager.swapIn(swapLocation, flowFileQueue); |
| droppedSize = dropAction.drop(swapContents.getFlowFiles(), requestor); |
| } catch (final IncompleteSwapFileException isfe) { |
| swapContents = isfe.getPartialContents(); |
| final String warnMsg = "Failed to swap in FlowFiles from Swap File " + swapLocation + " because the file was corrupt. " |
| + "Some FlowFiles may not be dropped from the queue until NiFi is restarted."; |
| |
| logger.warn(warnMsg); |
| if (eventReporter != null) { |
| eventReporter.reportEvent(Severity.WARNING, "Drop FlowFiles", warnMsg); |
| } |
| } catch (final IOException ioe) { |
| logger.error("Failed to swap in FlowFiles from Swap File {} in order to drop the FlowFiles for Connection {} due to {}", |
| swapLocation, getQueueIdentifier(), ioe.toString()); |
| logger.error("", ioe); |
| if (eventReporter != null) { |
| eventReporter.reportEvent(Severity.ERROR, "Drop FlowFiles", "Failed to swap in FlowFiles from Swap File " + swapLocation |
| + ". The FlowFiles contained in this Swap File will not be dropped from the queue"); |
| } |
| |
| dropRequest.setState(DropFlowFileState.FAILURE, "Failed to swap in FlowFiles from Swap File " + swapLocation + " due to " + ioe.toString()); |
| if (swapContents != null) { |
| activeQueue.addAll(swapContents.getFlowFiles()); // ensure that we don't lose the FlowFiles from our queue. |
| } |
| |
| return; |
| } |
| |
| dropRequest.setDroppedSize(dropRequest.getDroppedSize().add(droppedSize)); |
| incrementSwapQueueSize(-droppedSize.getObjectCount(), -droppedSize.getByteCount(), -1); |
| |
| dropRequest.setCurrentSize(size()); |
| swapLocationItr.remove(); |
| minQueueDateInSwapLocation.remove(swapLocation); |
| totalQueueDateInSwapLocation.remove(swapLocation); |
| logger.debug("For DropFlowFileRequest {}, dropped {} for Swap File {}", requestIdentifier, droppedSize, swapLocation); |
| } |
| |
| logger.debug("Dropped FlowFiles from {} Swap Files", swapFileCount); |
| logger.info("Successfully dropped {} FlowFiles ({} bytes) from Connection with ID {} on behalf of {}", |
| dropRequest.getDroppedSize().getObjectCount(), dropRequest.getDroppedSize().getByteCount(), getQueueIdentifier(), requestor); |
| dropRequest.setState(DropFlowFileState.COMPLETE); |
| } catch (final Exception e) { |
| logger.error("Failed to drop FlowFiles from Connection with ID {} due to {}", getQueueIdentifier(), e.toString()); |
| logger.error("", e); |
| dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + e.toString()); |
| } |
| } finally { |
| writeLock.unlock("Drop FlowFiles"); |
| } |
| } |
| |
| |
| |
| public SwapSummary recoverSwappedFlowFiles() { |
| int swapFlowFileCount = 0; |
| long swapByteCount = 0L; |
| long totalSwappedQueueDate = 0L; |
| Long minSwappedQueueDate = null; |
| Long maxId = null; |
| List<ResourceClaim> resourceClaims = new ArrayList<>(); |
| final long startNanos = System.nanoTime(); |
| int failures = 0; |
| |
| writeLock.lock(); |
| try { |
| final List<String> swapLocationsFromSwapManager; |
| try { |
| swapLocationsFromSwapManager = swapManager.recoverSwapLocations(flowFileQueue, swapPartitionName); |
| } catch (final IOException ioe) { |
| logger.error("Failed to determine whether or not any Swap Files exist for FlowFile Queue {}", getQueueIdentifier()); |
| logger.error("", ioe); |
| if (eventReporter != null) { |
| eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to determine whether or not any Swap Files exist for FlowFile Queue " + |
| getQueueIdentifier() + "; see logs for more detials"); |
| } |
| return null; |
| } |
| |
| // If we have a duplicate of any of the swap location that we already know about, we need to filter those out now. |
| // This can happen when, upon startup, we need to swap data out during the swap file recovery. In this case, we do |
| // not want to include such a swap file in those that we recover, because those have already been accounted for when |
| // they were added to the queue, before being swapped out. |
| final Set<String> swapLocations = new LinkedHashSet<>(swapLocationsFromSwapManager); |
| swapLocations.removeAll(this.swapLocations); |
| |
| logger.debug("Swap Manager reports {} Swap Files for {}: {}", swapLocations.size(), flowFileQueue, swapLocations); |
| for (final String swapLocation : swapLocations) { |
| try { |
| final SwapSummary summary = swapManager.getSwapSummary(swapLocation); |
| final QueueSize queueSize = summary.getQueueSize(); |
| final Long maxSwapRecordId = summary.getMaxFlowFileId(); |
| if (maxSwapRecordId != null) { |
| if (maxId == null || maxSwapRecordId > maxId) { |
| maxId = maxSwapRecordId; |
| } |
| } |
| |
| swapFlowFileCount += queueSize.getObjectCount(); |
| swapByteCount += queueSize.getByteCount(); |
| resourceClaims.addAll(summary.getResourceClaims()); |
| |
| // Update class member metrics |
| minQueueDateInSwapLocation.put(swapLocation, summary.getMinLastQueueDate()); |
| totalQueueDateInSwapLocation.put(swapLocation, summary.getTotalLastQueueDate()); |
| |
| // Update metrics for this method's return value |
| if(minSwappedQueueDate == null) { |
| minSwappedQueueDate = summary.getMinLastQueueDate(); |
| } else { |
| if(summary.getMinLastQueueDate() != null) { |
| minSwappedQueueDate = Long.min(minSwappedQueueDate, summary.getMinLastQueueDate()); |
| } |
| } |
| totalSwappedQueueDate += summary.getTotalLastQueueDate(); |
| } catch (final IOException ioe) { |
| failures++; |
| logger.error("Failed to recover FlowFiles from Swap File {}; the file appears to be corrupt", swapLocation); |
| logger.error("", ioe); |
| if (eventReporter != null) { |
| eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to recover FlowFiles from Swap File " + swapLocation + |
| "; the file appears to be corrupt. See logs for more details"); |
| } |
| } |
| } |
| |
| incrementSwapQueueSize(swapFlowFileCount, swapByteCount, swapLocations.size()); |
| this.swapLocations.addAll(swapLocations); |
| } finally { |
| writeLock.unlock("Recover Swap Files"); |
| } |
| |
| if (swapLocations.isEmpty()) { |
| logger.debug("No swap files were recovered for {}", flowFileQueue); |
| } else { |
| final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); |
| logger.info("Recovered {} swap files for {} in {} millis", swapLocations.size() - failures, this, millis); |
| } |
| |
| // minSwappedQueueDate and totalSwappedQueueDate within this particular StandardSwapSummary are not ultimately used by the FlowController. However, |
| // it can't hurt to set them here accurately in case they ever are. |
| return new StandardSwapSummary(new QueueSize(swapFlowFileCount, swapByteCount), maxId, resourceClaims, minSwappedQueueDate, totalSwappedQueueDate); |
| } |
| |
| public long getMinLastQueueDate() { |
| readLock.lock(); |
| try { |
| // We want the oldest timestamp, which will be the min |
| long min = getMinLastQueueDate(activeQueue, 0L); |
| min = Long.min(min, getMinLastQueueDate(swapQueue, min)); |
| |
| for(Long minSwapQueueDate: minQueueDateInSwapLocation.values()) { |
| min = min == 0 ? minSwapQueueDate : Long.min(min, minSwapQueueDate); |
| } |
| |
| return min; |
| } finally { |
| readLock.unlock("Get Min Last Queue Date"); |
| } |
| } |
| |
| private long getMinLastQueueDate(Iterable<FlowFileRecord> iterable, long defaultMin) { |
| long min = 0; |
| |
| for (FlowFileRecord flowFileRecord : iterable) { |
| min = min == 0 ? flowFileRecord.getLastQueueDate() : Long.min(flowFileRecord.getLastQueueDate(), min); |
| } |
| |
| return min == 0 ? defaultMin : min; |
| } |
| |
| public long getTotalQueuedDuration(long fromTimestamp) { |
| readLock.lock(); |
| try { |
| long sum = 0L; |
| for (FlowFileRecord flowFileRecord : activeQueue) { |
| sum += (fromTimestamp - flowFileRecord.getLastQueueDate()); |
| } |
| |
| for (FlowFileRecord flowFileRecord : swapQueue) { |
| sum += (fromTimestamp - flowFileRecord.getLastQueueDate()); |
| } |
| |
| long totalSwappedQueueDate = 0L; |
| for(Long totalQueueDate: totalQueueDateInSwapLocation.values()) { |
| totalSwappedQueueDate += totalQueueDate; |
| } |
| |
| // We are only considering FlowFiles that have been swapped to disk in this calculation since we took care of the |
| // in-memory swapQueue previously. |
| sum += ((getFlowFileQueueSize().getSwappedCount() - swapQueue.size()) * fromTimestamp) - totalSwappedQueueDate; |
| return sum; |
| } finally { |
| readLock.unlock("Get Total Queued Duration"); |
| } |
| } |
| |
| protected void incrementActiveQueueSize(final int count, final long bytes) { |
| boolean updated = false; |
| while (!updated) { |
| final FlowFileQueueSize original = size.get(); |
| final FlowFileQueueSize newSize = new FlowFileQueueSize( |
| original.getActiveCount() + count, original.getActiveBytes() + bytes, |
| original.getSwappedCount(), original.getSwappedBytes(), original.getSwapFileCount(), |
| original.getUnacknowledgedCount(), original.getUnacknowledgedBytes()); |
| |
| updated = updateSize(original, newSize); |
| |
| if (updated) { |
| logIfNegative(original, newSize, "active"); |
| } |
| } |
| } |
| |
| private void incrementSwapQueueSize(final int count, final long bytes, final int fileCount) { |
| boolean updated = false; |
| while (!updated) { |
| final FlowFileQueueSize original = getFlowFileQueueSize(); |
| final FlowFileQueueSize newSize = new FlowFileQueueSize(original.getActiveCount(), original.getActiveBytes(), |
| original.getSwappedCount() + count, original.getSwappedBytes() + bytes, original.getSwapFileCount() + fileCount, |
| original.getUnacknowledgedCount(), original.getUnacknowledgedBytes()); |
| |
| updated = updateSize(original, newSize); |
| if (updated) { |
| logIfNegative(original, newSize, "swap"); |
| } |
| } |
| } |
| |
| private void incrementUnacknowledgedQueueSize(final int count, final long bytes) { |
| boolean updated = false; |
| while (!updated) { |
| final FlowFileQueueSize original = size.get(); |
| final FlowFileQueueSize newSize = new FlowFileQueueSize(original.getActiveCount(), original.getActiveBytes(), |
| original.getSwappedCount(), original.getSwappedBytes(), original.getSwapFileCount(), |
| original.getUnacknowledgedCount() + count, original.getUnacknowledgedBytes() + bytes); |
| |
| updated = updateSize(original, newSize); |
| |
| if (updated) { |
| logIfNegative(original, newSize, "Unacknowledged"); |
| } |
| } |
| } |
| |
| private void logIfNegative(final FlowFileQueueSize original, final FlowFileQueueSize newSize, final String counterName) { |
| if (newSize.getActiveBytes() < 0 || newSize.getActiveCount() < 0 |
| || newSize.getSwappedBytes() < 0 || newSize.getSwappedCount() < 0 |
| || newSize.getUnacknowledgedBytes() < 0 || newSize.getUnacknowledgedCount() < 0) { |
| |
| logger.error("Updated Size of Queue " + counterName + " from " + original + " to " + newSize, new RuntimeException("Cannot create negative queue size")); |
| } |
| } |
| |
| |
| protected boolean updateSize(final FlowFileQueueSize expected, final FlowFileQueueSize updated) { |
| return size.compareAndSet(expected, updated); |
| } |
| |
| public FlowFileQueueSize getFlowFileQueueSize() { |
| return size.get(); |
| } |
| |
| public void inheritQueueContents(final FlowFileQueueContents queueContents) { |
| writeLock.lock(); |
| try { |
| putAll(queueContents.getActiveFlowFiles()); |
| |
| final List<String> inheritedSwapLocations = queueContents.getSwapLocations(); |
| swapLocations.addAll(inheritedSwapLocations); |
| incrementSwapQueueSize(queueContents.getSwapSize().getObjectCount(), queueContents.getSwapSize().getByteCount(), queueContents.getSwapLocations().size()); |
| |
| if (!inheritedSwapLocations.isEmpty()) { |
| logger.debug("Inherited the following swap locations: {}", inheritedSwapLocations); |
| } |
| } finally { |
| writeLock.unlock("inheritQueueContents"); |
| } |
| } |
| |
| public FlowFileQueueContents packageForRebalance(final String newPartitionName) { |
| writeLock.lock(); |
| try { |
| final List<FlowFileRecord> activeRecords = new ArrayList<>(this.activeQueue); |
| |
| final List<String> updatedSwapLocations = new ArrayList<>(swapLocations.size()); |
| for (final String swapLocation : swapLocations) { |
| try { |
| final String updatedSwapLocation = swapManager.changePartitionName(swapLocation, newPartitionName); |
| updatedSwapLocations.add(updatedSwapLocation); |
| } catch (final IOException ioe) { |
| logger.error("Failed to update Swap File {} to reflect that the contents are now owned by Partition '{}'", swapLocation, newPartitionName, ioe); |
| } |
| } |
| |
| this.swapLocations.clear(); |
| this.activeQueue.clear(); |
| |
| final int swapQueueCount = swapQueue.size(); |
| final long swapQueueBytes = swapQueue.stream().mapToLong(FlowFileRecord::getSize).sum(); |
| activeRecords.addAll(swapQueue); |
| swapQueue.clear(); |
| |
| this.swapMode = false; |
| |
| QueueSize swapSize; |
| boolean updated; |
| do { |
| final FlowFileQueueSize currentSize = getFlowFileQueueSize(); |
| swapSize = new QueueSize(currentSize.getSwappedCount() - swapQueueCount, currentSize.getSwappedBytes() - swapQueueBytes); |
| |
| final FlowFileQueueSize updatedSize = new FlowFileQueueSize(0, 0, 0, 0, 0, currentSize.getUnacknowledgedCount(), currentSize.getUnacknowledgedBytes()); |
| updated = updateSize(currentSize, updatedSize); |
| } while (!updated); |
| |
| logger.debug("Cleared {} to package FlowFile for rebalance to {}", this, newPartitionName); |
| return new FlowFileQueueContents(activeRecords, updatedSwapLocations, swapSize); |
| } finally { |
| writeLock.unlock("packageForRebalance(SwappablePriorityQueue)"); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return "SwappablePriorityQueue[queueId=" + flowFileQueue.getIdentifier() + ", partition=" + swapPartitionName + "]"; |
| } |
| } |