/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.AuthorizerCapabilityDetection;
import org.apache.nifi.authorization.ManagedAuthorizer;
import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.cluster.ConnectionException;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.ClusterRoles;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.coordination.node.OffloadCode;
import org.apache.nifi.cluster.exception.NoClusterCoordinatorException;
import org.apache.nifi.cluster.protocol.ComponentRevisionSnapshot;
import org.apache.nifi.cluster.protocol.ConnectionRequest;
import org.apache.nifi.cluster.protocol.ConnectionResponse;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.cluster.protocol.impl.NodeProtocolSenderListener;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.FlowRequestMessage;
import org.apache.nifi.cluster.protocol.message.FlowResponseMessage;
import org.apache.nifi.cluster.protocol.message.OffloadMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionResponseMessage;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.serialization.FlowSerializationException;
import org.apache.nifi.controller.serialization.FlowSynchronizationException;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.lifecycle.LifeCycleStartException;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.nar.NarClassLoadersHolder;
import org.apache.nifi.persistence.FlowConfigurationDAO;
import org.apache.nifi.persistence.StandardXMLFlowConfigurationDAO;
import org.apache.nifi.persistence.TemplateDeserializer;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.services.FlowService;
import org.apache.nifi.stream.io.GZIPOutputStream;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.file.FileUtils;
import org.apache.nifi.web.api.dto.TemplateDTO;
import org.apache.nifi.web.revision.RevisionManager;
import org.apache.nifi.web.revision.RevisionSnapshot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
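/**
 * Standard implementation of {@link FlowService}: loads the dataflow into the
 * {@link FlowController}, persists it to the gzipped flow configuration file,
 * and, when configured for clustering, handles the connection, reconnection,
 * offload, and disconnection protocol messages exchanged with the cluster
 * coordinator.
 *
 * <p>A minimal sketch of standalone usage (the wiring of {@code controller},
 * {@code nifiProperties}, and the other collaborators is assumed; the call
 * order shown is illustrative):
 * <pre>{@code
 * FlowService flowService = StandardFlowService.createStandaloneInstance(
 *         controller, nifiProperties, encryptor, revisionManager, authorizer);
 * flowService.start();
 * flowService.load(null); // a null DataFlow loads the flow from disk
 * }</pre>
 */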
public class StandardFlowService implements FlowService, ProtocolHandler {
private static final String EVENT_CATEGORY = "Controller";
private static final String CLUSTER_NODE_CONFIG = "Cluster Node Configuration";
// state keys
private static final String NODE_UUID = "Node UUID";
private final FlowController controller;
private final Path flowXml;
private final FlowConfigurationDAO dao;
private final int gracefulShutdownSeconds;
private final boolean autoResumeState;
private final Authorizer authorizer;
// Lock is used to protect the flow.xml file.
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
private final AtomicBoolean running = new AtomicBoolean(false);
private final AtomicReference<ScheduledExecutorService> executor = new AtomicReference<>(null);
private final AtomicReference<SaveHolder> saveHolder = new AtomicReference<>(null);
private final ClusterCoordinator clusterCoordinator;
private final RevisionManager revisionManager;
/**
* listener/sender for internal cluster communication
*/
private final NodeProtocolSenderListener senderListener;
/**
* flag indicating whether we are operating in a clustered environment
*/
private final boolean configuredForClustering;
/**
* the node identifier
*/
private NodeIdentifier nodeId;
// guardedBy rwLock
private boolean firstControllerInitialization = true;
private final NiFiProperties nifiProperties;
private static final String CONNECTION_EXCEPTION_MSG_PREFIX = "Failed to connect node to cluster";
private static final Logger logger = LoggerFactory.getLogger(StandardFlowService.class);
public static StandardFlowService createStandaloneInstance(
final FlowController controller,
final NiFiProperties nifiProperties,
final PropertyEncryptor encryptor,
final RevisionManager revisionManager,
final Authorizer authorizer) throws IOException {
return new StandardFlowService(controller, nifiProperties, null, encryptor, false, null, revisionManager, authorizer);
}
public static StandardFlowService createClusteredInstance(
final FlowController controller,
final NiFiProperties nifiProperties,
final NodeProtocolSenderListener senderListener,
final ClusterCoordinator coordinator,
final PropertyEncryptor encryptor,
final RevisionManager revisionManager,
final Authorizer authorizer) throws IOException {
return new StandardFlowService(controller, nifiProperties, senderListener, encryptor, true, coordinator, revisionManager, authorizer);
}
private StandardFlowService(
final FlowController controller,
final NiFiProperties nifiProperties,
final NodeProtocolSenderListener senderListener,
final PropertyEncryptor encryptor,
final boolean configuredForClustering,
final ClusterCoordinator clusterCoordinator,
final RevisionManager revisionManager,
final Authorizer authorizer) throws IOException {
this.nifiProperties = nifiProperties;
this.controller = controller;
flowXml = Paths.get(nifiProperties.getProperty(NiFiProperties.FLOW_CONFIGURATION_FILE));
gracefulShutdownSeconds = (int) FormatUtils.getTimeDuration(nifiProperties.getProperty(NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD), TimeUnit.SECONDS);
autoResumeState = nifiProperties.getAutoResumeState();
dao = new StandardXMLFlowConfigurationDAO(flowXml, encryptor, nifiProperties, controller.getExtensionManager());
this.clusterCoordinator = clusterCoordinator;
if (clusterCoordinator != null) {
clusterCoordinator.setFlowService(this);
}
this.revisionManager = revisionManager;
this.authorizer = authorizer;
if (configuredForClustering) {
this.configuredForClustering = configuredForClustering;
this.senderListener = senderListener;
senderListener.addHandler(this);
final InetSocketAddress nodeApiAddress = nifiProperties.getNodeApiAddress();
final InetSocketAddress nodeSocketAddress = nifiProperties.getClusterNodeProtocolAddress();
final InetSocketAddress loadBalanceAddress = nifiProperties.getClusterLoadBalanceAddress();
String nodeUuid = null;
final StateManager stateManager = controller.getStateManagerProvider().getStateManager(CLUSTER_NODE_CONFIG);
if (stateManager != null) {
nodeUuid = stateManager.getState(Scope.LOCAL).get(NODE_UUID);
}
if (nodeUuid == null) {
nodeUuid = UUID.randomUUID().toString();
}
// use a random UUID as the proposed node identifier
this.nodeId = new NodeIdentifier(nodeUuid,
nodeApiAddress.getHostName(), nodeApiAddress.getPort(),
nodeSocketAddress.getHostName(), nodeSocketAddress.getPort(),
loadBalanceAddress.getHostName(), loadBalanceAddress.getPort(),
nifiProperties.getRemoteInputHost(), nifiProperties.getRemoteInputPort(),
nifiProperties.getRemoteInputHttpPort(), nifiProperties.isSiteToSiteSecure());
} else {
this.configuredForClustering = false;
this.senderListener = null;
}
}
@Override
public void saveFlowChanges() throws IOException {
writeLock.lock();
try {
dao.save(controller);
} finally {
writeLock.unlock();
}
}
@Override
public void saveFlowChanges(final TimeUnit delayUnit, final long delay) {
final boolean archiveEnabled = nifiProperties.isFlowConfigurationArchiveEnabled();
saveFlowChanges(delayUnit, delay, archiveEnabled);
}
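/**
 * Schedules a save rather than performing one inline: the request is recorded
 * in a {@link SaveHolder}, and the background {@link SaveReportingTask}
 * (scheduled every 500 ms in {@link #start()}) performs the actual save once
 * the requested time has passed. For example,
 * {@code saveFlowChanges(TimeUnit.SECONDS, 2L)} coalesces saves requested
 * within the next two seconds into a single write.
 */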
@Override
public void saveFlowChanges(final TimeUnit delayUnit, final long delay, final boolean archive) {
final Calendar saveTime = Calendar.getInstance();
final long delayInMs = TimeUnit.MILLISECONDS.convert(delay, delayUnit);
int finalDelayMs = 500; //default to 500 ms.
if (delayInMs <= Integer.MAX_VALUE) {
finalDelayMs = (int) delayInMs;
}
saveTime.add(Calendar.MILLISECOND, finalDelayMs);
if (logger.isTraceEnabled()) {
logger.trace(" A request to save the flow has been made with delay {} for time {}", finalDelayMs, saveTime.getTime());
}
saveHolder.set(new SaveHolder(saveTime, archive));
}
@Override
public boolean isRunning() {
return running.get();
}
@Override
public void start() throws LifeCycleStartException {
writeLock.lock();
try {
if (isRunning()) {
return;
}
running.set(true);
final ScheduledExecutorService newExecutor = new FlowEngine(2, "Flow Service Tasks");
newExecutor.scheduleWithFixedDelay(new SaveReportingTask(), 0L, 500L, TimeUnit.MILLISECONDS);
this.executor.set(newExecutor);
if (configuredForClustering) {
senderListener.start();
}
} catch (final IOException ioe) {
try {
stop(/* force */true);
} catch (final Exception e) {
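// best-effort cleanup; the original failure is reported via the LifeCycleStartException below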
}
throw new LifeCycleStartException("Failed to start Flow Service due to: " + ioe, ioe);
} finally {
writeLock.unlock();
}
}
@Override
public void stop(final boolean force) {
writeLock.lock();
try {
if (!isRunning()) {
return;
}
running.set(false);
if (clusterCoordinator != null) {
final Thread shutdownClusterCoordinator = new Thread(clusterCoordinator::shutdown);
shutdownClusterCoordinator.setDaemon(true);
shutdownClusterCoordinator.setName("Shutdown Cluster Coordinator");
shutdownClusterCoordinator.start();
}
if (!controller.isTerminated()) {
controller.shutdown(force);
}
if (configuredForClustering && senderListener != null) {
try {
senderListener.stop();
} catch (final IOException ioe) {
logger.warn("Protocol sender/listener did not stop gracefully due to: " + ioe);
}
}
final ScheduledExecutorService executorService = executor.get();
if (executorService != null) {
if (force) {
executorService.shutdownNow();
} else {
executorService.shutdown();
}
boolean graceful;
try {
graceful = executorService.awaitTermination(gracefulShutdownSeconds, TimeUnit.SECONDS);
} catch (final InterruptedException e) {
graceful = false;
}
if (!graceful) {
logger.warn("Scheduling service did not gracefully shutdown within configured " + gracefulShutdownSeconds + " second window");
}
}
} finally {
writeLock.unlock();
}
}
@Override
public boolean canHandle(final ProtocolMessage msg) {
switch (msg.getType()) {
case RECONNECTION_REQUEST:
case OFFLOAD_REQUEST:
case DISCONNECTION_REQUEST:
case FLOW_REQUEST:
return true;
default:
return false;
}
}
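// Flow requests are answered synchronously, but reconnection, offload, and
// disconnection requests are handed off to daemon threads so that the protocol
// handler thread is not blocked while the potentially slow work completes.
// Offload and disconnection therefore return null, i.e. no response message.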
@Override
public ProtocolMessage handle(final ProtocolMessage request, final Set<String> nodeIdentities) throws ProtocolException {
final long startNanos = System.nanoTime();
try {
switch (request.getType()) {
case FLOW_REQUEST:
return handleFlowRequest((FlowRequestMessage) request);
case RECONNECTION_REQUEST: {
// Suspend heartbeats until we've reconnected. Otherwise,
// we may send a heartbeat while we are still in the process of
// connecting, which will cause the Cluster Manager to mark us
// as "Connected," which becomes problematic as the FlowController's lock
// may still be held, causing this node to take a long time to respond to requests.
controller.suspendHeartbeats();
final Thread t = new Thread(new Runnable() {
@Override
public void run() {
handleReconnectionRequest((ReconnectionRequestMessage) request);
}
}, "Reconnect to Cluster");
t.setDaemon(true);
t.start();
return new ReconnectionResponseMessage();
}
case OFFLOAD_REQUEST: {
final Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
handleOffloadRequest((OffloadMessage) request);
} catch (InterruptedException e) {
throw new ProtocolException("Could not complete offload request", e);
}
}
}, "Offload Flow Files from Node");
t.setDaemon(true);
t.start();
return null;
}
case DISCONNECTION_REQUEST: {
final Thread t = new Thread(new Runnable() {
@Override
public void run() {
handleDisconnectionRequest((DisconnectMessage) request);
}
}, "Disconnect from Cluster");
t.setDaemon(true);
t.start();
return null;
}
default:
throw new ProtocolException("Handler cannot handle message type: " + request.getType());
}
} finally {
if (logger.isDebugEnabled()) {
final long procNanos = System.nanoTime() - startNanos;
final long procMillis = TimeUnit.MILLISECONDS.convert(procNanos, TimeUnit.NANOSECONDS);
logger.debug("Finished Processing Protocol Message of type {} in {} millis", request.getType(), procMillis);
}
}
}
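/**
 * Loads the given dataflow. In standalone mode the proposed flow is loaded and
 * the controller initialized directly. In clustered mode the flow on disk (or
 * an empty flow) is first synchronized into the controller, re-serialized as
 * this node's proposed flow, and a cluster connection is attempted; if no
 * coordinator can be reached, the local flow is loaded and the node heartbeats
 * until a coordinator issues a reconnection request.
 */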
@Override
public void load(final DataFlow dataFlow) throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException, MissingBundleException {
if (configuredForClustering) {
// Create the initial flow from disk if it exists, or from serializing the empty root group in flow controller
final DataFlow initialFlow = (dataFlow == null) ? createDataFlow() : dataFlow;
if (logger.isTraceEnabled()) {
logger.trace("InitialFlow = " + new String(initialFlow.getFlow(), StandardCharsets.UTF_8));
}
// Sync the initial flow into the flow controller so that if the flow came from disk we loaded the
// whole flow into the flow controller and applied any bundle upgrades
writeLock.lock();
try {
loadFromBytes(initialFlow, true);
} finally {
writeLock.unlock();
}
// Get the proposed flow by serializing the flow controller which now has the synced version from above
final DataFlow proposedFlow = createDataFlowFromController();
if (logger.isTraceEnabled()) {
logger.trace("ProposedFlow = " + new String(proposedFlow.getFlow(), StandardCharsets.UTF_8));
}
/*
* Attempt to connect to the cluster. If the manager is able to
* provide a data flow, then the manager will send a connection
* response. If the manager was unable to be located, then
* the response will be null and we should load the local dataflow
* and heartbeat until a manager is located.
*/
final boolean localFlowEmpty = StandardFlowSynchronizer.isEmpty(proposedFlow);
final ConnectionResponse response = connect(true, localFlowEmpty, proposedFlow);
// obtain write lock while we are updating the controller. We need to ensure that we don't
// obtain the lock before calling connect(), though, or we will end up getting a deadlock
// because the node that is receiving the connection request won't be able to get the current
// flow, as that requires a read lock.
writeLock.lock();
try {
if (response == null || response.shouldTryLater()) {
logger.info("Flow controller will load local dataflow and suspend connection handshake until a cluster connection response is received.");
// set node ID on controller before we start heartbeating because heartbeat needs node ID
controller.setNodeId(nodeId);
clusterCoordinator.setLocalNodeIdentifier(nodeId);
// set node as clustered, since it is trying to connect to a cluster
controller.setClustered(true, null);
clusterCoordinator.setConnected(false);
controller.setConnectionStatus(new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED));
/*
* Start heartbeating. Heartbeats will fail because we can't reach
* the manager, but when we locate the manager, the node will
* reconnect and establish a connection to the cluster. The
* heartbeat is the trigger that will cause the manager to
* issue a reconnect request.
*/
controller.startHeartbeating();
// Initialize the controller after the flow is loaded so we don't take any actions on repos until everything is good
initializeController();
// notify controller that flow is initialized
try {
controller.onFlowInitialized(autoResumeState);
} catch (final Exception ex) {
logger.warn("Unable to start all processors due to invalid flow configuration.");
if (logger.isDebugEnabled()) {
logger.warn(StringUtils.EMPTY, ex);
}
}
} else {
try {
loadFromConnectionResponse(response);
} catch (final Exception e) {
logger.error("Failed to load flow from cluster due to: " + e, e);
handleConnectionFailure(e);
throw new IOException(e);
}
}
// save the flow in the controller so we write out the latest flow with any updated bundles to disk
dao.save(controller, true);
} finally {
writeLock.unlock();
}
} else {
writeLock.lock();
try {
// operating in standalone mode, so load proposed flow and initialize the controller
loadFromBytes(dataFlow, true);
initializeController();
dao.save(controller, true);
} finally {
writeLock.unlock();
}
}
}
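// Translates a connection failure into the most specific DisconnectionCode
// available and records this node as disconnected with the cluster coordinator.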
private void handleConnectionFailure(final Exception ex) {
DisconnectionCode disconnectionCode;
if (ex instanceof UninheritableFlowException) {
disconnectionCode = DisconnectionCode.MISMATCHED_FLOWS;
} else if (ex instanceof MissingBundleException) {
disconnectionCode = DisconnectionCode.MISSING_BUNDLE;
} else if (ex instanceof FlowSynchronizationException) {
disconnectionCode = DisconnectionCode.MISMATCHED_FLOWS;
} else {
disconnectionCode = DisconnectionCode.STARTUP_FAILURE;
}
clusterCoordinator.disconnectionRequestedByNode(getNodeId(), disconnectionCode, ex.toString());
controller.setClustered(false, null);
clusterCoordinator.setConnected(false);
}
private FlowResponseMessage handleFlowRequest(final FlowRequestMessage request) throws ProtocolException {
readLock.lock();
try {
logger.info("Received flow request message from cluster coordinator.");
// create the response
final FlowResponseMessage response = new FlowResponseMessage();
response.setDataFlow(createDataFlowFromController());
return response;
} catch (final Exception ex) {
throw new ProtocolException("Failed serializing flow controller state for flow request due to: " + ex, ex);
} finally {
readLock.unlock();
}
}
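// Only a ManagedAuthorizer exposes a fingerprint; for any other Authorizer
// implementation this returns null and the dataflow carries no authorizer
// fingerprint.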
private byte[] getAuthorizerFingerprint() {
final boolean isInternalAuthorizer = AuthorizerCapabilityDetection.isManagedAuthorizer(authorizer);
return isInternalAuthorizer ? ((ManagedAuthorizer) authorizer).getFingerprint().getBytes(StandardCharsets.UTF_8) : null;
}
@Override
public StandardDataFlow createDataFlow() throws IOException {
// Load the flow from disk if the file exists.
if (dao.isFlowPresent()) {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
dao.load(baos);
final byte[] bytes = baos.toByteArray();
final byte[] snippetBytes = controller.getSnippetManager().export();
final byte[] authorizerFingerprint = getAuthorizerFingerprint();
final StandardDataFlow fromDisk = new StandardDataFlow(bytes, snippetBytes, authorizerFingerprint, new HashSet<>());
return fromDisk;
}
// Flow from disk does not exist, so serialize the Flow Controller and use that.
// This is done because on startup, if there is no flow, the Flow Controller
// will automatically create a Root Process Group, and we need to ensure that
// we replicate that Process Group to all nodes in the cluster, so that they all
// end up with the same ID for the root Process Group.
return createDataFlowFromController();
}
@Override
public StandardDataFlow createDataFlowFromController() throws IOException {
final byte[] snippetBytes = controller.getSnippetManager().export();
final byte[] authorizerFingerprint = getAuthorizerFingerprint();
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
dao.save(controller, baos);
final byte[] flowBytes = baos.toByteArray();
baos.reset();
final FlowManager flowManager = controller.getFlowManager();
final Set<String> missingComponents = new HashSet<>();
flowManager.getRootGroup().findAllProcessors().stream().filter(AbstractComponentNode::isExtensionMissing).forEach(p -> missingComponents.add(p.getIdentifier()));
flowManager.getAllControllerServices().stream().filter(ComponentNode::isExtensionMissing).forEach(cs -> missingComponents.add(cs.getIdentifier()));
controller.getAllReportingTasks().stream().filter(ComponentNode::isExtensionMissing).forEach(r -> missingComponents.add(r.getIdentifier()));
return new StandardDataFlow(flowBytes, snippetBytes, authorizerFingerprint, missingComponents);
}
private NodeIdentifier getNodeId() {
readLock.lock();
try {
return nodeId;
} finally {
readLock.unlock();
}
}
private void handleReconnectionRequest(final ReconnectionRequestMessage request) {
try {
logger.info("Processing reconnection request from cluster coordinator.");
// We are no longer connected to the cluster. But the intent is to reconnect to the cluster.
// So we don't want to call FlowController.setClustered(false, null).
// It is important, though, that we perform certain tasks, such as un-registering the node as Cluster Coordinator/Primary Node
if (controller.isConnected()) {
controller.onClusterDisconnect();
}
// reconnect
ConnectionResponse connectionResponse = new ConnectionResponse(getNodeId(), request.getDataFlow(),
request.getInstanceId(), request.getNodeConnectionStatuses(), request.getComponentRevisions());
if (connectionResponse.getDataFlow() == null) {
logger.info("Received a Reconnection Request that contained no DataFlow. Will attempt to connect to cluster using local flow.");
connectionResponse = connect(false, false, createDataFlowFromController());
}
if (connectionResponse == null) {
// If we could not communicate with the cluster, just log a warning and return.
// If the node is currently in a CONNECTING state, it will continue to heartbeat, and that will continue to
// result in attempting to connect to the cluster.
logger.warn("Received a Reconnection Request that contained no DataFlow, and was unable to communicate with an active Cluster Coordinator. Cannot connect to cluster at this time.");
controller.resumeHeartbeats();
return;
}
loadFromConnectionResponse(connectionResponse);
clusterCoordinator.resetNodeStatuses(connectionResponse.getNodeConnectionStatuses().stream()
.collect(Collectors.toMap(NodeConnectionStatus::getNodeIdentifier, status -> status)));
// reconnected, this node needs to explicitly write the inherited flow to disk, and resume heartbeats
saveFlowChanges();
controller.onClusterConnect();
logger.info("Node reconnected.");
} catch (final Exception ex) {
// disconnect controller
if (controller.isClustered()) {
disconnect("Failed to properly handle Reconnection request due to " + ex.toString());
}
logger.error("Handling reconnection request failed due to: " + ex, ex);
handleConnectionFailure(ex);
}
}
private void handleOffloadRequest(final OffloadMessage request) throws InterruptedException {
logger.info("Received offload request message from cluster coordinator with explanation: " + request.getExplanation());
offload(request.getExplanation());
}
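// Offloading drains this node of work: stop all processors (then terminate any
// that remain in a STOPPED state), stop remote process group transmission,
// mark every queue as offloaded, and poll the queued FlowFile count once per
// second until it reaches zero before notifying the cluster coordinator.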
private void offload(final String explanation) throws InterruptedException {
writeLock.lock();
try {
logger.info("Offloading node due to " + explanation);
// mark node as offloading
controller.setConnectionStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADING, OffloadCode.OFFLOADED, explanation));
final FlowManager flowManager = controller.getFlowManager();
// request to stop all processors on node
flowManager.getRootGroup().stopProcessing();
// terminate all processors
flowManager.getRootGroup().findAllProcessors()
// filter stream, only stopped processors can be terminated
.stream().filter(pn -> pn.getScheduledState() == ScheduledState.STOPPED)
.forEach(pn -> pn.getProcessGroup().terminateProcessor(pn));
// request to stop all remote process groups
flowManager.getRootGroup().findAllRemoteProcessGroups()
.stream().filter(RemoteProcessGroup::isTransmitting)
.forEach(RemoteProcessGroup::stopTransmitting);
// offload all queues on node
final Set<Connection> connections = flowManager.findAllConnections();
for (final Connection connection : connections) {
connection.getFlowFileQueue().offloadQueue();
}
final EventAccess eventAccess = controller.getEventAccess();
ProcessGroupStatus controllerStatus;
// wait for rebalance of flowfiles on all queues
while (true) {
controllerStatus = eventAccess.getControllerStatus();
if (controllerStatus.getQueuedCount() <= 0) {
break;
}
logger.debug("Offloading queues on node {}, remaining queued count: {}", getNodeId(), controllerStatus.getQueuedCount());
Thread.sleep(1000);
}
// finish offload
for (final Connection connection : connections) {
connection.getFlowFileQueue().resetOffloadedQueue();
}
controller.setConnectionStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.OFFLOADED, OffloadCode.OFFLOADED, explanation));
clusterCoordinator.finishNodeOffload(getNodeId());
logger.info("Node offloaded due to " + explanation);
} finally {
writeLock.unlock();
}
}
private void handleDisconnectionRequest(final DisconnectMessage request) {
logger.info("Received disconnection request message from cluster coordinator with explanation: " + request.getExplanation());
disconnect(request.getExplanation());
}
private void disconnect(final String explanation) {
writeLock.lock();
try {
logger.info("Disconnecting node due to " + explanation);
// mark node as not connected
controller.setConnectionStatus(new NodeConnectionStatus(nodeId, DisconnectionCode.UNKNOWN, explanation));
// turn off primary flag
controller.setPrimary(false);
// stop heartbeating
controller.stopHeartbeating();
// set node to not clustered
controller.setClustered(false, null);
clusterCoordinator.setConnected(false);
logger.info("Node disconnected due to " + explanation);
} finally {
writeLock.unlock();
}
}
// write lock must already be acquired
private void loadFromBytes(final DataFlow proposedFlow, final boolean allowEmptyFlow)
throws IOException, FlowSerializationException, FlowSynchronizationException, UninheritableFlowException, MissingBundleException {
logger.trace("Loading flow from bytes");
// resolve the given flow (null means load flow from disk)
final DataFlow actualProposedFlow;
final byte[] flowBytes;
final byte[] authorizerFingerprint;
final Set<String> missingComponents;
if (proposedFlow == null) {
final ByteArrayOutputStream flowOnDisk = new ByteArrayOutputStream();
copyCurrentFlow(flowOnDisk);
flowBytes = flowOnDisk.toByteArray();
authorizerFingerprint = getAuthorizerFingerprint();
missingComponents = new HashSet<>();
logger.debug("Loaded Flow from bytes");
} else {
flowBytes = proposedFlow.getFlow();
authorizerFingerprint = proposedFlow.getAuthorizerFingerprint();
missingComponents = proposedFlow.getMissingComponents();
logger.debug("Loaded flow from proposed flow");
}
actualProposedFlow = new StandardDataFlow(flowBytes, null, authorizerFingerprint, missingComponents);
// load the flow
logger.debug("Loading proposed flow into FlowController");
dao.load(controller, actualProposedFlow, this);
final ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
if (rootGroup.isEmpty() && !allowEmptyFlow) {
throw new FlowSynchronizationException("Failed to load flow because unable to connect to cluster and local flow is empty");
}
final List<Template> templates = loadTemplates();
for (final Template template : templates) {
final Template existing = rootGroup.getTemplate(template.getIdentifier());
if (existing == null) {
logger.info("Imported Template '{}' to Root Group", template.getDetails().getName());
rootGroup.addTemplate(template);
} else {
logger.info("Template '{}' was already present in Root Group so will not import from file", template.getDetails().getName());
}
}
}
/**
* In NiFi 0.x, templates were stored in a templates directory as separate
* files. They are now stored in the flow itself. If there already are
* templates in that directory, though, we want to restore them.
*
* @return the templates found in the templates directory
* @throws IOException if unable to read from the file system
*/
public List<Template> loadTemplates() throws IOException {
final Path templatePath = nifiProperties.getTemplateDirectory();
final File[] files = templatePath.toFile().listFiles(pathname -> {
final String lowerName = pathname.getName().toLowerCase();
return lowerName.endsWith(".template") || lowerName.endsWith(".xml");
});
if (files == null) {
return Collections.emptyList();
}
final List<Template> templates = new ArrayList<>();
for (final File file : files) {
try (final FileInputStream fis = new FileInputStream(file);
final BufferedInputStream bis = new BufferedInputStream(fis)) {
final TemplateDTO templateDto;
try {
templateDto = TemplateDeserializer.deserialize(bis);
} catch (final Exception e) {
logger.error("Unable to interpret " + file + " as a Template. Skipping file.");
continue;
}
if (templateDto.getId() == null) {
// If there is no ID assigned, we need to assign one. We do this by generating
// an ID from the name. This is because we know that Template Names are unique
// and are consistent across all nodes in the cluster.
final String uuid = UUID.nameUUIDFromBytes(templateDto.getName().getBytes(StandardCharsets.UTF_8)).toString();
templateDto.setId(uuid);
}
final Template template = new Template(templateDto);
templates.add(template);
}
}
return templates;
}
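// Attempts to join the cluster. retryOnCommsFailure controls whether
// communication failures are retried; retryIndefinitely lifts the ten-attempt
// cap (used on startup when the local flow is empty and the node cannot fall
// back to it). A null return means no usable response was obtained from a
// cluster coordinator.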
private ConnectionResponse connect(final boolean retryOnCommsFailure, final boolean retryIndefinitely, final DataFlow dataFlow) throws ConnectionException {
readLock.lock();
try {
logger.info("Connecting Node: " + nodeId);
// create connection request message
final ConnectionRequest request = new ConnectionRequest(nodeId, dataFlow);
final ConnectionRequestMessage requestMsg = new ConnectionRequestMessage();
requestMsg.setConnectionRequest(request);
// send connection request to cluster manager
/*
* Try to get a current copy of the cluster's dataflow from the manager
* up to ten times, sleeping between attempts. Ten attempts should be
* enough because the manager will register the node as connecting and,
* therefore, no other changes to the cluster flow can occur.
*
* However, the manager needs to obtain a current data flow within
* maxAttempts * tryLaterSeconds or else the node will fail to start up.
*/
final int maxAttempts = 10;
ConnectionResponse response = null;
for (int i = 0; i < maxAttempts || retryIndefinitely; i++) {
try {
// Upon NiFi startup, the node will register for the Cluster Coordinator role with the Leader Election Manager.
// Sometimes the node will register as an active participant, meaning that it wants to be elected. This happens when the entire cluster starts up,
// for example. (This is determined by checking whether or not there already is a Cluster Coordinator registered).
// Other times, it registers as a 'silent' member, meaning that it will not be elected.
// If the leader election timeout is long (say 30 or 60 seconds), it is possible that this node was the Leader and was then restarted,
// and upon restart found that it was still registered as the Cluster Coordinator. As a result, it registers as a silent member of the
// election, and then connects to itself as the Cluster Coordinator. At this point, since the node has just restarted, it doesn't know about
// any of the nodes in the cluster. As a result, it will get the Cluster Topology from itself, and think there are no other nodes in the cluster.
// This causes all other nodes to send in their heartbeats, which then results in them being disconnected because they were previously unknown and
// as a result asked to reconnect to the cluster.
//
// To avoid this, we do not allow the node to connect to itself if it's not an active participant. This means that when the entire cluster is started
// up, the node can still connect to itself because it will be an active participant. But if it is then restarted, it won't be allowed to connect
// to itself. It will instead have to wait until another node is elected Cluster Coordinator.
final boolean activeCoordinatorParticipant = controller.getLeaderElectionManager().isActiveParticipant(ClusterRoles.CLUSTER_COORDINATOR);
response = senderListener.requestConnection(requestMsg, activeCoordinatorParticipant).getConnectionResponse();
if (response.shouldTryLater()) {
logger.info("Requested by cluster coordinator to retry connection in " + response.getTryLaterSeconds() + " seconds with explanation: " + response.getRejectionReason());
try {
Thread.sleep(response.getTryLaterSeconds() * 1000);
} catch (final InterruptedException ie) {
// we were interrupted, so finish quickly
Thread.currentThread().interrupt();
break;
}
} else if (response.getRejectionReason() != null) {
logger.warn("Connection request was blocked by cluster coordinator with the explanation: " + response.getRejectionReason());
// set response to null and treat a firewall blockage the same as getting no response from cluster coordinator
response = null;
break;
} else {
logger.info("Received successful response from Cluster Coordinator to Connection Request");
// we received a successful connection response from cluster coordinator
break;
}
} catch (final NoClusterCoordinatorException ncce) {
logger.warn("There is currently no Cluster Coordinator. This often happens upon restart of NiFi when running an embedded ZooKeeper. Will register this node "
+ "to become the active Cluster Coordinator and will attempt to connect to cluster again");
controller.registerForClusterCoordinator(true);
try {
Thread.sleep(1000L);
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
} catch (final Exception pe) {
// could not create a socket and communicate with manager
logger.warn("Failed to connect to cluster due to: " + pe);
if (logger.isDebugEnabled()) {
logger.warn("", pe);
}
if (retryOnCommsFailure) {
try {
// getTryLaterSeconds() is in seconds, but Thread.sleep() expects milliseconds
Thread.sleep(response == null ? 5000 : TimeUnit.SECONDS.toMillis(response.getTryLaterSeconds()));
} catch (final InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
} else {
break;
}
}
}
if (response == null) {
// if response is null, then either we had IO problems or we were blocked by firewall or we couldn't determine manager's address
return response;
} else if (response.shouldTryLater()) {
// if response indicates we should try later, then coordinator was unable to service our request. Just load local flow and move on.
// when the cluster coordinator is able to service requests, this node's heartbeat will trigger the cluster coordinator to reach
// out to this node and re-connect to the cluster.
logger.info("Received a 'try again' response from Cluster Coordinator when attempting to connect to cluster with explanation '"
+ response.getRejectionReason() + "'. However, the maximum number of retries have already completed. Will load local flow and connect to the cluster when able.");
return null;
} else {
// cluster coordinator provided a successful response with a current dataflow
// persist the node UUID it assigned and return the response to the caller
try {
// Ensure that we have registered our 'cluster node configuration' state key
final Map<String, String> map = Collections.singletonMap(NODE_UUID, response.getNodeIdentifier().getId());
controller.getStateManagerProvider().getStateManager(CLUSTER_NODE_CONFIG).setState(map, Scope.LOCAL);
} catch (final IOException ioe) {
logger.warn("Received successful response from Cluster Manager but failed to persist state about the Node's Unique Identifier and the Node's Index. "
+ "This node may be assigned a different UUID when the node is restarted.", ioe);
}
return response;
}
} finally {
readLock.unlock();
}
}
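// Applies a successful connection response: adopt the node identifier assigned
// by the coordinator, load the cluster's flow, reset component revisions, mark
// the node connected, and only then initialize the controller and start
// heartbeating.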
private void loadFromConnectionResponse(final ConnectionResponse response) throws ConnectionException {
writeLock.lock();
try {
if (response.getNodeConnectionStatuses() != null) {
clusterCoordinator.resetNodeStatuses(response.getNodeConnectionStatuses().stream()
.collect(Collectors.toMap(NodeConnectionStatus::getNodeIdentifier, status -> status)));
}
// get the dataflow from the response
final DataFlow dataFlow = response.getDataFlow();
if (logger.isTraceEnabled()) {
logger.trace("ResponseFlow = " + new String(dataFlow.getFlow(), StandardCharsets.UTF_8));
}
logger.info("Setting Flow Controller's Node ID: " + nodeId);
nodeId = response.getNodeIdentifier();
controller.setNodeId(nodeId);
// load new controller state
loadFromBytes(dataFlow, true);
// set node ID on controller before we start heartbeating because heartbeat needs node ID
clusterCoordinator.setLocalNodeIdentifier(nodeId);
clusterCoordinator.setConnected(true);
final ComponentRevisionSnapshot componentRevisionSnapshot = response.getComponentRevisions();
final RevisionSnapshot revisionSnapshot = componentRevisionSnapshot.toRevisionSnapshot();
revisionManager.reset(revisionSnapshot);
// mark the node as clustered
controller.setClustered(true, response.getInstanceId());
controller.setConnectionStatus(new NodeConnectionStatus(nodeId, NodeConnectionState.CONNECTED));
// Initialize the controller after the flow is loaded so we don't take any actions on repos until everything is good
initializeController();
// start the processors as indicated by the dataflow
controller.onFlowInitialized(autoResumeState);
loadSnippets(dataFlow.getSnippets());
controller.startHeartbeating();
} catch (final UninheritableFlowException ufe) {
throw new UninheritableFlowException(CONNECTION_EXCEPTION_MSG_PREFIX, ufe);
} catch (final MissingBundleException mbe) {
throw new MissingBundleException(CONNECTION_EXCEPTION_MSG_PREFIX + " because cluster flow contains bundles that do not exist on the current node", mbe);
} catch (final FlowSerializationException fse) {
throw new ConnectionException(CONNECTION_EXCEPTION_MSG_PREFIX + " because local or cluster flow is malformed.", fse);
} catch (final FlowSynchronizationException fse) {
throw new FlowSynchronizationException(CONNECTION_EXCEPTION_MSG_PREFIX + " because local flow controller partially updated. "
+ "Administrator should disconnect node and review flow for corruption.", fse);
} catch (final Exception ex) {
throw new ConnectionException("Failed to connect node to cluster due to: " + ex, ex);
} finally {
writeLock.unlock();
}
}
private void initializeController() throws IOException {
if (firstControllerInitialization) {
logger.debug("First controller initialization, initializing controller...");
controller.initializeFlow();
firstControllerInitialization = false;
}
}
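// The flow configuration file is stored gzipped on disk, so reads wrap it in a
// GZIPInputStream here, and copyCurrentFlow(File) symmetrically gzips on the
// way out. A missing or empty file copies nothing.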
@Override
public void copyCurrentFlow(final OutputStream os) throws IOException {
readLock.lock();
try {
if (!Files.exists(flowXml) || Files.size(flowXml) == 0) {
return;
}
try (final InputStream in = Files.newInputStream(flowXml, StandardOpenOption.READ);
final InputStream gzipIn = new GZIPInputStream(in)) {
FileUtils.copy(gzipIn, os);
}
} finally {
readLock.unlock();
}
}
@Override
public void copyCurrentFlow(final File file) throws IOException {
try (final OutputStream fos = new FileOutputStream(file);
final OutputStream gzipOut = new GZIPOutputStream(fos, 1)) {
copyCurrentFlow(gzipOut);
}
}
public void loadSnippets(final byte[] bytes) {
if (bytes.length == 0) {
return;
}
final SnippetManager snippetManager = controller.getSnippetManager();
snippetManager.clear();
for (final StandardSnippet snippet : SnippetManager.parseBytes(bytes)) {
snippetManager.addSnippet(snippet);
}
}
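// Runs every 500 ms (see start()). Once a SaveHolder's requested save time has
// passed, the flow is saved under the write lock and the holder is cleared via
// compareAndSet so that a save requested while saving is not lost. The context
// ClassLoader is switched to the framework bundle's ClassLoader for the
// duration of the task.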
private class SaveReportingTask implements Runnable {
@Override
public void run() {
ClassLoader currentCl = null;
final Bundle frameworkBundle = NarClassLoadersHolder.getInstance().getFrameworkBundle();
if (frameworkBundle != null) {
currentCl = Thread.currentThread().getContextClassLoader();
final ClassLoader cl = frameworkBundle.getClassLoader();
Thread.currentThread().setContextClassLoader(cl);
}
try {
//Hang onto the SaveHolder here rather than setting it to null because if the save fails we will try again
final SaveHolder holder = StandardFlowService.this.saveHolder.get();
if (holder == null) {
return;
}
if (logger.isTraceEnabled()) {
logger.trace("Save request time {} // Current time {}", holder.saveTime.getTime(), new Date());
}
final Calendar now = Calendar.getInstance();
if (holder.saveTime.before(now)) {
if (logger.isTraceEnabled()) {
logger.trace("Waiting for write lock and then will save");
}
writeLock.lock();
try {
dao.save(controller, holder.shouldArchive);
// Nulling it out if it is still set to our current SaveHolder. Otherwise leave it alone because it means
// another save is already pending.
final boolean noSavePending = StandardFlowService.this.saveHolder.compareAndSet(holder, null);
logger.info("Saved flow controller {} // Another save pending = {}", controller, !noSavePending);
} finally {
writeLock.unlock();
}
}
} catch (final Throwable t) {
logger.error("Unable to save flow controller configuration due to: " + t, t);
if (logger.isDebugEnabled()) {
logger.error("", t);
}
// record the failed save as a bulletin
final Bulletin saveFailureBulletin = BulletinFactory.createBulletin(EVENT_CATEGORY, LogLevel.ERROR.name(), "Unable to save flow controller configuration.");
controller.getBulletinRepository().addBulletin(saveFailureBulletin);
} finally {
if (currentCl != null) {
Thread.currentThread().setContextClassLoader(currentCl);
}
}
}
}
private class SaveHolder {
private final Calendar saveTime;
private final boolean shouldArchive;
private SaveHolder(final Calendar moment, final boolean archive) {
saveTime = moment;
shouldArchive = archive;
}
}
}