| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.nifi.controller.queue.clustered; |
| |
| import org.apache.nifi.cluster.coordination.ClusterCoordinator; |
| import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener; |
| import org.apache.nifi.cluster.coordination.node.NodeConnectionState; |
| import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus; |
| import org.apache.nifi.cluster.protocol.NodeIdentifier; |
| import org.apache.nifi.controller.ProcessScheduler; |
| import org.apache.nifi.controller.queue.AbstractFlowFileQueue; |
| import org.apache.nifi.controller.queue.ConnectionEventListener; |
| import org.apache.nifi.controller.queue.DropFlowFileRequest; |
| import org.apache.nifi.controller.queue.DropFlowFileState; |
| import org.apache.nifi.controller.queue.FlowFileQueueContents; |
| import org.apache.nifi.controller.queue.IllegalClusterStateException; |
| import org.apache.nifi.controller.queue.LoadBalanceStrategy; |
| import org.apache.nifi.controller.queue.LoadBalancedFlowFileQueue; |
| import org.apache.nifi.controller.queue.LocalQueuePartitionDiagnostics; |
| import org.apache.nifi.controller.queue.PollStrategy; |
| import org.apache.nifi.controller.queue.QueueDiagnostics; |
| import org.apache.nifi.controller.queue.QueueSize; |
| import org.apache.nifi.controller.queue.RemoteQueuePartitionDiagnostics; |
| import org.apache.nifi.controller.queue.StandardQueueDiagnostics; |
| import org.apache.nifi.controller.queue.SwappablePriorityQueue; |
| import org.apache.nifi.controller.queue.clustered.client.async.AsyncLoadBalanceClientRegistry; |
| import org.apache.nifi.controller.queue.clustered.partition.CorrelationAttributePartitioner; |
| import org.apache.nifi.controller.queue.clustered.partition.FirstNodePartitioner; |
| import org.apache.nifi.controller.queue.clustered.partition.FlowFilePartitioner; |
| import org.apache.nifi.controller.queue.clustered.partition.LocalPartitionPartitioner; |
| import org.apache.nifi.controller.queue.clustered.partition.LocalQueuePartition; |
| import org.apache.nifi.controller.queue.clustered.partition.NonLocalPartitionPartitioner; |
| import org.apache.nifi.controller.queue.clustered.partition.QueuePartition; |
| import org.apache.nifi.controller.queue.clustered.partition.RebalancingPartition; |
| import org.apache.nifi.controller.queue.clustered.partition.RemoteQueuePartition; |
| import org.apache.nifi.controller.queue.clustered.partition.RoundRobinPartitioner; |
| import org.apache.nifi.controller.queue.clustered.partition.StandardRebalancingPartition; |
| import org.apache.nifi.controller.queue.clustered.partition.SwappablePriorityQueueLocalPartition; |
| import org.apache.nifi.controller.repository.ContentRepository; |
| import org.apache.nifi.controller.repository.FlowFileRecord; |
| import org.apache.nifi.controller.repository.FlowFileRepository; |
| import org.apache.nifi.controller.repository.FlowFileSwapManager; |
| import org.apache.nifi.controller.repository.RepositoryRecord; |
| import org.apache.nifi.controller.repository.StandardRepositoryRecord; |
| import org.apache.nifi.controller.repository.SwapSummary; |
| import org.apache.nifi.controller.repository.claim.ContentClaim; |
| import org.apache.nifi.controller.repository.claim.ResourceClaim; |
| import org.apache.nifi.controller.repository.claim.ResourceClaimManager; |
| import org.apache.nifi.controller.swap.StandardSwapSummary; |
| import org.apache.nifi.events.EventReporter; |
| import org.apache.nifi.flowfile.FlowFilePrioritizer; |
| import org.apache.nifi.processor.FlowFileFilter; |
| import org.apache.nifi.provenance.ProvenanceEventBuilder; |
| import org.apache.nifi.provenance.ProvenanceEventRecord; |
| import org.apache.nifi.provenance.ProvenanceEventRepository; |
| import org.apache.nifi.provenance.ProvenanceEventType; |
| import org.apache.nifi.provenance.StandardProvenanceEventRecord; |
| import org.apache.nifi.reporting.Severity; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.TreeSet; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.function.Function; |
| import java.util.stream.Collectors; |
| |
| public class SocketLoadBalancedFlowFileQueue extends AbstractFlowFileQueue implements LoadBalancedFlowFileQueue { |
| private static final Logger logger = LoggerFactory.getLogger(SocketLoadBalancedFlowFileQueue.class); |
| private static final int NODE_SWAP_THRESHOLD = 1000; |
| private static final Comparator<NodeIdentifier> loadBalanceEndpointComparator = |
| Comparator.comparing(NodeIdentifier::getLoadBalanceAddress) |
| .thenComparing(NodeIdentifier::getLoadBalancePort); |
| |
| private final List<FlowFilePrioritizer> prioritizers = new ArrayList<>(); |
| private final ConnectionEventListener eventListener; |
| private final AtomicReference<QueueSize> totalSize = new AtomicReference<>(new QueueSize(0, 0L)); |
| private final LocalQueuePartition localPartition; |
| private final RebalancingPartition rebalancingPartition; |
| private final FlowFileSwapManager swapManager; |
| private final EventReporter eventReporter; |
| private final ClusterCoordinator clusterCoordinator; |
| private final AsyncLoadBalanceClientRegistry clientRegistry; |
| |
| private final FlowFileRepository flowFileRepo; |
| private final ProvenanceEventRepository provRepo; |
| private final ContentRepository contentRepo; |
| private final Set<NodeIdentifier> nodeIdentifiers; |
| |
| private final ReadWriteLock partitionLock = new ReentrantReadWriteLock(); |
| private final Lock partitionReadLock = partitionLock.readLock(); |
| private final Lock partitionWriteLock = partitionLock.writeLock(); |
| private QueuePartition[] queuePartitions; |
| private volatile FlowFilePartitioner partitioner; |
| private boolean stopped = true; |
| private volatile boolean offloaded = false; |
| |
| |
| public SocketLoadBalancedFlowFileQueue(final String identifier, final ConnectionEventListener eventListener, final ProcessScheduler scheduler, final FlowFileRepository flowFileRepo, |
| final ProvenanceEventRepository provRepo, final ContentRepository contentRepo, final ResourceClaimManager resourceClaimManager, |
| final ClusterCoordinator clusterCoordinator, final AsyncLoadBalanceClientRegistry clientRegistry, final FlowFileSwapManager swapManager, |
| final int swapThreshold, final EventReporter eventReporter) { |
| |
| super(identifier, scheduler, flowFileRepo, provRepo, resourceClaimManager); |
| this.eventListener = eventListener; |
| this.eventReporter = eventReporter; |
| this.swapManager = swapManager; |
| this.flowFileRepo = flowFileRepo; |
| this.provRepo = provRepo; |
| this.contentRepo = contentRepo; |
| this.clusterCoordinator = clusterCoordinator; |
| this.clientRegistry = clientRegistry; |
| |
| localPartition = new SwappablePriorityQueueLocalPartition(swapManager, swapThreshold, eventReporter, this, this::drop); |
| rebalancingPartition = new StandardRebalancingPartition(swapManager, swapThreshold, eventReporter, this, this::drop); |
| |
| // Create a RemoteQueuePartition for each node |
| nodeIdentifiers = clusterCoordinator == null ? Collections.emptySet() : new TreeSet<>(loadBalanceEndpointComparator); |
| |
| if (clusterCoordinator != null) { |
| nodeIdentifiers.addAll(clusterCoordinator.getNodeIdentifiers()); |
| } |
| |
| final List<NodeIdentifier> sortedNodeIdentifiers = new ArrayList<>(nodeIdentifiers); |
| sortedNodeIdentifiers.sort(Comparator.comparing(NodeIdentifier::getApiAddress)); |
| |
| if (sortedNodeIdentifiers.isEmpty()) { |
| // No Node Identifiers are known yet. Just create the partitions using the local partition. |
| queuePartitions = new QueuePartition[] { localPartition }; |
| } else { |
| // The node identifiers are known. Create the partitions using the local partition and 1 Remote Partition for each node |
| // that is not the local node identifier. If the Local Node Identifier is not yet known, that's okay. When it becomes known, |
| // the queuePartitions array will be recreated with the appropriate partitions. |
| final List<QueuePartition> partitionList = new ArrayList<>(); |
| |
| final NodeIdentifier localNodeId = clusterCoordinator.getLocalNodeIdentifier(); |
| for (final NodeIdentifier nodeId : sortedNodeIdentifiers) { |
| if (nodeId.equals(localNodeId)) { |
| partitionList.add(localPartition); |
| } else { |
| partitionList.add(createRemotePartition(nodeId)); |
| } |
| } |
| |
| // Ensure that our list of queue partitions always contains the local partition. |
| if (!partitionList.contains(localPartition)) { |
| partitionList.add(localPartition); |
| } |
| |
| queuePartitions = partitionList.toArray(new QueuePartition[0]); |
| } |
| |
| partitioner = new LocalPartitionPartitioner(); |
| |
| if (clusterCoordinator != null) { |
| clusterCoordinator.registerEventListener(new ClusterEventListener()); |
| } |
| |
| rebalancingPartition.start(partitioner); |
| } |
| |
| |
| @Override |
| public synchronized void setLoadBalanceStrategy(final LoadBalanceStrategy strategy, final String partitioningAttribute) { |
| final LoadBalanceStrategy currentStrategy = getLoadBalanceStrategy(); |
| final String currentPartitioningAttribute = getPartitioningAttribute(); |
| |
| super.setLoadBalanceStrategy(strategy, partitioningAttribute); |
| |
| if (strategy == currentStrategy && Objects.equals(partitioningAttribute, currentPartitioningAttribute)) { |
| // Nothing changed. |
| return; |
| } |
| |
| if (clusterCoordinator == null) { |
| // Not clustered so nothing to worry about. |
| return; |
| } |
| |
| if (!offloaded) { |
| // We are already load balancing but are changing how we are load balancing. |
| final FlowFilePartitioner partitioner; |
| partitioner = getPartitionerForLoadBalancingStrategy(strategy, partitioningAttribute); |
| |
| setFlowFilePartitioner(partitioner); |
| } |
| } |
| |
| private FlowFilePartitioner getPartitionerForLoadBalancingStrategy(LoadBalanceStrategy strategy, String partitioningAttribute) { |
| FlowFilePartitioner partitioner; |
| switch (strategy) { |
| case DO_NOT_LOAD_BALANCE: |
| partitioner = new LocalPartitionPartitioner(); |
| break; |
| case PARTITION_BY_ATTRIBUTE: |
| partitioner = new CorrelationAttributePartitioner(partitioningAttribute); |
| break; |
| case ROUND_ROBIN: |
| partitioner = new RoundRobinPartitioner(); |
| break; |
| case SINGLE_NODE: |
| partitioner = new FirstNodePartitioner(); |
| break; |
| default: |
| throw new IllegalArgumentException(); |
| } |
| return partitioner; |
| } |
| |
| @Override |
| public void offloadQueue() { |
| if (clusterCoordinator == null) { |
| // Not clustered, cannot offload the queue to other nodes |
| return; |
| } |
| |
| logger.debug("Setting queue {} on node {} as offloaded", this, clusterCoordinator.getLocalNodeIdentifier()); |
| offloaded = true; |
| |
| partitionWriteLock.lock(); |
| try { |
| final Set<NodeIdentifier> nodesToKeep = new HashSet<>(); |
| |
| // If we have any nodes that are connected, we only want to send data to the connected nodes. |
| for (final QueuePartition partition : queuePartitions) { |
| final Optional<NodeIdentifier> nodeIdOption = partition.getNodeIdentifier(); |
| if (!nodeIdOption.isPresent()) { |
| continue; |
| } |
| |
| final NodeIdentifier nodeId = nodeIdOption.get(); |
| final NodeConnectionStatus status = clusterCoordinator.getConnectionStatus(nodeId); |
| if (status != null && status.getState() == NodeConnectionState.CONNECTED) { |
| nodesToKeep.add(nodeId); |
| } |
| } |
| |
| if (!nodesToKeep.isEmpty()) { |
| setNodeIdentifiers(nodesToKeep, false); |
| } |
| |
| // Update our partitioner so that we don't keep any data on the local partition |
| setFlowFilePartitioner(new NonLocalPartitionPartitioner()); |
| } finally { |
| partitionWriteLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void resetOffloadedQueue() { |
| if (clusterCoordinator == null) { |
| // Not clustered, was not offloading the queue to other nodes |
| return; |
| } |
| |
| if (offloaded) { |
| // queue was offloaded previously, allow files to be added to the local partition |
| offloaded = false; |
| logger.debug("Queue {} on node {} was previously offloaded, resetting offloaded status to {}", |
| this, clusterCoordinator.getLocalNodeIdentifier(), offloaded); |
| // reset the partitioner based on the load balancing strategy, since offloading previously changed the partitioner |
| FlowFilePartitioner partitioner = getPartitionerForLoadBalancingStrategy(getLoadBalanceStrategy(), getPartitioningAttribute()); |
| setFlowFilePartitioner(partitioner); |
| logger.debug("Queue {} is no longer offloaded, restored load balance strategy to {} and partitioning attribute to \"{}\"", |
| this, getLoadBalanceStrategy(), getPartitioningAttribute()); |
| } |
| } |
| |
| public synchronized void startLoadBalancing() { |
| logger.debug("{} started. Will begin distributing FlowFiles across the cluster", this); |
| |
| if (!stopped) { |
| return; |
| } |
| |
| stopped = false; |
| |
| partitionReadLock.lock(); |
| try { |
| rebalancingPartition.start(partitioner); |
| |
| for (final QueuePartition queuePartition : queuePartitions) { |
| queuePartition.start(partitioner); |
| } |
| } finally { |
| partitionReadLock.unlock(); |
| } |
| } |
| |
| public synchronized void stopLoadBalancing() { |
| logger.debug("{} stopped. Will no longer distribute FlowFiles across the cluster", this); |
| |
| if (stopped) { |
| return; |
| } |
| |
| stopped = true; |
| |
| partitionReadLock.lock(); |
| try { |
| rebalancingPartition.stop(); |
| for (final QueuePartition queuePartition : queuePartitions) { |
| queuePartition.stop(); |
| } |
| } finally { |
| partitionReadLock.unlock(); |
| } |
| } |
| |
| @Override |
| public boolean isActivelyLoadBalancing() { |
| final QueueSize size = size(); |
| if (size.getObjectCount() == 0) { |
| return false; |
| } |
| |
| final int localObjectCount = localPartition.size().getObjectCount(); |
| return (size.getObjectCount() > localObjectCount); |
| } |
| |
| private QueuePartition createRemotePartition(final NodeIdentifier nodeId) { |
| final SwappablePriorityQueue partitionQueue = new SwappablePriorityQueue(swapManager, NODE_SWAP_THRESHOLD, eventReporter, this, this::drop, nodeId.getId()); |
| |
| final TransferFailureDestination failureDestination = new TransferFailureDestination() { |
| @Override |
| public void putAll(final Collection<FlowFileRecord> flowFiles, final FlowFilePartitioner partitionerUsed) { |
| if (flowFiles.isEmpty()) { |
| return; |
| } |
| |
| if (isRebalanceOnFailure(partitionerUsed)) { |
| logger.debug("Transferring {} FlowFiles to Rebalancing Partition from node {}", flowFiles.size(), nodeId); |
| rebalancingPartition.rebalance(flowFiles); |
| } else { |
| logger.debug("Returning {} FlowFiles to their queue for node {} because Partitioner {} indicates that the FlowFiles should stay where they are", |
| flowFiles.size(), nodeId, partitionerUsed); |
| partitionQueue.putAll(flowFiles); |
| } |
| } |
| |
| @Override |
| public void putAll(final Function<String, FlowFileQueueContents> queueContentsFunction, final FlowFilePartitioner partitionerUsed) { |
| if (isRebalanceOnFailure(partitionerUsed)) { |
| final FlowFileQueueContents contents = queueContentsFunction.apply(rebalancingPartition.getSwapPartitionName()); |
| rebalancingPartition.rebalance(contents); |
| logger.debug("Transferring all {} FlowFiles and {} Swap Files queued for node {} to Rebalancing Partition", |
| contents.getActiveFlowFiles().size(), contents.getSwapLocations().size(), nodeId); |
| } else { |
| logger.debug("Will not transfer FlowFiles queued for node {} to Rebalancing Partition because Partitioner {} indicates that the FlowFiles should stay where they are", |
| nodeId, partitionerUsed); |
| } |
| } |
| |
| @Override |
| public boolean isRebalanceOnFailure(final FlowFilePartitioner partitionerUsed) { |
| return partitionerUsed.isRebalanceOnFailure() || !partitionerUsed.equals(partitioner); |
| } |
| }; |
| |
| final QueuePartition partition = new RemoteQueuePartition(nodeId, partitionQueue, failureDestination, flowFileRepo, provRepo, contentRepo, clientRegistry, this); |
| |
| if (!stopped) { |
| partition.start(partitioner); |
| } |
| |
| return partition; |
| } |
| |
| @Override |
| public synchronized List<FlowFilePrioritizer> getPriorities() { |
| return new ArrayList<>(prioritizers); |
| } |
| |
| @Override |
| public synchronized void setPriorities(final List<FlowFilePrioritizer> newPriorities) { |
| prioritizers.clear(); |
| prioritizers.addAll(newPriorities); |
| |
| partitionReadLock.lock(); |
| try { |
| for (final QueuePartition partition : queuePartitions) { |
| partition.setPriorities(newPriorities); |
| } |
| |
| rebalancingPartition.setPriorities(newPriorities); |
| } finally { |
| partitionReadLock.unlock(); |
| } |
| } |
| |
| |
| @Override |
| public SwapSummary recoverSwappedFlowFiles() { |
| partitionReadLock.lock(); |
| try { |
| final List<SwapSummary> summaries = new ArrayList<>(queuePartitions.length); |
| |
| // Discover the names of all partitions that have data swapped out. |
| Set<String> partitionNamesToRecover; |
| try { |
| partitionNamesToRecover = swapManager.getSwappedPartitionNames(this); |
| logger.debug("For {}, partition names to recover are {}", this, partitionNamesToRecover); |
| } catch (final IOException ioe) { |
| logger.error("Failed to determine the names of the Partitions that have swapped FlowFiles for queue with ID {}.", getIdentifier(), ioe); |
| if (eventReporter != null) { |
| eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to determine the names of Partitions that have swapped FlowFiles for queue with ID " + |
| getIdentifier() + "; see logs for more detials"); |
| } |
| |
| partitionNamesToRecover = Collections.emptySet(); |
| } |
| |
| // For each Queue Partition, recover swapped FlowFiles. |
| for (final QueuePartition partition : queuePartitions) { |
| partitionNamesToRecover.remove(partition.getSwapPartitionName()); |
| |
| final SwapSummary summary = partition.recoverSwappedFlowFiles(); |
| summaries.add(summary); |
| } |
| |
| // Recover any swap files that may belong to the 'rebalancing' partition |
| partitionNamesToRecover.remove(rebalancingPartition.getSwapPartitionName()); |
| final SwapSummary rebalancingSwapSummary = rebalancingPartition.recoverSwappedFlowFiles(); |
| summaries.add(rebalancingSwapSummary); |
| |
| // If there is any Partition that has swapped FlowFiles but for which we don't have a Queue Partition created, we need to recover those swap locations |
| // and get their swap summaries now. We then transfer any Swap Files that existed for that partition to the 'rebalancing' partition so that the data |
| // will be rebalanced against the existing partitions. We do this to handle the following scenario: |
| // - NiFi is running in a cluster with 5 nodes. |
| // - A queue is load balanced across the cluster, with all partitions having data swapped out. |
| // - NiFi is shutdown and upgraded to a new version. |
| // - Admin failed to copy over the Managed State for the nodes from the old version to the new version. |
| // - Upon restart, NiFi does not know about any of the nodes in the cluster. |
| // - When a node joins and recovers swap locations, it is the only known node. |
| // - NiFi will not know that it needs a Remote Partition for nodes 2-5. |
| // - If we don't recover those partitions here, then we'll end up not recovering the Swap Files at all, which will result in the Content Claims |
| // have their Claimant Counts decremented, which could lead to loss of the data from the Content Repository. |
| for (final String partitionName : partitionNamesToRecover) { |
| logger.info("Found Swap Files for FlowFile Queue with Identifier {} and Partition {} that has not been recovered yet. " |
| + "Will recover Swap Files for this Partition even though no partition exists with this name yet", getIdentifier(), partitionName); |
| |
| try { |
| final List<String> swapLocations = swapManager.recoverSwapLocations(this, partitionName); |
| for (final String swapLocation : swapLocations) { |
| final SwapSummary swapSummary = swapManager.getSwapSummary(swapLocation); |
| summaries.add(swapSummary); |
| |
| // Transfer the swap file to the rebalancing partition. |
| final String updatedSwapLocation = swapManager.changePartitionName(swapLocation, rebalancingPartition.getSwapPartitionName()); |
| final FlowFileQueueContents queueContents = new FlowFileQueueContents(Collections.emptyList(), Collections.singletonList(updatedSwapLocation), swapSummary.getQueueSize()); |
| rebalancingPartition.rebalance(queueContents); |
| } |
| } catch (IOException e) { |
| logger.error("Failed to determine whether or not any Swap Files exist for FlowFile Queue {} and Partition {}", getIdentifier(), partitionName, e); |
| if (eventReporter != null) { |
| eventReporter.reportEvent(Severity.ERROR, "FlowFile Swapping", "Failed to determine whether or not any Swap Files exist for FlowFile Queue " + |
| getIdentifier() + "; see logs for more detials"); |
| } |
| } |
| } |
| |
| Long maxId = null; |
| QueueSize totalQueueSize = new QueueSize(0, 0L); |
| final List<ResourceClaim> resourceClaims = new ArrayList<>(); |
| Long minLastQueueDate = null; |
| long totalLastQueueDate = 0L; |
| |
| for (final SwapSummary summary : summaries) { |
| Long summaryMaxId = summary.getMaxFlowFileId(); |
| if (summaryMaxId != null && (maxId == null || summaryMaxId > maxId)) { |
| maxId = summaryMaxId; |
| } |
| |
| final QueueSize summaryQueueSize = summary.getQueueSize(); |
| totalQueueSize = totalQueueSize.add(summaryQueueSize); |
| |
| final List<ResourceClaim> summaryResourceClaims = summary.getResourceClaims(); |
| resourceClaims.addAll(summaryResourceClaims); |
| |
| if(minLastQueueDate == null) { |
| minLastQueueDate = summary.getMinLastQueueDate(); |
| } else { |
| if(summary.getMinLastQueueDate() != null) { |
| minLastQueueDate = Long.min(minLastQueueDate, summary.getMinLastQueueDate()); |
| } |
| } |
| |
| totalLastQueueDate += summary.getTotalLastQueueDate(); |
| } |
| |
| adjustSize(totalQueueSize.getObjectCount(), totalQueueSize.getByteCount()); |
| |
| return new StandardSwapSummary(totalQueueSize, maxId, resourceClaims, minLastQueueDate, totalLastQueueDate); |
| } finally { |
| partitionReadLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void purgeSwapFiles() { |
| swapManager.purge(); |
| } |
| |
| @Override |
| public QueueSize size() { |
| return totalSize.get(); |
| } |
| |
| @Override |
| public long getTotalQueuedDuration(long fromTimestamp) { |
| long sum = 0L; |
| for (QueuePartition queuePartition : queuePartitions) { |
| long totalActiveQueuedDuration = queuePartition.getTotalActiveQueuedDuration(fromTimestamp); |
| sum += totalActiveQueuedDuration; |
| } |
| return sum; |
| } |
| |
| @Override |
| public long getMinLastQueueDate() { |
| long min = 0; |
| for (QueuePartition queuePartition : queuePartitions) { |
| min = min == 0 ? queuePartition.getMinLastQueueDate() : Long.min(min, queuePartition.getMinLastQueueDate()); |
| } |
| return min; |
| } |
| |
| @Override |
| public boolean isEmpty() { |
| return size().getObjectCount() == 0; |
| } |
| |
| @Override |
| public boolean isActiveQueueEmpty() { |
| return localPartition.isActiveQueueEmpty(); |
| } |
| |
| @Override |
| public QueueDiagnostics getQueueDiagnostics() { |
| partitionReadLock.lock(); |
| try { |
| final LocalQueuePartitionDiagnostics localDiagnostics = localPartition.getQueueDiagnostics(); |
| |
| final List<RemoteQueuePartitionDiagnostics> remoteDiagnostics = new ArrayList<>(queuePartitions.length - 1); |
| |
| for (final QueuePartition partition : queuePartitions) { |
| if (partition instanceof RemoteQueuePartition) { |
| final RemoteQueuePartition queuePartition = (RemoteQueuePartition) partition; |
| final RemoteQueuePartitionDiagnostics diagnostics = queuePartition.getDiagnostics(); |
| remoteDiagnostics.add(diagnostics); |
| } |
| } |
| |
| return new StandardQueueDiagnostics(localDiagnostics, remoteDiagnostics); |
| } finally { |
| partitionReadLock.unlock(); |
| } |
| } |
| |
| protected LocalQueuePartition getLocalPartition() { |
| return localPartition; |
| } |
| |
| protected int getPartitionCount() { |
| partitionReadLock.lock(); |
| try { |
| return queuePartitions.length; |
| } finally { |
| partitionReadLock.unlock(); |
| } |
| } |
| |
| protected QueuePartition getPartition(final int index) { |
| partitionReadLock.lock(); |
| try { |
| if (index < 0 || index >= queuePartitions.length) { |
| throw new IndexOutOfBoundsException(); |
| } |
| |
| return queuePartitions[index]; |
| } finally { |
| partitionReadLock.unlock(); |
| } |
| } |
| |
| private void adjustSize(final int countToAdd, final long bytesToAdd) { |
| boolean updated = false; |
| while (!updated) { |
| final QueueSize queueSize = this.totalSize.get(); |
| final QueueSize updatedSize = queueSize.add(countToAdd, bytesToAdd); |
| updated = totalSize.compareAndSet(queueSize, updatedSize); |
| } |
| } |
| |
| public void onTransfer(final Collection<FlowFileRecord> flowFiles) { |
| adjustSize(-flowFiles.size(), -flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum()); |
| } |
| |
| public void onAbort(final Collection<FlowFileRecord> flowFiles) { |
| if (flowFiles == null || flowFiles.isEmpty()) { |
| return; |
| } |
| |
| adjustSize(-flowFiles.size(), -flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum()); |
| } |
| |
| @Override |
| public boolean isLocalPartitionFull() { |
| return isFull(localPartition.size()); |
| } |
| |
| /** |
| * Determines which QueuePartition the given FlowFile belongs to. Must be called with partition read lock held. |
| * |
| * @param flowFile the FlowFile |
| * @return the QueuePartition that the FlowFile belongs to |
| */ |
| private QueuePartition getPartition(final FlowFileRecord flowFile) { |
| final QueuePartition queuePartition = partitioner.getPartition(flowFile, queuePartitions, localPartition); |
| logger.debug("{} Assigning {} to Partition: {}", this, flowFile, queuePartition); |
| return queuePartition; |
| } |
| |
| public void setNodeIdentifiers(final Set<NodeIdentifier> updatedNodeIdentifiers, final boolean forceUpdate) { |
| partitionWriteLock.lock(); |
| try { |
| // If nothing is changing, then just return |
| if (!forceUpdate && this.nodeIdentifiers.equals(updatedNodeIdentifiers)) { |
| logger.debug("{} Not going to rebalance Queue even though setNodeIdentifiers was called, because the new set of Node Identifiers is the same as the existing set", this); |
| return; |
| } |
| |
| logger.debug("{} Stopping the {} queue partitions in order to change node identifiers from {} to {}", this, queuePartitions.length, this.nodeIdentifiers, updatedNodeIdentifiers); |
| for (final QueuePartition queuePartition : queuePartitions) { |
| queuePartition.stop(); |
| } |
| |
| // Determine which Node Identifiers, if any, were removed. |
| final Set<NodeIdentifier> removedNodeIds = new TreeSet<>(loadBalanceEndpointComparator); |
| removedNodeIds.addAll(this.nodeIdentifiers); |
| removedNodeIds.removeAll(updatedNodeIdentifiers); |
| logger.debug("{} The following Node Identifiers were removed from the cluster: {}", this, removedNodeIds); |
| |
| final Function<NodeIdentifier, String> mapKeyTransform = nodeId -> nodeId.getLoadBalanceAddress() + ":" + nodeId.getLoadBalancePort(); |
| |
| // Build up a Map of Node ID to Queue Partition so that we can easily pull over the existing |
| // QueuePartition objects instead of having to create new ones. |
| final Map<String, QueuePartition> partitionMap = new HashMap<>(); |
| for (final QueuePartition partition : this.queuePartitions) { |
| final Optional<NodeIdentifier> nodeIdOption = partition.getNodeIdentifier(); |
| nodeIdOption.ifPresent(nodeIdentifier -> partitionMap.put(mapKeyTransform.apply(nodeIdentifier), partition)); |
| } |
| |
| // Re-define 'queuePartitions' array |
| final List<NodeIdentifier> sortedNodeIdentifiers = new ArrayList<>(updatedNodeIdentifiers); |
| sortedNodeIdentifiers.sort(Comparator.comparing(nodeId -> nodeId.getApiAddress() + ":" + nodeId.getApiPort())); |
| |
| QueuePartition[] updatedQueuePartitions; |
| if (sortedNodeIdentifiers.isEmpty()) { |
| updatedQueuePartitions = new QueuePartition[] { localPartition }; |
| } else { |
| updatedQueuePartitions = new QueuePartition[sortedNodeIdentifiers.size()]; |
| } |
| |
| // Populate the new QueuePartitions. |
| boolean localPartitionIncluded = false; |
| for (int i = 0; i < sortedNodeIdentifiers.size(); i++) { |
| final NodeIdentifier nodeId = sortedNodeIdentifiers.get(i); |
| final String nodeIdMapKey = mapKeyTransform.apply(nodeId); |
| |
| if (nodeId.equals(clusterCoordinator.getLocalNodeIdentifier())) { |
| updatedQueuePartitions[i] = localPartition; |
| localPartitionIncluded = true; |
| |
| // If we have RemoteQueuePartition with this Node ID with data, that data must be migrated to the local partition. |
| // This can happen if we didn't previously know our Node UUID. |
| final QueuePartition existingPartition = partitionMap.get(nodeIdMapKey); |
| if (existingPartition != null && existingPartition != localPartition) { |
| final FlowFileQueueContents partitionContents = existingPartition.packageForRebalance(localPartition.getSwapPartitionName()); |
| logger.debug("Transferred data from {} to {}", existingPartition, localPartition); |
| localPartition.inheritQueueContents(partitionContents); |
| } |
| |
| continue; |
| } |
| |
| final QueuePartition existingPartition = partitionMap.get(nodeIdMapKey); |
| updatedQueuePartitions[i] = existingPartition == null ? createRemotePartition(nodeId) : existingPartition; |
| } |
| |
| if (!localPartitionIncluded) { |
| final QueuePartition[] withLocal = new QueuePartition[updatedQueuePartitions.length + 1]; |
| System.arraycopy(updatedQueuePartitions, 0, withLocal, 0, updatedQueuePartitions.length); |
| withLocal[withLocal.length - 1] = localPartition; |
| updatedQueuePartitions = withLocal; |
| } |
| |
| // If the partition requires that all partitions be re-balanced when the number of partitions changes, then do so. |
| // Otherwise, just rebalance the data from any Partitions that were removed, if any. |
| if (partitioner.isRebalanceOnClusterResize()) { |
| for (final QueuePartition queuePartition : this.queuePartitions) { |
| logger.debug("Rebalancing {}", queuePartition); |
| rebalance(queuePartition); |
| } |
| } else { |
| // Not all partitions need to be rebalanced, so just ensure that we rebalance any FlowFiles that are destined |
| // for a node that is no longer in the cluster. |
| for (final NodeIdentifier removedNodeId : removedNodeIds) { |
| final String removedNodeMapKey = mapKeyTransform.apply(removedNodeId); |
| |
| final QueuePartition removedPartition = partitionMap.get(removedNodeMapKey); |
| if (removedPartition == null) { |
| continue; |
| } |
| |
| logger.debug("Rebalancing {}", removedPartition); |
| rebalance(removedPartition); |
| } |
| } |
| |
| // Unregister any client for which the node was removed from the cluster |
| for (final NodeIdentifier removedNodeId : removedNodeIds) { |
| final String removedNodeMapKey = mapKeyTransform.apply(removedNodeId); |
| |
| final QueuePartition removedPartition = partitionMap.get(removedNodeMapKey); |
| if (removedPartition instanceof RemoteQueuePartition) { |
| ((RemoteQueuePartition) removedPartition).onRemoved(); |
| } |
| } |
| |
| |
| this.nodeIdentifiers.clear(); |
| this.nodeIdentifiers.addAll(updatedNodeIdentifiers); |
| |
| this.queuePartitions = updatedQueuePartitions; |
| |
| logger.debug("{} Restarting the {} queue partitions now that node identifiers have been updated", this, queuePartitions.length); |
| if (!stopped) { |
| for (final QueuePartition queuePartition : updatedQueuePartitions) { |
| queuePartition.start(partitioner); |
| } |
| } |
| } finally { |
| partitionWriteLock.unlock(); |
| } |
| } |
| |
| protected void rebalance(final QueuePartition partition) { |
| logger.debug("Rebalancing Partition {}", partition); |
| final FlowFileQueueContents contents = partition.packageForRebalance(rebalancingPartition.getSwapPartitionName()); |
| rebalancingPartition.rebalance(contents); |
| } |
| |
| @Override |
| public void put(final FlowFileRecord flowFile) { |
| putAndGetPartition(flowFile); |
| } |
| |
| |
| protected QueuePartition putAndGetPartition(final FlowFileRecord flowFile) { |
| final QueuePartition partition; |
| |
| partitionReadLock.lock(); |
| try { |
| adjustSize(1, flowFile.getSize()); |
| |
| partition = getPartition(flowFile); |
| partition.put(flowFile); |
| } finally { |
| partitionReadLock.unlock(); |
| } |
| |
| eventListener.triggerDestinationEvent(); |
| return partition; |
| } |
| |
| public void receiveFromPeer(final Collection<FlowFileRecord> flowFiles) throws IllegalClusterStateException { |
| partitionReadLock.lock(); |
| try { |
| if (offloaded) { |
| throw new IllegalClusterStateException("Node cannot accept data from load-balanced connection because it is in the process of offloading"); |
| } |
| if (!clusterCoordinator.isConnected()) { |
| throw new IllegalClusterStateException("Node cannot accept data from load-balanced connection because it is not connected to cluster"); |
| } |
| |
| if (partitioner.isRebalanceOnClusterResize()) { |
| logger.debug("Received the following FlowFiles from Peer: {}. Will re-partition FlowFiles to ensure proper balancing across the cluster.", flowFiles); |
| putAll(flowFiles); |
| } else { |
| logger.debug("Received the following FlowFiles from Peer: {}. Will accept FlowFiles to the local partition", flowFiles); |
| |
| // As explained in the putAllAndGetPartitions() method, we must ensure that we call adjustSize() before we |
| // put the FlowFiles on the queue. Otherwise, we will encounter a race condition. Specifically, that race condition |
| // can play out like so: |
| // |
| // Thread 1: Call localPartition.putAll() when the queue is empty (has a queue size of 0) but has not yet adjusted the size. |
| // Thread 2: Call poll() to obtain the FlowFile just received. |
| // Thread 2: Transfer the FlowFile to some Relationship |
| // Thread 2: Commit the session, which will call acknowledge on this queue. |
| // Thread 2: The acknowledge() method attempts to decrement the size of the queue to -1. |
| // This causes an Exception to be thrown and the queue size to remain at 0. |
| // However, the FlowFile has already been successfully transferred to the next Queue. |
| // Thread 1: Call adjustSize() to increment the size of the queue to 1 FlowFile. |
| // |
| // In this scenario, we now have no FlowFiles in the queue. However, the queue size is set to 1. |
| // We can avoid this race condition by simply ensuring that we call adjustSize() before making the FlowFiles |
| // available on the queue. This way, we cannot possibly obtain the FlowFiles and process/acknowledge them before the queue |
| // size has been updated to account for them and therefore we will not attempt to assign a negative queue size. |
| adjustSize(flowFiles.size(), flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum()); |
| localPartition.putAll(flowFiles); |
| } |
| } finally { |
| partitionReadLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void putAll(final Collection<FlowFileRecord> flowFiles) { |
| putAllAndGetPartitions(flowFiles); |
| } |
| |
| protected Map<QueuePartition, List<FlowFileRecord>> putAllAndGetPartitions(final Collection<FlowFileRecord> flowFiles) { |
| partitionReadLock.lock(); |
| try { |
| // NOTE WELL: It is imperative that we adjust the size of the queue here before distributing FlowFiles to partitions. |
| // If we do it the other way around, we could encounter a race condition where we distribute a FlowFile to the Local Partition, |
| // but have not yet adjusted the size. The processor consuming from this queue could then poll() the FlowFile, and acknowledge it. |
| // If that happens before we adjust the size, then we can end up with a negative Queue Size, which will throw an IllegalArgumentException, |
| // and we end up with the wrong Queue Size. |
| final long bytes = flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum(); |
| adjustSize(flowFiles.size(), bytes); |
| |
| final Map<QueuePartition, List<FlowFileRecord>> partitionMap = distributeToPartitionsAndGet(flowFiles); |
| |
| return partitionMap; |
| } finally { |
| partitionReadLock.unlock(); |
| |
| eventListener.triggerDestinationEvent(); |
| } |
| } |
| |
| @Override |
| public void distributeToPartitions(final Collection<FlowFileRecord> flowFiles) { |
| distributeToPartitionsAndGet(flowFiles); |
| } |
| |
| public Map<QueuePartition, List<FlowFileRecord>> distributeToPartitionsAndGet(final Collection<FlowFileRecord> flowFiles) { |
| if (flowFiles == null || flowFiles.isEmpty()) { |
| return Collections.emptyMap(); |
| } |
| |
| final Map<QueuePartition, List<FlowFileRecord>> partitionMap; |
| |
| partitionReadLock.lock(); |
| try { |
| // Optimize for the most common case (no load balancing) so that we will just call getPartition() for the first FlowFile |
| // in the Collection and then put all FlowFiles into that QueuePartition. Is fairly expensive to call stream().collect(#groupingBy). |
| if (partitioner.isPartitionStatic()) { |
| final QueuePartition partition = getPartition(flowFiles.iterator().next()); |
| partition.putAll(flowFiles); |
| |
| final List<FlowFileRecord> flowFileList = (flowFiles instanceof List) ? (List<FlowFileRecord>) flowFiles : new ArrayList<>(flowFiles); |
| partitionMap = Collections.singletonMap(partition, flowFileList); |
| |
| logger.debug("Partitioner is static so Partitioned FlowFiles as: {}", partitionMap); |
| return partitionMap; |
| } |
| |
| partitionMap = flowFiles.stream().collect(Collectors.groupingBy(this::getPartition)); |
| logger.debug("Partitioned FlowFiles as: {}", partitionMap); |
| |
| for (final Map.Entry<QueuePartition, List<FlowFileRecord>> entry : partitionMap.entrySet()) { |
| final QueuePartition partition = entry.getKey(); |
| final List<FlowFileRecord> flowFilesForPartition = entry.getValue(); |
| |
| partition.putAll(flowFilesForPartition); |
| } |
| } finally { |
| partitionReadLock.unlock(); |
| } |
| |
| return partitionMap; |
| } |
| |
| protected void setFlowFilePartitioner(final FlowFilePartitioner partitioner) { |
| partitionWriteLock.lock(); |
| try { |
| if (this.partitioner.equals(partitioner)) { |
| return; |
| } |
| |
| this.partitioner = partitioner; |
| |
| for (final QueuePartition partition : this.queuePartitions) { |
| rebalance(partition); |
| } |
| } finally { |
| partitionWriteLock.unlock(); |
| } |
| } |
| |
| @Override |
| public FlowFileRecord poll(final Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) { |
| final FlowFileRecord flowFile = localPartition.poll(expiredRecords, pollStrategy); |
| onAbort(expiredRecords); |
| return flowFile; |
| } |
| |
| @Override |
| public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) { |
| final List<FlowFileRecord> flowFiles = localPartition.poll(maxResults, expiredRecords, pollStrategy); |
| onAbort(expiredRecords); |
| return flowFiles; |
| } |
| |
| @Override |
| public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords, final PollStrategy pollStrategy) { |
| final List<FlowFileRecord> flowFiles = localPartition.poll(filter, expiredRecords, pollStrategy); |
| onAbort(expiredRecords); |
| return flowFiles; |
| } |
| |
| @Override |
| public void acknowledge(final FlowFileRecord flowFile) { |
| localPartition.acknowledge(flowFile); |
| |
| adjustSize(-1, -flowFile.getSize()); |
| eventListener.triggerSourceEvent(); |
| } |
| |
| @Override |
| public void acknowledge(final Collection<FlowFileRecord> flowFiles) { |
| localPartition.acknowledge(flowFiles); |
| |
| if (!flowFiles.isEmpty()) { |
| final long bytes = flowFiles.stream().mapToLong(FlowFileRecord::getSize).sum(); |
| adjustSize(-flowFiles.size(), -bytes); |
| } |
| |
| eventListener.triggerSourceEvent(); |
| } |
| |
| @Override |
| public boolean isUnacknowledgedFlowFile() { |
| return localPartition.isUnacknowledgedFlowFile(); |
| } |
| |
| @Override |
| public FlowFileRecord getFlowFile(final String flowFileUuid) throws IOException { |
| return localPartition.getFlowFile(flowFileUuid); |
| } |
| |
| @Override |
| public boolean isPropagateBackpressureAcrossNodes() { |
| // If offloaded = false, the queue is not offloading; return true to honor backpressure |
| // If offloaded = true, the queue is offloading or has finished offloading; return false to ignore backpressure |
| return !offloaded; |
| } |
| |
| @Override |
| public void handleExpiredRecords(final Collection<FlowFileRecord> expired) { |
| if (expired == null || expired.isEmpty()) { |
| return; |
| } |
| |
| logger.info("{} {} FlowFiles have expired and will be removed", new Object[] {this, expired.size()}); |
| final List<RepositoryRecord> expiredRecords = new ArrayList<>(expired.size()); |
| final List<ProvenanceEventRecord> provenanceEvents = new ArrayList<>(expired.size()); |
| |
| for (final FlowFileRecord flowFile : expired) { |
| final StandardRepositoryRecord record = new StandardRepositoryRecord(this, flowFile); |
| record.markForDelete(); |
| expiredRecords.add(record); |
| |
| final ProvenanceEventBuilder builder = new StandardProvenanceEventRecord.Builder() |
| .fromFlowFile(flowFile) |
| .setEventType(ProvenanceEventType.EXPIRE) |
| .setDetails("Expiration Threshold = " + getFlowFileExpiration()) |
| .setComponentType("Load-Balanced Connection") |
| .setComponentId(getIdentifier()) |
| .setEventTime(System.currentTimeMillis()); |
| |
| final ContentClaim contentClaim = flowFile.getContentClaim(); |
| if (contentClaim != null) { |
| final ResourceClaim resourceClaim = contentClaim.getResourceClaim(); |
| builder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), |
| contentClaim.getOffset() + flowFile.getContentClaimOffset(), flowFile.getSize()); |
| |
| builder.setPreviousContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), |
| contentClaim.getOffset() + flowFile.getContentClaimOffset(), flowFile.getSize()); |
| } |
| |
| final ProvenanceEventRecord provenanceEvent = builder.build(); |
| provenanceEvents.add(provenanceEvent); |
| |
| final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate(); |
| logger.debug("{} terminated due to FlowFile expiration; life of FlowFile = {} ms", new Object[] {flowFile, flowFileLife}); |
| } |
| |
| try { |
| flowFileRepo.updateRepository(expiredRecords); |
| provRepo.registerEvents(provenanceEvents); |
| |
| adjustSize(-expired.size(), -expired.stream().mapToLong(FlowFileRecord::getSize).sum()); |
| } catch (IOException e) { |
| logger.warn("Encountered {} expired FlowFiles but failed to update FlowFile Repository. This FlowFiles may re-appear in the queue after NiFi is restarted and will be expired again at " + |
| "that point.", expiredRecords.size(), e); |
| } |
| } |
| |
| |
| @Override |
| protected List<FlowFileRecord> getListableFlowFiles() { |
| return localPartition.getListableFlowFiles(); |
| } |
| |
| @Override |
| protected void dropFlowFiles(final DropFlowFileRequest dropRequest, final String requestor) { |
| partitionReadLock.lock(); |
| try { |
| dropRequest.setOriginalSize(size()); |
| dropRequest.setState(DropFlowFileState.DROPPING_FLOWFILES); |
| |
| int droppedCount = 0; |
| long droppedBytes = 0; |
| |
| try { |
| for (final QueuePartition partition : queuePartitions) { |
| final DropFlowFileRequest partitionRequest = new DropFlowFileRequest(dropRequest.getRequestIdentifier() + "-" + localPartition.getNodeIdentifier()); |
| |
| partition.dropFlowFiles(partitionRequest, requestor); |
| |
| adjustSize(-partitionRequest.getDroppedSize().getObjectCount(), -partitionRequest.getDroppedSize().getByteCount()); |
| dropRequest.setDroppedSize(new QueueSize(dropRequest.getDroppedSize().getObjectCount() + partitionRequest.getDroppedSize().getObjectCount(), |
| dropRequest.getDroppedSize().getByteCount() + partitionRequest.getDroppedSize().getByteCount())); |
| |
| droppedCount += partitionRequest.getDroppedSize().getObjectCount(); |
| droppedBytes += partitionRequest.getDroppedSize().getByteCount(); |
| |
| dropRequest.setDroppedSize(new QueueSize(droppedCount, droppedBytes)); |
| dropRequest.setCurrentSize(size()); |
| |
| if (partitionRequest.getState() == DropFlowFileState.CANCELED) { |
| dropRequest.cancel(); |
| break; |
| } else if (partitionRequest.getState() == DropFlowFileState.FAILURE) { |
| dropRequest.setState(DropFlowFileState.FAILURE, partitionRequest.getFailureReason()); |
| break; |
| } |
| } |
| |
| if (dropRequest.getState() == DropFlowFileState.DROPPING_FLOWFILES) { |
| dropRequest.setState(DropFlowFileState.COMPLETE); |
| } |
| } catch (final Exception e) { |
| logger.error("Failed to drop FlowFiles for {}", this, e); |
| dropRequest.setState(DropFlowFileState.FAILURE, "Failed to drop FlowFiles due to " + e.getMessage() + ". See log for more details."); |
| } |
| } finally { |
| partitionReadLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void lock() { |
| partitionReadLock.lock(); |
| } |
| |
| @Override |
| public void unlock() { |
| partitionReadLock.unlock(); |
| } |
| |
| private class ClusterEventListener implements ClusterTopologyEventListener { |
| @Override |
| public void onNodeAdded(final NodeIdentifier nodeId) { |
| partitionWriteLock.lock(); |
| try { |
| if (nodeIdentifiers.contains(nodeId)) { |
| logger.debug("Node Identifier {} added to cluster but already known in set: {}", nodeId, nodeIdentifiers); |
| return; |
| } |
| |
| final Set<NodeIdentifier> updatedNodeIds = new HashSet<>(nodeIdentifiers); |
| |
| // If there is any Node Identifier already that has the same identifier as the new one, remove it. This allows us to ensure that we |
| // have the correct Node Identifier in terms of Load Balancing host/port, even if the newly connected node changed its load balancing host/port |
| updatedNodeIds.removeIf(id -> id.getId().equals(nodeId.getId())); |
| updatedNodeIds.add(nodeId); |
| |
| logger.debug("Node Identifier {} added to cluster. Node ID's changing from {} to {}", nodeId, nodeIdentifiers, updatedNodeIds); |
| setNodeIdentifiers(updatedNodeIds, false); |
| } finally { |
| partitionWriteLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void onNodeRemoved(final NodeIdentifier nodeId) { |
| partitionWriteLock.lock(); |
| try { |
| final Set<NodeIdentifier> updatedNodeIds = new HashSet<>(nodeIdentifiers); |
| final boolean removed = updatedNodeIds.remove(nodeId); |
| if (!removed) { |
| return; |
| } |
| |
| logger.debug("Node Identifier {} removed from cluster. Node ID's changing from {} to {}", nodeId, nodeIdentifiers, updatedNodeIds); |
| setNodeIdentifiers(updatedNodeIds, false); |
| } finally { |
| partitionWriteLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void onLocalNodeIdentifierSet(final NodeIdentifier localNodeId) { |
| partitionWriteLock.lock(); |
| try { |
| if (localNodeId == null) { |
| return; |
| } |
| |
| if (!nodeIdentifiers.contains(localNodeId)) { |
| final Set<NodeIdentifier> updatedNodeIds = new HashSet<>(nodeIdentifiers); |
| updatedNodeIds.add(localNodeId); |
| |
| logger.debug("Local Node Identifier has now been determined to be {}. Adding to set of Node Identifiers for {}", localNodeId, SocketLoadBalancedFlowFileQueue.this); |
| setNodeIdentifiers(updatedNodeIds, false); |
| } |
| |
| logger.debug("Local Node Identifier set to {}; current partitions = {}", localNodeId, queuePartitions); |
| |
| for (final QueuePartition partition : queuePartitions) { |
| final Optional<NodeIdentifier> nodeIdentifierOption = partition.getNodeIdentifier(); |
| if (!nodeIdentifierOption.isPresent()) { |
| continue; |
| } |
| |
| final NodeIdentifier nodeIdentifier = nodeIdentifierOption.get(); |
| if (nodeIdentifier.equals(localNodeId)) { |
| if (partition instanceof LocalQueuePartition) { |
| logger.debug("{} Local Node Identifier set to {} and QueuePartition with this identifier is already a Local Queue Partition", SocketLoadBalancedFlowFileQueue.this, |
| localNodeId); |
| break; |
| } |
| |
| logger.debug("{} Local Node Identifier set to {} and found Queue Partition {} with that Node Identifier. Will force update of partitions", |
| SocketLoadBalancedFlowFileQueue.this, localNodeId, partition); |
| |
| final Set<NodeIdentifier> updatedNodeIds = new HashSet<>(nodeIdentifiers); |
| updatedNodeIds.add(localNodeId); |
| setNodeIdentifiers(updatedNodeIds, true); |
| return; |
| } |
| } |
| |
| logger.debug("{} Local Node Identifier set to {} but found no Queue Partition with that Node Identifier.", SocketLoadBalancedFlowFileQueue.this, localNodeId); |
| } finally { |
| partitionWriteLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void onNodeStateChange(final NodeIdentifier nodeId, final NodeConnectionState newState) { |
| partitionWriteLock.lock(); |
| try { |
| if (!offloaded) { |
| switch (newState) { |
| case OFFLOADING: |
| onNodeRemoved(nodeId); |
| break; |
| case CONNECTED: |
| onNodeAdded(nodeId); |
| break; |
| } |
| } else { |
| switch (newState) { |
| case CONNECTED: |
| if (nodeId != null && nodeId.equals(clusterCoordinator.getLocalNodeIdentifier())) { |
| // the node with this queue was connected to the cluster, make sure the queue is not offloaded |
| resetOffloadedQueue(); |
| } |
| break; |
| case OFFLOADED: |
| case OFFLOADING: |
| case DISCONNECTED: |
| case DISCONNECTING: |
| onNodeRemoved(nodeId); |
| break; |
| } |
| } |
| } finally { |
| partitionWriteLock.unlock(); |
| } |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return "FlowFileQueue[id=" + getIdentifier() + ", Load Balance Strategy=" + getLoadBalanceStrategy() + ", size=" + size() + "]"; |
| } |
| } |
| |