blob: 11496c7056e81319d6708f376fe15c12dca1d3bc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.flow;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.controller.repository.metrics.StandardFlowFileEvent;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.stateless.engine.ExecutionProgress;
import org.apache.nifi.stateless.engine.ProcessContextFactory;
import org.apache.nifi.stateless.repository.RepositoryContextFactory;
import org.apache.nifi.stateless.session.AsynchronousCommitTracker;
import org.apache.nifi.stateless.session.StatelessProcessSessionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.Set;
public class StandardStatelessFlowCurrent implements StatelessFlowCurrent {
private static final Logger logger = LoggerFactory.getLogger(StandardStatelessFlowCurrent.class);
private final TransactionThresholdMeter transactionThresholdMeter;
private final AsynchronousCommitTracker tracker;
private final ExecutionProgress executionProgress;
private final Set<Connectable> rootConnectables;
private final RepositoryContextFactory repositoryContextFactory;
private final ProcessContextFactory processContextFactory;
private Connectable currentComponent = null;
private StandardStatelessFlowCurrent(final Builder builder) {
this.transactionThresholdMeter = builder.transactionThresholdMeter;
this.tracker = builder.tracker;
this.executionProgress = builder.executionProgress;
this.rootConnectables = builder.rootConnectables;
this.repositoryContextFactory = builder.repositoryContextFactory;
this.processContextFactory = builder.processContextFactory;
}
public Connectable getCurrentComponent() {
return currentComponent;
}
@Override
public void triggerFlow() {
try {
boolean completionReached = false;
while (!completionReached) {
triggerRootConnectables();
NextConnectable nextConnectable = NextConnectable.NEXT_READY;
while (tracker.isAnyReady() && nextConnectable == NextConnectable.NEXT_READY) {
final List<Connectable> next = tracker.getReady();
logger.debug("The following {} components are ready to be triggered: {}", next.size(), next);
for (final Connectable connectable : next) {
nextConnectable = triggerWhileReady(connectable);
// If there's nothing left to do, return
if (nextConnectable == NextConnectable.NONE) {
return;
}
// If next connectable is whatever is ready, just continue loop
if (nextConnectable == NextConnectable.NEXT_READY) {
continue;
}
// Otherwise, we need to break out of this loop so that we can trigger root connectables or complete dataflow
break;
}
}
completionReached = !tracker.isAnyReady();
}
} catch (final Throwable t) {
logger.error("Failed to trigger {}", currentComponent, t);
executionProgress.notifyExecutionFailed(t);
tracker.triggerFailureCallbacks(t);
throw t;
}
}
private void triggerRootConnectables() {
for (final Connectable connectable : rootConnectables) {
currentComponent = connectable;
// Reset progress and trigger the component. This allows us to track whether or not any progress was made by the given connectable
// during this invocation of its onTrigger method.
tracker.resetProgress();
trigger(connectable, executionProgress, tracker);
// Keep track of the output of the source component so that we can determine whether or not we've reached our transaction threshold.
transactionThresholdMeter.incrementFlowFiles(tracker.getFlowFilesProduced());
transactionThresholdMeter.incrementBytes(tracker.getBytesProduced());
}
}
private NextConnectable triggerWhileReady(final Connectable connectable) {
while (tracker.isReady(connectable)) {
if (executionProgress.isCanceled()) {
logger.info("Dataflow was canceled so will not trigger any more components");
return NextConnectable.NONE;
}
currentComponent = connectable;
// Reset progress and trigger the component. This allows us to track whether or not any progress was made by the given connectable
// during this invocation of its onTrigger method.
tracker.resetProgress();
trigger(connectable, executionProgress, tracker);
// Check if the component made any progress or not. If so, continue on. If not, we need to check if providing the component with
// additional input would help the component to progress or not.
final boolean progressed = tracker.isProgress();
if (progressed) {
logger.debug("{} was triggered and made progress", connectable);
continue;
}
final boolean thresholdMet = transactionThresholdMeter.isThresholdMet();
if (thresholdMet) {
logger.debug("{} was triggered but unable to make progress. The transaction thresholds {} have been met (currently at {}). Will not " +
"trigger source components to run.", connectable, transactionThresholdMeter.getThresholds(), transactionThresholdMeter);
continue;
}
logger.debug("{} was triggered but unable to make progress. Maximum transaction thresholds {} have not been reached (currently at {}) " +
"so will trigger source components to run.", connectable, transactionThresholdMeter.getThresholds(), transactionThresholdMeter);
return NextConnectable.SOURCE_CONNECTABLE;
}
return NextConnectable.NEXT_READY;
}
private void trigger(final Connectable connectable, final ExecutionProgress executionProgress, final AsynchronousCommitTracker tracker) {
final ProcessContext processContext = processContextFactory.createProcessContext(connectable);
final StatelessProcessSessionFactory sessionFactory = new StatelessProcessSessionFactory(connectable, repositoryContextFactory, processContextFactory,
executionProgress, false, tracker);
final long start = System.nanoTime();
// Trigger component
logger.debug("Triggering {}", connectable);
connectable.onTrigger(processContext, sessionFactory);
final long processingNanos = System.nanoTime() - start;
registerProcessEvent(connectable, 1, processingNanos);
}
private void registerProcessEvent(final Connectable connectable, final int invocations, final long processingNanos) {
try {
final StandardFlowFileEvent procEvent = new StandardFlowFileEvent();
procEvent.setProcessingNanos(processingNanos);
procEvent.setInvocations(invocations);
repositoryContextFactory.getFlowFileEventRepository().updateRepository(procEvent, connectable.getIdentifier());
} catch (final IOException e) {
logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", connectable.getRunnableComponent(), e.toString(), e);
}
}
private enum NextConnectable {
NEXT_READY,
SOURCE_CONNECTABLE,
NONE;
}
public static class Builder {
private TransactionThresholdMeter transactionThresholdMeter;
private AsynchronousCommitTracker tracker;
private ExecutionProgress executionProgress;
private Set<Connectable> rootConnectables;
private RepositoryContextFactory repositoryContextFactory;
private ProcessContextFactory processContextFactory;
public StandardStatelessFlowCurrent build() {
Objects.requireNonNull(transactionThresholdMeter, "Transaction Threshold Meter must be set");
Objects.requireNonNull(tracker, "Commit Tracker must be set");
Objects.requireNonNull(executionProgress, "Execution Progress must be set");
Objects.requireNonNull(rootConnectables, "Root Conectables must be set");
Objects.requireNonNull(repositoryContextFactory, "Repository Context Factory must be set");
Objects.requireNonNull(processContextFactory, "Process Context Factory must be set");
return new StandardStatelessFlowCurrent(this);
}
public Builder transactionThresholdMeter(final TransactionThresholdMeter transactionThresholdMeter) {
this.transactionThresholdMeter = transactionThresholdMeter;
return this;
}
public Builder commitTracker(final AsynchronousCommitTracker commitTracker) {
this.tracker = commitTracker;
return this;
}
public Builder executionProgress(final ExecutionProgress executionProgress) {
this.executionProgress = executionProgress;
return this;
}
public Builder rootConnectables(final Set<Connectable> rootConnectables) {
this.rootConnectables = rootConnectables;
return this;
}
public Builder repositoryContextFactory(final RepositoryContextFactory repositoryContextFactory) {
this.repositoryContextFactory = repositoryContextFactory;
return this;
}
public Builder processContextFactory(final ProcessContextFactory processContextFactory) {
this.processContextFactory = processContextFactory;
return this;
}
}
}