blob: 25e8a86f111439d6f40b6acf78a59bde1dbefd48 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.util.Connectables;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class EventDrivenWorkerQueue implements WorkerQueue {
private final Object workMonitor = new Object();
private final Map<Connectable, Worker> workerMap = new HashMap<>(); // protected by synchronizing on workMonitor
private final WorkerReadyQueue workerQueue;
public EventDrivenWorkerQueue(final boolean clustered, final boolean primary, final ProcessScheduler scheduler) {
workerQueue = new WorkerReadyQueue(scheduler);
workerQueue.setClustered(clustered);
workerQueue.setPrimary(primary);
}
@Override
public void setClustered(final boolean clustered) {
workerQueue.setClustered(clustered);
}
@Override
public void setPrimary(final boolean primary) {
workerQueue.setPrimary(primary);
}
@Override
public Worker poll(final long timeout, final TimeUnit timeUnit) {
final long maxTime = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(timeout, timeUnit);
while (System.currentTimeMillis() < maxTime) {
synchronized (workMonitor) {
final Worker worker = workerQueue.poll();
if (worker == null) {
// nothing to do. wait until we have something to do.
final long timeLeft = maxTime - System.currentTimeMillis();
if (timeLeft <= 0) {
return null;
}
try {
workMonitor.wait(timeLeft);
} catch (final InterruptedException ignored) {
Thread.currentThread().interrupt();
return null;
}
} else {
// Decrement the amount of work there is to do for this worker.
final int workLeft = worker.decrementEventCount();
if (workLeft > 0) {
workerQueue.offer(worker);
}
return worker;
}
}
}
return null;
}
@Override
public void offer(final Connectable connectable) {
synchronized (workMonitor) {
Worker worker = workerMap.get(connectable);
if (worker == null) {
// if worker is null, then it has not been scheduled to run; ignore the event.
return;
}
final int countBefore = worker.incrementEventCount();
if (countBefore < 0) {
worker.setWorkCount(1);
}
if (countBefore <= 0) {
// If countBefore > 0 then it's already on the queue, so just incrementing its counter is sufficient.
workerQueue.offer(worker);
}
workMonitor.notify();
}
}
private int getWorkCount(final Connectable connectable) {
int sum = 0;
for (final Connection connection : connectable.getIncomingConnections()) {
sum += connection.getFlowFileQueue().size().getObjectCount();
}
return sum;
}
@Override
public void resumeWork(final Connectable connectable) {
synchronized (workMonitor) {
final int workCount = getWorkCount(connectable);
final Worker worker = new Worker(connectable);
workerMap.put(connectable, worker);
if (workCount > 0) {
worker.setWorkCount(workCount);
workerQueue.offer(worker);
workMonitor.notify();
}
}
}
@Override
public void suspendWork(final Connectable connectable) {
synchronized (workMonitor) {
final Worker worker = this.workerMap.remove(connectable);
if (worker == null) {
return;
}
worker.resetWorkCount();
workerQueue.remove(worker);
}
}
public static class Worker implements EventBasedWorker {
private final Connectable connectable;
private final AtomicInteger workCount = new AtomicInteger(0);
public Worker(final Connectable connectable) {
this.connectable = connectable;
}
@Override
public Connectable getConnectable() {
return connectable;
}
@Override
public int decrementEventCount() {
return workCount.decrementAndGet();
}
@Override
public int incrementEventCount() {
return workCount.getAndIncrement();
}
void resetWorkCount() {
workCount.set(0);
}
void setWorkCount(final int workCount) {
this.workCount.set(workCount);
}
}
@SuppressWarnings("serial")
private static class WorkerReadyQueue extends LinkedList<Worker> {
private final ProcessScheduler scheduler;
private volatile boolean clustered = false;
private volatile boolean primary = false;
public WorkerReadyQueue(final ProcessScheduler scheduler) {
this.scheduler = scheduler;
}
public void setClustered(final boolean clustered) {
this.clustered = clustered;
}
public void setPrimary(final boolean primary) {
this.primary = primary;
}
@Override
public Worker poll() {
final List<Worker> putBack = new ArrayList<>();
Worker worker;
try {
while ((worker = super.poll()) != null) {
final DelayProcessingReason reason = getDelayReason(worker);
if (reason == null) {
return worker;
} else {
// Worker is not ready. We may want to add him back to the queue, depending on the reason that he is unready.
switch (reason) {
case YIELDED:
case ISOLATED:
case DESTINATION_FULL:
case ALL_WORK_PENALIZED:
case NO_WORK:
case TOO_MANY_THREADS:
// there will not be an event that triggers this to happen, so we add this worker back to the queue.
putBack.add(worker);
break;
default:
case NOT_RUNNING:
// There's no need to check if this worker is available again until a another event
// occurs. Therefore, we keep him off of the queue and reset his work count
worker.resetWorkCount();
break;
}
}
}
} finally {
if (!putBack.isEmpty()) {
super.addAll(putBack);
}
}
return null;
}
private DelayProcessingReason getDelayReason(final Worker worker) {
final Connectable connectable = worker.getConnectable();
if (ScheduledState.RUNNING != connectable.getScheduledState()) {
return DelayProcessingReason.NOT_RUNNING;
}
if (connectable.getYieldExpiration() > System.currentTimeMillis()) {
return DelayProcessingReason.YIELDED;
}
// For Remote Output Ports,
int availableRelationshipCount = 0;
if (!connectable.getRelationships().isEmpty()) {
availableRelationshipCount = getAvailableRelationshipCount(connectable);
if (availableRelationshipCount == 0) {
return DelayProcessingReason.DESTINATION_FULL;
}
}
if (connectable.hasIncomingConnection() && !Connectables.flowFilesQueued(connectable)) {
return DelayProcessingReason.NO_WORK;
}
final int activeThreadCount = scheduler.getActiveThreadCount(worker.getConnectable());
final int maxThreadCount = worker.getConnectable().getMaxConcurrentTasks();
if (maxThreadCount > 0 && activeThreadCount >= maxThreadCount) {
return DelayProcessingReason.TOO_MANY_THREADS;
}
if (connectable instanceof ProcessorNode) {
final ProcessorNode procNode = (ProcessorNode) connectable;
if (procNode.isIsolated() && clustered && !primary) {
return DelayProcessingReason.ISOLATED;
}
final boolean triggerWhenAnyAvailable = procNode.isTriggerWhenAnyDestinationAvailable();
final boolean allDestinationsAvailable = availableRelationshipCount == procNode.getRelationships().size();
if (!triggerWhenAnyAvailable && !allDestinationsAvailable) {
return DelayProcessingReason.DESTINATION_FULL;
}
}
return null;
}
private int getAvailableRelationshipCount(final Connectable connectable) {
int count = 0;
for (final Relationship relationship : connectable.getRelationships()) {
final Collection<Connection> connections = connectable.getConnections(relationship);
if (connections == null || connections.isEmpty()) {
if (connectable.isAutoTerminated(relationship)) {
// If the relationship is auto-terminated, consider it available.
count++;
}
} else {
boolean available = true;
for (final Connection connection : connections) {
if (connection.getSource() == connection.getDestination()) {
// don't count self-loops
continue;
}
if (connection.getFlowFileQueue().isFull()) {
available = false;
}
}
if (available) {
count++;
}
}
}
return count;
}
}
private static enum DelayProcessingReason {
YIELDED,
DESTINATION_FULL,
NO_WORK,
ALL_WORK_PENALIZED,
ISOLATED,
NOT_RUNNING,
TOO_MANY_THREADS;
}
}