blob: 34fa87a6f32ecb117e0dcfd3aa3af7832b7e8746 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.inheritance;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.serialization.FlowEncodingVersion;
import org.apache.nifi.controller.serialization.FlowFromDOMFactory;
import org.apache.nifi.controller.serialization.FlowSynchronizationException;
import org.apache.nifi.web.api.dto.ConnectionDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
/**
* Determines whether or not the proposed flow can be inherited based on whether or not it has all of the Connections that locally have data queued.
* If the local flow has any connection in which data is queued, that connection must exist in the proposed flow, or else the flow will be considered uninheritable.
*/
public class ConnectionMissingCheck implements FlowInheritabilityCheck {
private static final Logger logger = LoggerFactory.getLogger(ConnectionMissingCheck.class);
@Override
public FlowInheritability checkInheritability(final DataFlow existingFlow, final DataFlow proposedFlow, final FlowController flowController) {
final Document flowDocument = proposedFlow.getFlowDocument();
final Element rootGroupElement = (Element) flowDocument.getDocumentElement().getElementsByTagName("rootGroup").item(0);
final FlowEncodingVersion encodingVersion = FlowEncodingVersion.parse(flowDocument.getDocumentElement());
final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, null, encodingVersion);
final Set<String> connectionIds = findAllConnectionIds(rootGroupDto);
final FlowFileRepository flowFileRepository = flowController.getRepositoryContextFactory().getFlowFileRepository();
final Set<String> queuesWithFlowFiles;
try {
queuesWithFlowFiles = flowFileRepository.findQueuesWithFlowFiles(flowController.createSwapManager());
} catch (final IOException ioe) {
throw new FlowSynchronizationException("Failed to determine which connections have FlowFiles queued", ioe);
}
logger.debug("The following {} Connections/Queues have data queued up currently: {}", queuesWithFlowFiles.size(), queuesWithFlowFiles);
for (final String queueId : queuesWithFlowFiles) {
if (!connectionIds.contains(queueId)) {
return FlowInheritability.notInheritable("Proposed Flow does not contain a Connection with ID " + queueId + " but this instance has data queued in that connection");
}
}
return FlowInheritability.inheritable();
}
private Set<String> findAllConnectionIds(final ProcessGroupDTO group) {
final Set<String> connectionIds = new HashSet<>();
findAllConnectionIds(group, connectionIds);
return connectionIds;
}
private void findAllConnectionIds(final ProcessGroupDTO group, final Set<String> ids) {
for (final ConnectionDTO connectionDTO : group.getContents().getConnections()) {
ids.add(connectionDTO.getId());
}
for (final ProcessGroupDTO childGroup : group.getContents().getProcessGroups()) {
findAllConnectionIds(childGroup, ids);
}
}
}