blob: 9b8f0a7f7c0bb142a885fc2d75fc27af4ba7b727 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless.engine;
import org.apache.nifi.components.state.StatelessStateManagerProvider;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.stateless.flow.FailurePortEncounteredException;
import org.apache.nifi.stateless.flow.TriggerResult;
import org.apache.nifi.stateless.queue.DrainableFlowFileQueue;
import org.apache.nifi.stateless.session.AsynchronousCommitTracker;
import org.apache.nifi.stream.io.StreamUtils;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class StandardExecutionProgress implements ExecutionProgress {
private final ProcessGroup rootGroup;
private final List<FlowFileQueue> internalFlowFileQueues;
private final ContentRepository contentRepository;
private final BlockingQueue<TriggerResult> resultQueue;
private final Set<String> failurePortNames;
private final AsynchronousCommitTracker commitTracker;
private final StatelessStateManagerProvider stateManagerProvider;
private final BlockingQueue<CompletionAction> completionActionQueue;
private volatile boolean canceled = false;
private volatile CompletionAction completionAction = null;
public StandardExecutionProgress(final ProcessGroup rootGroup, final List<FlowFileQueue> internalFlowFileQueues, final BlockingQueue<TriggerResult> resultQueue,
final ContentRepository contentRepository, final Set<String> failurePortNames, final AsynchronousCommitTracker commitTracker,
final StatelessStateManagerProvider stateManagerProvider) {
this.rootGroup = rootGroup;
this.internalFlowFileQueues = internalFlowFileQueues;
this.resultQueue = resultQueue;
this.contentRepository = contentRepository;
this.failurePortNames = failurePortNames;
this.commitTracker = commitTracker;
this.stateManagerProvider = stateManagerProvider;
completionActionQueue = new LinkedBlockingQueue<>();
}
@Override
public boolean isCanceled() {
return canceled;
}
@Override
public boolean isDataQueued() {
for (final FlowFileQueue queue : internalFlowFileQueues) {
if (!queue.isActiveQueueEmpty()) {
return true;
}
}
return false;
}
@Override
public CompletionAction awaitCompletionAction() throws InterruptedException {
if (canceled) {
return CompletionAction.CANCEL;
}
final CompletionAction existingAction = this.completionAction;
if (existingAction != null) {
return existingAction;
}
final TriggerResult triggerResult = createResult();
resultQueue.offer(triggerResult);
final CompletionAction completionAction = completionActionQueue.take();
// Hold onto the result so that the other Process Sessions that call this method can retrieve the result.
this.completionAction = completionAction;
return completionAction;
}
private TriggerResult createResult() {
final Map<String, List<FlowFile>> outputFlowFiles = drainOutputQueues();
for (final String failurePortName : failurePortNames) {
final List<FlowFile> flowFilesForPort = outputFlowFiles.get(failurePortName);
if (flowFilesForPort != null && !flowFilesForPort.isEmpty()) {
throw new FailurePortEncounteredException("FlowFile was transferred to Port " + failurePortName + ", which is marked as a Failure Port");
}
}
final boolean canceled = isCanceled();
return new TriggerResult() {
private volatile Throwable abortCause = null;
@Override
public boolean isSuccessful() {
return abortCause == null;
}
@Override
public boolean isCanceled() {
return canceled;
}
@Override
public Optional<Throwable> getFailureCause() {
return Optional.ofNullable(abortCause);
}
@Override
public Map<String, List<FlowFile>> getOutputFlowFiles() {
return outputFlowFiles;
}
@Override
public List<FlowFile> getOutputFlowFiles(final String portName) {
return outputFlowFiles.computeIfAbsent(portName, name -> Collections.emptyList());
}
@Override
public InputStream readContent(final FlowFile flowFile) throws IOException {
if (!(flowFile instanceof FlowFileRecord)) {
throw new IllegalArgumentException("FlowFile was not created by this flow");
}
final FlowFileRecord flowFileRecord = (FlowFileRecord) flowFile;
final ContentClaim contentClaim = flowFileRecord.getContentClaim();
final InputStream in = contentRepository.read(contentClaim);
final long offset = flowFileRecord.getContentClaimOffset();
if (offset > 0) {
StreamUtils.skip(in, offset);
}
return new LimitedInputStream(in, flowFile.getSize());
}
@Override
public byte[] readContentAsByteArray(final FlowFile flowFile) throws IOException {
if (!(flowFile instanceof FlowFileRecord)) {
throw new IllegalArgumentException("FlowFile was not created by this flow");
}
if (flowFile.getSize() > Integer.MAX_VALUE) {
throw new IOException("Cannot return contents of " + flowFile + " as a byte array because the contents exceed the maximum length supported for byte arrays ("
+ Integer.MAX_VALUE + " bytes)");
}
final FlowFileRecord flowFileRecord = (FlowFileRecord) flowFile;
final long size = flowFileRecord.getSize();
final byte[] flowFileContents = new byte[(int) size];
try (final InputStream in = readContent(flowFile)) {
StreamUtils.fillBuffer(in, flowFileContents);
}
return flowFileContents;
}
@Override
public void acknowledge() {
commitTracker.triggerCallbacks();
stateManagerProvider.commitUpdates();
completionActionQueue.offer(CompletionAction.COMPLETE);
contentRepository.purge();
}
@Override
public void abort(final Throwable cause) {
abortCause = new DataflowAbortedException("Dataflow was aborted", cause);
notifyExecutionFailed(abortCause);
}
};
}
@Override
public void notifyExecutionCanceled() {
canceled = true;
commitTracker.triggerFailureCallbacks(new RuntimeException("Dataflow Canceled"));
stateManagerProvider.rollbackUpdates();
completionActionQueue.offer(CompletionAction.CANCEL);
contentRepository.purge();
}
@Override
public void notifyExecutionFailed(final Throwable cause) {
commitTracker.triggerFailureCallbacks(cause);
stateManagerProvider.rollbackUpdates();
completionActionQueue.offer(CompletionAction.CANCEL);
contentRepository.purge();
}
public Map<String, List<FlowFile>> drainOutputQueues() {
final Map<String, List<FlowFile>> flowFileMap = new HashMap<>();
for (final Port port : rootGroup.getOutputPorts()) {
final List<FlowFile> flowFiles = drainOutputQueues(port);
flowFileMap.put(port.getName(), flowFiles);
}
return flowFileMap;
}
private List<FlowFile> drainOutputQueues(final Port port) {
final List<Connection> incomingConnections = port.getIncomingConnections();
if (incomingConnections.isEmpty()) {
return Collections.emptyList();
}
final List<FlowFile> portFlowFiles = new ArrayList<>();
for (final Connection connection : incomingConnections) {
final DrainableFlowFileQueue flowFileQueue = (DrainableFlowFileQueue) connection.getFlowFileQueue();
final List<FlowFileRecord> flowFileRecords = new ArrayList<>(flowFileQueue.size().getObjectCount());
flowFileQueue.drainTo(flowFileRecords);
portFlowFiles.addAll(flowFileRecords);
for (final FlowFileRecord flowFileRecord : flowFileRecords) {
contentRepository.decrementClaimantCount(flowFileRecord.getContentClaim());
}
}
return portFlowFiles;
}
}