/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.Resource;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
import org.apache.nifi.cluster.coordination.node.ClusterRoles;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.Heartbeat;
import org.apache.nifi.cluster.protocol.HeartbeatPayload;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.components.monitor.LongRunningTaskMonitor;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.components.validation.StandardValidationTrigger;
import org.apache.nifi.components.validation.TriggerValidationTask;
import org.apache.nifi.components.validation.ValidationStatus;
import org.apache.nifi.components.validation.ValidationTrigger;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.connectable.StandardConnection;
import org.apache.nifi.controller.cluster.ClusterProtocolHeartbeater;
import org.apache.nifi.controller.cluster.Heartbeater;
import org.apache.nifi.controller.exception.CommunicationsException;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.flow.StandardFlowManager;
import org.apache.nifi.controller.kerberos.KerberosConfig;
import org.apache.nifi.controller.leader.election.LeaderElectionManager;
import org.apache.nifi.controller.leader.election.LeaderElectionStateChangeListener;
import org.apache.nifi.controller.queue.ConnectionEventListener;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileQueueFactory;
import org.apache.nifi.controller.queue.LoadBalanceStrategy;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.queue.StandardFlowFileQueue;
import org.apache.nifi.controller.queue.clustered.ContentRepositoryFlowFileAccess;
import org.apache.nifi.controller.queue.clustered.SocketLoadBalancedFlowFileQueue;
import org.apache.nifi.controller.queue.clustered.client.StandardLoadBalanceFlowFileCodec;
import org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientFactory;
import org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientRegistry;
import org.apache.nifi.controller.queue.clustered.client.async.nio.NioAsyncLoadBalanceClientTask;
import org.apache.nifi.controller.queue.clustered.server.ClusterLoadBalanceAuthorizer;
import org.apache.nifi.controller.queue.clustered.server.ConnectionLoadBalanceServer;
import org.apache.nifi.controller.queue.clustered.server.LoadBalanceAuthorizer;
import org.apache.nifi.controller.queue.clustered.server.LoadBalanceProtocol;
import org.apache.nifi.controller.queue.clustered.server.StandardLoadBalanceProtocol;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.reporting.ReportingTaskProvider;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.CounterRepository;
import org.apache.nifi.controller.repository.FlowFileEventRepository;
import org.apache.nifi.controller.repository.FlowFileRecord;
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.FlowFileSwapManager;
import org.apache.nifi.controller.repository.QueueProvider;
import org.apache.nifi.controller.repository.RepositoryStatusReport;
import org.apache.nifi.controller.repository.StandardCounterRepository;
import org.apache.nifi.controller.repository.StandardFlowFileRecord;
import org.apache.nifi.controller.repository.StandardQueueProvider;
import org.apache.nifi.controller.repository.StandardRepositoryRecord;
import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
import org.apache.nifi.controller.repository.SwapSummary;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ContentDirection;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.controller.scheduling.EventDrivenSchedulingAgent;
import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent;
import org.apache.nifi.controller.scheduling.RepositoryContextFactory;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
import org.apache.nifi.controller.serialization.FlowSerializationException;
import org.apache.nifi.controller.serialization.FlowSerializer;
import org.apache.nifi.controller.serialization.FlowSynchronizationException;
import org.apache.nifi.controller.serialization.FlowSynchronizer;
import org.apache.nifi.controller.serialization.ScheduledStateLookup;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
import org.apache.nifi.controller.state.server.ZooKeeperStateServer;
import org.apache.nifi.controller.status.NodeStatus;
import org.apache.nifi.controller.status.StorageStatus;
import org.apache.nifi.controller.status.analytics.CachingConnectionStatusAnalyticsEngine;
import org.apache.nifi.controller.status.analytics.ConnectionStatusAnalytics;
import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine;
import org.apache.nifi.controller.status.analytics.StatusAnalyticsModelMapFactory;
import org.apache.nifi.controller.status.history.GarbageCollectionHistory;
import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
import org.apache.nifi.controller.status.history.StandardGarbageCollectionStatus;
import org.apache.nifi.controller.status.history.StatusHistoryRepository;
import org.apache.nifi.controller.status.history.StatusHistoryUtil;
import org.apache.nifi.controller.tasks.ExpireFlowFiles;
import org.apache.nifi.diagnostics.StorageUsage;
import org.apache.nifi.diagnostics.SystemDiagnostics;
import org.apache.nifi.diagnostics.SystemDiagnosticsFactory;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.encrypt.SensitiveValueEncoder;
import org.apache.nifi.encrypt.StandardSensitiveValueEncoder;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.groups.StandardProcessGroup;
import org.apache.nifi.nar.ExtensionDefinition;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.nar.NarThreadContextClassLoader;
import org.apache.nifi.parameter.ParameterContextManager;
import org.apache.nifi.parameter.ParameterLookup;
import org.apache.nifi.parameter.StandardParameterContextManager;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.provenance.ComponentIdentifierLookup;
import org.apache.nifi.provenance.IdentifierLookup;
import org.apache.nifi.provenance.ProvenanceAuthorizableFactory;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.ProvenanceRepository;
import org.apache.nifi.provenance.StandardProvenanceAuthorizableFactory;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionedConnection;
import org.apache.nifi.registry.flow.VersionedProcessGroup;
import org.apache.nifi.registry.variable.MutableVariableRegistry;
import org.apache.nifi.remote.HttpRemoteSiteListener;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RemoteResourceManager;
import org.apache.nifi.remote.RemoteSiteListener;
import org.apache.nifi.remote.SocketRemoteSiteListener;
import org.apache.nifi.remote.cluster.NodeInformant;
import org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.reporting.StandardEventAccess;
import org.apache.nifi.reporting.UserAwareEventAccess;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.security.util.StandardTlsConfiguration;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.services.FlowService;
import org.apache.nifi.stream.io.LimitingInputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.ComponentIdGenerator;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.util.concurrency.TimedLock;
import org.apache.nifi.web.api.dto.PositionDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.nifi.web.revision.RevisionManager;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.management.NotificationEmitter;
import javax.net.ssl.SSLContext;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
public class FlowController implements ReportingTaskProvider, Authorizable, NodeTypeProvider {
// default repository implementations
public static final String DEFAULT_FLOWFILE_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository";
public static final String DEFAULT_CONTENT_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.FileSystemRepository";
public static final String DEFAULT_PROVENANCE_REPO_IMPLEMENTATION = "org.apache.nifi.provenance.VolatileProvenanceRepository";
public static final String DEFAULT_SWAP_MANAGER_IMPLEMENTATION = "org.apache.nifi.controller.FileSystemSwapManager";
public static final String DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION = "org.apache.nifi.controller.status.history.VolatileComponentStatusRepository";
public static final String GRACEFUL_SHUTDOWN_PERIOD = "nifi.flowcontroller.graceful.shutdown.seconds";
public static final long DEFAULT_GRACEFUL_SHUTDOWN_SECONDS = 10;
// default factors for scaling the positions of components from pre-1.0 flow encoding versions.
public static final double DEFAULT_POSITION_SCALE_FACTOR_X = 1.5;
public static final double DEFAULT_POSITION_SCALE_FACTOR_Y = 1.34;
private final AtomicInteger maxTimerDrivenThreads;
private final AtomicInteger maxEventDrivenThreads;
private final AtomicReference<FlowEngine> timerDrivenEngineRef;
private final AtomicReference<FlowEngine> eventDrivenEngineRef;
private final EventDrivenSchedulingAgent eventDrivenSchedulingAgent;
private final ContentRepository contentRepository;
private final FlowFileRepository flowFileRepository;
private final FlowFileEventRepository flowFileEventRepository;
private final ProvenanceRepository provenanceRepository;
private final BulletinRepository bulletinRepository;
private final StandardProcessScheduler processScheduler;
private final SnippetManager snippetManager;
private final long gracefulShutdownSeconds;
private final ExtensionManager extensionManager;
private final NiFiProperties nifiProperties;
private final SSLContext sslContext;
private final Set<RemoteSiteListener> externalSiteListeners = new HashSet<>();
private final AtomicReference<CounterRepository> counterRepositoryRef;
private final AtomicBoolean initialized = new AtomicBoolean(false);
private final AtomicBoolean flowSynchronized = new AtomicBoolean(false);
private final StandardControllerServiceProvider controllerServiceProvider;
private final Authorizer authorizer;
private final AuditService auditService;
private final EventDrivenWorkerQueue eventDrivenWorkerQueue;
private final StatusHistoryRepository statusHistoryRepository;
private final StateManagerProvider stateManagerProvider;
private final long systemStartTime = System.currentTimeMillis(); // time at which the node was started
private final VariableRegistry variableRegistry;
private final RevisionManager revisionManager;
private final ConnectionLoadBalanceServer loadBalanceServer;
private final NioAsyncLoadBalanceClientRegistry loadBalanceClientRegistry;
private final FlowEngine loadBalanceClientThreadPool;
private final Set<NioAsyncLoadBalanceClientTask> loadBalanceClientTasks = new HashSet<>();
private final ConcurrentMap<String, ProcessGroup> allProcessGroups = new ConcurrentHashMap<>();
private final ZooKeeperStateServer zooKeeperStateServer;
// The Heartbeat Bean is used to provide an Atomic Reference to data that is used in heartbeats that may
// change while the instance is running. We do this because we want to generate heartbeats even if we
// are unable to obtain a read lock on the entire FlowController.
private final AtomicReference<HeartbeatBean> heartbeatBeanRef = new AtomicReference<>();
private final AtomicBoolean heartbeatsSuspended = new AtomicBoolean(false);
private final Integer remoteInputSocketPort;
private final Integer remoteInputHttpPort;
private final Boolean isSiteToSiteSecure;
private final List<Connectable> startConnectablesAfterInitialization;
private final List<RemoteGroupPort> startRemoteGroupPortsAfterInitialization;
private final LeaderElectionManager leaderElectionManager;
private final ClusterCoordinator clusterCoordinator;
private final FlowRegistryClient flowRegistryClient;
private final FlowEngine validationThreadPool;
private final ValidationTrigger validationTrigger;
private final ReloadComponent reloadComponent;
private final ProvenanceAuthorizableFactory provenanceAuthorizableFactory;
private final UserAwareEventAccess eventAccess;
private final ParameterContextManager parameterContextManager;
private final StandardFlowManager flowManager;
private final RepositoryContextFactory repositoryContextFactory;
private final RingBufferGarbageCollectionLog gcLog;
private final Optional<FlowEngine> longRunningTaskMonitorThreadPool;
/**
* true if controller is configured to operate in a clustered environment
*/
private final boolean configuredForClustering;
/**
* the time to wait between heartbeats
*/
private final int heartbeatDelaySeconds;
/**
* The sensitive property string encryptor
*/
private final PropertyEncryptor encryptor;
/**
* The sensitive value string encoder (hasher)
*/
private final SensitiveValueEncoder sensitiveValueEncoder;
private final ScheduledExecutorService clusterTaskExecutor = new FlowEngine(3, "Clustering Tasks", true);
private final ResourceClaimManager resourceClaimManager = new StandardResourceClaimManager();
// guarded by rwLock
/**
* timer to periodically send heartbeats to the cluster
*/
private ScheduledFuture<?> heartbeatSenderFuture;
private final Heartbeater heartbeater;
private final HeartbeatMonitor heartbeatMonitor;
// guarded by FlowController lock
/**
* timer task to generate heartbeats
*/
private final AtomicReference<HeartbeatSendTask> heartbeatSendTask = new AtomicReference<>(null);
// guarded by rwLock
/**
* the node identifier
*/
private volatile NodeIdentifier nodeId;
// guarded by rwLock
/**
* true if controller is connected or trying to connect to the cluster
*/
private boolean clustered;
// guarded by rwLock
private NodeConnectionStatus connectionStatus;
private StatusAnalyticsEngine analyticsEngine;
// guarded by rwLock
private String instanceId;
private volatile boolean shutdown = false;
private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
private final TimedLock readLock = new TimedLock(rwLock.readLock(), "FlowControllerReadLock", 1);
private final TimedLock writeLock = new TimedLock(rwLock.writeLock(), "FlowControllerWriteLock", 1);
private static final Logger LOG = LoggerFactory.getLogger(FlowController.class);
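/**
* Creates a FlowController for a standalone (non-clustered) instance. A minimal
* usage sketch, assuming the caller has already constructed the collaborating
* services from the running environment (all names below come from this
* method's parameter list):
* <pre>{@code
* final FlowController controller = FlowController.createStandaloneInstance(
*         flowFileEventRepo, properties, authorizer, auditService, encryptor,
*         bulletinRepo, variableRegistry, flowRegistryClient, extensionManager);
* }</pre>
*/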
public static FlowController createStandaloneInstance(
final FlowFileEventRepository flowFileEventRepo,
final NiFiProperties properties,
final Authorizer authorizer,
final AuditService auditService,
final PropertyEncryptor encryptor,
final BulletinRepository bulletinRepo,
final VariableRegistry variableRegistry,
final FlowRegistryClient flowRegistryClient,
final ExtensionManager extensionManager) {
return new FlowController(
flowFileEventRepo,
properties,
authorizer,
auditService,
encryptor,
/* configuredForClustering */ false,
/* NodeProtocolSender */ null,
bulletinRepo,
/* cluster coordinator */ null,
/* heartbeat monitor */ null,
/* leader election manager */ null,
/* variable registry */ variableRegistry,
flowRegistryClient,
extensionManager,
null);
}
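/**
* Creates a FlowController for a clustered instance. Compared to
* {@link #createStandaloneInstance}, the caller must additionally supply the
* cluster collaborators: a NodeProtocolSender, ClusterCoordinator,
* HeartbeatMonitor, LeaderElectionManager, and RevisionManager.
*/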
public static FlowController createClusteredInstance(
final FlowFileEventRepository flowFileEventRepo,
final NiFiProperties properties,
final Authorizer authorizer,
final AuditService auditService,
final PropertyEncryptor encryptor,
final NodeProtocolSender protocolSender,
final BulletinRepository bulletinRepo,
final ClusterCoordinator clusterCoordinator,
final HeartbeatMonitor heartbeatMonitor,
final LeaderElectionManager leaderElectionManager,
final VariableRegistry variableRegistry,
final FlowRegistryClient flowRegistryClient,
final ExtensionManager extensionManager,
final RevisionManager revisionManager) {
final FlowController flowController = new FlowController(
flowFileEventRepo,
properties,
authorizer,
auditService,
encryptor,
/* configuredForClustering */ true,
protocolSender,
bulletinRepo,
clusterCoordinator,
heartbeatMonitor,
leaderElectionManager,
variableRegistry,
flowRegistryClient,
extensionManager,
revisionManager);
return flowController;
}
@SuppressWarnings("deprecation")
private FlowController(
final FlowFileEventRepository flowFileEventRepo,
final NiFiProperties nifiProperties,
final Authorizer authorizer,
final AuditService auditService,
final PropertyEncryptor encryptor,
final boolean configuredForClustering,
final NodeProtocolSender protocolSender,
final BulletinRepository bulletinRepo,
final ClusterCoordinator clusterCoordinator,
final HeartbeatMonitor heartbeatMonitor,
final LeaderElectionManager leaderElectionManager,
final VariableRegistry variableRegistry,
final FlowRegistryClient flowRegistryClient,
final ExtensionManager extensionManager,
final RevisionManager revisionManager) {
maxTimerDrivenThreads = new AtomicInteger(10);
maxEventDrivenThreads = new AtomicInteger(1);
this.encryptor = encryptor;
this.nifiProperties = nifiProperties;
this.heartbeatMonitor = heartbeatMonitor;
this.leaderElectionManager = leaderElectionManager;
this.extensionManager = extensionManager;
this.clusterCoordinator = clusterCoordinator;
this.authorizer = authorizer;
this.auditService = auditService;
this.configuredForClustering = configuredForClustering;
this.flowRegistryClient = flowRegistryClient;
this.revisionManager = revisionManager;
try {
// Form the container object from the properties
TlsConfiguration tlsConfiguration = StandardTlsConfiguration.fromNiFiProperties(nifiProperties);
this.sslContext = SslContextFactory.createSslContext(tlsConfiguration);
} catch (TlsException e) {
LOG.error("Unable to start the flow controller because the TLS configuration was invalid: {}", e.getLocalizedMessage());
throw new IllegalStateException("Flow controller TLS configuration is invalid", e);
}
this.sensitiveValueEncoder = new StandardSensitiveValueEncoder(nifiProperties);
timerDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process"));
eventDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxEventDrivenThreads.get(), "Event-Driven Process"));
final FlowFileRepository flowFileRepo = createFlowFileRepository(nifiProperties, extensionManager, resourceClaimManager);
flowFileRepository = flowFileRepo;
flowFileEventRepository = flowFileEventRepo;
counterRepositoryRef = new AtomicReference<>(new StandardCounterRepository());
gcLog = new RingBufferGarbageCollectionLog(1000, 20L);
for (final GarbageCollectorMXBean mxBean : ManagementFactory.getGarbageCollectorMXBeans()) {
if (mxBean instanceof NotificationEmitter) {
((NotificationEmitter) mxBean).addNotificationListener(gcLog, null, null);
}
}
bulletinRepository = bulletinRepo;
this.variableRegistry = variableRegistry == null ? VariableRegistry.EMPTY_REGISTRY : variableRegistry;
try {
this.provenanceAuthorizableFactory = new StandardProvenanceAuthorizableFactory(this);
this.provenanceRepository = createProvenanceRepository(nifiProperties);
final IdentifierLookup identifierLookup = new ComponentIdentifierLookup(this);
this.provenanceRepository.initialize(createEventReporter(), authorizer, provenanceAuthorizableFactory, identifierLookup);
} catch (final Exception e) {
throw new RuntimeException("Unable to create Provenance Repository", e);
}
try {
this.contentRepository = createContentRepository(nifiProperties);
} catch (final Exception e) {
throw new RuntimeException("Unable to create Content Repository", e);
}
try {
this.stateManagerProvider = StandardStateManagerProvider.create(nifiProperties, this.variableRegistry, extensionManager, ParameterLookup.EMPTY);
} catch (final IOException e) {
throw new RuntimeException(e);
}
processScheduler = new StandardProcessScheduler(timerDrivenEngineRef.get(), this, encryptor, stateManagerProvider, this.nifiProperties);
eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler);
parameterContextManager = new StandardParameterContextManager();
repositoryContextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository, stateManagerProvider);
flowManager = new StandardFlowManager(nifiProperties, sslContext, this, flowFileEventRepository, parameterContextManager);
controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, flowManager, extensionManager);
flowManager.initialize(controllerServiceProvider);
eventDrivenSchedulingAgent = new EventDrivenSchedulingAgent(
eventDrivenEngineRef.get(), controllerServiceProvider, stateManagerProvider, eventDrivenWorkerQueue,
repositoryContextFactory, maxEventDrivenThreads.get(), encryptor, extensionManager, this);
processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, eventDrivenSchedulingAgent);
final QuartzSchedulingAgent quartzSchedulingAgent = new QuartzSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory, encryptor);
final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory, encryptor, this.nifiProperties);
processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenAgent);
// PRIMARY_NODE_ONLY is deprecated, but still exists to handle processors that are still defined with it (they haven't been re-configured with executeNode = PRIMARY).
processScheduler.setSchedulingAgent(SchedulingStrategy.PRIMARY_NODE_ONLY, timerDrivenAgent);
processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, quartzSchedulingAgent);
startConnectablesAfterInitialization = new ArrayList<>();
startRemoteGroupPortsAfterInitialization = new ArrayList<>();
final String gracefulShutdownSecondsVal = nifiProperties.getProperty(GRACEFUL_SHUTDOWN_PERIOD);
long shutdownSecs;
try {
shutdownSecs = Long.parseLong(gracefulShutdownSecondsVal);
if (shutdownSecs < 1) {
shutdownSecs = DEFAULT_GRACEFUL_SHUTDOWN_SECONDS;
}
} catch (final NumberFormatException nfe) {
shutdownSecs = DEFAULT_GRACEFUL_SHUTDOWN_SECONDS;
}
gracefulShutdownSeconds = shutdownSecs;
remoteInputSocketPort = nifiProperties.getRemoteInputPort();
remoteInputHttpPort = nifiProperties.getRemoteInputHttpPort();
isSiteToSiteSecure = nifiProperties.isSiteToSiteSecure();
if (isSiteToSiteSecure && sslContext == null && remoteInputSocketPort != null) {
throw new IllegalStateException("NiFi Configured to allow Secure Site-to-Site communications but the Keystore/Truststore properties are not configured");
}
this.heartbeatDelaySeconds = (int) FormatUtils.getTimeDuration(nifiProperties.getNodeHeartbeatInterval(), TimeUnit.SECONDS);
this.snippetManager = new SnippetManager();
this.reloadComponent = new StandardReloadComponent(this);
final ProcessGroup rootGroup = new StandardProcessGroup(ComponentIdGenerator.generateId().toString(), controllerServiceProvider, processScheduler,
encryptor, extensionManager, stateManagerProvider, flowManager, flowRegistryClient, reloadComponent, new MutableVariableRegistry(this.variableRegistry), this,
nifiProperties);
rootGroup.setName(FlowManager.DEFAULT_ROOT_GROUP_NAME);
setRootGroup(rootGroup);
instanceId = ComponentIdGenerator.generateId().toString();
this.validationThreadPool = new FlowEngine(5, "Validate Components", true);
this.validationTrigger = new StandardValidationTrigger(validationThreadPool, this::isInitialized);
if (remoteInputSocketPort == null) {
LOG.info("Not enabling RAW Socket Site-to-Site functionality because nifi.remote.input.socket.port is not set");
} else if (isSiteToSiteSecure && sslContext == null) {
LOG.error("Unable to create Secure Site-to-Site Listener because not all required Keystore/Truststore "
+ "Properties are set. Site-to-Site functionality will be disabled until this problem is has been fixed.");
} else {
// Register the SocketFlowFileServerProtocol as the appropriate resource for site-to-site Server Protocol
RemoteResourceManager.setServerProtocolImplementation(SocketFlowFileServerProtocol.RESOURCE_NAME, SocketFlowFileServerProtocol.class);
final NodeInformant nodeInformant = configuredForClustering ? new ClusterCoordinatorNodeInformant(clusterCoordinator) : null;
externalSiteListeners.add(new SocketRemoteSiteListener(remoteInputSocketPort, isSiteToSiteSecure ? sslContext : null, nifiProperties, nodeInformant));
}
if (remoteInputHttpPort == null) {
LOG.info("Not enabling HTTP(S) Site-to-Site functionality because the '" + NiFiProperties.SITE_TO_SITE_HTTP_ENABLED + "' property is not true");
} else {
externalSiteListeners.add(HttpRemoteSiteListener.getInstance(nifiProperties));
}
for (final RemoteSiteListener listener : externalSiteListeners) {
listener.setRootGroup(rootGroup);
}
// Determine frequency for obtaining component status snapshots
final String snapshotFrequency = nifiProperties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY);
long snapshotMillis;
try {
snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, TimeUnit.MILLISECONDS);
} catch (final Exception e) {
snapshotMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, TimeUnit.MILLISECONDS);
}
// Initialize the Embedded ZooKeeper server, if applicable
if (nifiProperties.isStartEmbeddedZooKeeper() && configuredForClustering) {
try {
zooKeeperStateServer = ZooKeeperStateServer.create(nifiProperties);
zooKeeperStateServer.start();
} catch (final IOException | ConfigException e) {
throw new IllegalStateException("Unable to initialize Flow because NiFi was configured to start an Embedded Zookeeper server but failed to do so", e);
}
} else {
zooKeeperStateServer = null;
}
statusHistoryRepository = createStatusHistoryRepository();
final boolean analyticsEnabled = Boolean.parseBoolean(nifiProperties.getProperty(NiFiProperties.ANALYTICS_PREDICTION_ENABLED, NiFiProperties.DEFAULT_ANALYTICS_PREDICTION_ENABLED));
if (analyticsEnabled) {
// Determine interval for predicting future feature values
final String predictionInterval = nifiProperties.getProperty(NiFiProperties.ANALYTICS_PREDICTION_INTERVAL, NiFiProperties.DEFAULT_ANALYTICS_PREDICTION_INTERVAL);
long predictionIntervalMillis;
try {
predictionIntervalMillis = FormatUtils.getTimeDuration(predictionInterval, TimeUnit.MILLISECONDS);
} catch (final Exception e) {
LOG.warn("Analytics is enabled however could not retrieve value for " + NiFiProperties.ANALYTICS_PREDICTION_INTERVAL + ". This property has been set to '"
+ NiFiProperties.DEFAULT_ANALYTICS_PREDICTION_INTERVAL + "'");
predictionIntervalMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_ANALYTICS_PREDICTION_INTERVAL, TimeUnit.MILLISECONDS);
}
// Determine interval for querying past observations
final String queryInterval = nifiProperties.getProperty(NiFiProperties.ANALYTICS_QUERY_INTERVAL, NiFiProperties.DEFAULT_ANALYTICS_QUERY_INTERVAL);
long queryIntervalMillis;
try {
queryIntervalMillis = FormatUtils.getTimeDuration(queryInterval, TimeUnit.MILLISECONDS);
} catch (final Exception e) {
LOG.warn("Analytics is enabled however could not retrieve value for " + NiFiProperties.ANALYTICS_QUERY_INTERVAL + ". This property has been set to '"
+ NiFiProperties.DEFAULT_ANALYTICS_QUERY_INTERVAL + "'");
queryIntervalMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_ANALYTICS_QUERY_INTERVAL, TimeUnit.MILLISECONDS);
}
// Determine score name to use for evaluating model performance
String modelScoreName = nifiProperties.getProperty(NiFiProperties.ANALYTICS_CONNECTION_MODEL_SCORE_NAME, NiFiProperties.DEFAULT_ANALYTICS_CONNECTION_SCORE_NAME);
// Determine score threshold to use when evaluating acceptable model
Double modelScoreThreshold;
try {
modelScoreThreshold = Double.valueOf(nifiProperties.getProperty(NiFiProperties.ANALYTICS_CONNECTION_MODEL_SCORE_THRESHOLD,
Double.toString(NiFiProperties.DEFAULT_ANALYTICS_CONNECTION_SCORE_THRESHOLD)));
} catch (final Exception e) {
LOG.warn("Analytics is enabled however could not retrieve value for " + NiFiProperties.ANALYTICS_CONNECTION_MODEL_SCORE_THRESHOLD + ". This property has been set to '"
+ NiFiProperties.DEFAULT_ANALYTICS_CONNECTION_SCORE_THRESHOLD + "'.");
modelScoreThreshold = NiFiProperties.DEFAULT_ANALYTICS_CONNECTION_SCORE_THRESHOLD;
}
StatusAnalyticsModelMapFactory statusAnalyticsModelMapFactory = new StatusAnalyticsModelMapFactory(extensionManager, nifiProperties);
analyticsEngine = new CachingConnectionStatusAnalyticsEngine(flowManager, statusHistoryRepository, statusAnalyticsModelMapFactory,
predictionIntervalMillis, queryIntervalMillis, modelScoreName, modelScoreThreshold);
timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
final long startTs = System.currentTimeMillis();
RepositoryStatusReport statusReport = flowFileEventRepository.reportTransferEvents(startTs);
flowManager.findAllConnections().forEach(connection -> {
ConnectionStatusAnalytics connectionStatusAnalytics = (ConnectionStatusAnalytics) analyticsEngine.getStatusAnalytics(connection.getIdentifier());
connectionStatusAnalytics.refresh();
connectionStatusAnalytics.loadPredictions(statusReport);
});
final long endTs = System.currentTimeMillis();
LOG.debug("Time elapsed loading all predictions: {} ms", endTs - startTs);
} catch (final Exception e) {
LOG.error("Failed to generate predictions", e);
}
}
}, 0L, 15, TimeUnit.SECONDS);
}
eventAccess = new StandardEventAccess(flowManager, flowFileEventRepository, processScheduler, authorizer, provenanceRepository, auditService, analyticsEngine);
timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
statusHistoryRepository.capture(getNodeStatusSnapshot(), eventAccess.getControllerStatus(), getGarbageCollectionStatus(), new Date());
} catch (final Exception e) {
LOG.error("Failed to capture component stats for Stats History", e);
}
}
}, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
this.connectionStatus = new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED);
heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false));
if (configuredForClustering) {
heartbeater = new ClusterProtocolHeartbeater(protocolSender, clusterCoordinator, leaderElectionManager);
// Check if there is already a cluster coordinator elected. If not, go ahead
// and register for coordinator role. If there is already one elected, do not register until
// we have connected to the cluster. This allows us to avoid becoming the coordinator with a
// flow that is different from the rest of the cluster (especially an empty flow) and then
// kicking everyone out. This way, we instead inherit the cluster flow before we attempt to be
// the coordinator.
LOG.info("Checking if there is already a Cluster Coordinator Elected...");
final String clusterCoordinatorAddress = leaderElectionManager.getLeader(ClusterRoles.CLUSTER_COORDINATOR);
if (StringUtils.isEmpty(clusterCoordinatorAddress)) {
LOG.info("It appears that no Cluster Coordinator has been Elected yet. Registering for Cluster Coordinator Role.");
registerForClusterCoordinator(true);
} else {
// At this point, we have determined that there is a Cluster Coordinator elected. It is important to note, though,
// that if we are running an embedded ZooKeeper, and we have just restarted the cluster (at least the nodes that run the
// embedded ZooKeeper), we could possibly determine that the Cluster Coordinator is at an address that is not really
// valid. This is because the latest stable ZooKeeper does not support "Container ZNodes" and as a result the ZNodes that
// are created are persistent, not ephemeral. Upon restart, we can get this persisted value, even though the node that belongs
// to that address has not started. ZooKeeper/Curator will recognize this after a while and delete the ZNode. As a result,
// we may later determine that there is in fact no Cluster Coordinator. If this happens, we will automatically register for
// Cluster Coordinator through the StandardFlowService.
LOG.info("The Election for Cluster Coordinator has already begun (Leader is {}). Will not register to be elected for this role until after connecting "
+ "to the cluster and inheriting the cluster's flow.", clusterCoordinatorAddress);
registerForClusterCoordinator(false);
}
leaderElectionManager.start();
heartbeatMonitor.start();
final InetSocketAddress loadBalanceAddress = nifiProperties.getClusterLoadBalanceAddress();
// Setup Load Balancing Server
final EventReporter eventReporter = createEventReporter();
final LoadBalanceAuthorizer authorizeConnection = new ClusterLoadBalanceAuthorizer(clusterCoordinator, eventReporter);
final LoadBalanceProtocol loadBalanceProtocol = new StandardLoadBalanceProtocol(flowFileRepo, contentRepository, provenanceRepository, this, authorizeConnection);
final int numThreads = nifiProperties.getIntegerProperty(NiFiProperties.LOAD_BALANCE_MAX_THREAD_COUNT, NiFiProperties.DEFAULT_LOAD_BALANCE_MAX_THREAD_COUNT);
final String timeoutPeriod = nifiProperties.getProperty(NiFiProperties.LOAD_BALANCE_COMMS_TIMEOUT, NiFiProperties.DEFAULT_LOAD_BALANCE_COMMS_TIMEOUT);
final int timeoutMillis = (int) FormatUtils.getTimeDuration(timeoutPeriod, TimeUnit.MILLISECONDS);
loadBalanceServer = new ConnectionLoadBalanceServer(loadBalanceAddress.getHostName(), loadBalanceAddress.getPort(), sslContext,
numThreads, loadBalanceProtocol, eventReporter, timeoutMillis);
final int connectionsPerNode = nifiProperties.getIntegerProperty(NiFiProperties.LOAD_BALANCE_CONNECTIONS_PER_NODE, NiFiProperties.DEFAULT_LOAD_BALANCE_CONNECTIONS_PER_NODE);
final NioAsyncLoadBalanceClientFactory asyncClientFactory = new NioAsyncLoadBalanceClientFactory(sslContext, timeoutMillis, new ContentRepositoryFlowFileAccess(contentRepository),
eventReporter, new StandardLoadBalanceFlowFileCodec(), clusterCoordinator);
loadBalanceClientRegistry = new NioAsyncLoadBalanceClientRegistry(asyncClientFactory, connectionsPerNode);
final int loadBalanceClientThreadCount = nifiProperties.getIntegerProperty(NiFiProperties.LOAD_BALANCE_MAX_THREAD_COUNT, NiFiProperties.DEFAULT_LOAD_BALANCE_MAX_THREAD_COUNT);
loadBalanceClientThreadPool = new FlowEngine(loadBalanceClientThreadCount, "Load-Balanced Client", true);
for (int i = 0; i < loadBalanceClientThreadCount; i++) {
final NioAsyncLoadBalanceClientTask clientTask = new NioAsyncLoadBalanceClientTask(loadBalanceClientRegistry, clusterCoordinator, eventReporter);
loadBalanceClientTasks.add(clientTask);
loadBalanceClientThreadPool.submit(clientTask);
}
} else {
loadBalanceClientRegistry = null;
heartbeater = null;
loadBalanceServer = null;
loadBalanceClientThreadPool = null;
}
longRunningTaskMonitorThreadPool = isLongRunningTaskMonitorEnabled()
? Optional.of(new FlowEngine(1, "Long Running Task Monitor", true))
: Optional.empty();
}
@Override
public Authorizable getParentAuthorizable() {
return null;
}
@Override
public Resource getResource() {
return ResourceFactory.getControllerResource();
}
private static FlowFileRepository createFlowFileRepository(final NiFiProperties properties, final ExtensionManager extensionManager, final ResourceClaimManager contentClaimManager) {
final String implementationClassName = properties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION, DEFAULT_FLOWFILE_REPO_IMPLEMENTATION);
if (implementationClassName == null) {
throw new RuntimeException("Cannot create FlowFile Repository because the NiFi Properties is missing the following property: "
+ NiFiProperties.FLOWFILE_REPOSITORY_IMPLEMENTATION);
}
try {
final FlowFileRepository created = NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName, FlowFileRepository.class, properties);
synchronized (created) {
created.initialize(contentClaimManager);
}
return created;
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
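/**
* Creates and initializes the FlowFileSwapManager whose implementation class is
* named by {@link NiFiProperties#FLOWFILE_SWAP_MANAGER_IMPLEMENTATION}
* (defaulting to {@link #DEFAULT_SWAP_MANAGER_IMPLEMENTATION}), wiring it to this
* controller's resource claim manager, FlowFile repository, and event reporter.
* Returns null if no implementation class name can be resolved.
*/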
public FlowFileSwapManager createSwapManager() {
final String implementationClassName = nifiProperties.getProperty(NiFiProperties.FLOWFILE_SWAP_MANAGER_IMPLEMENTATION, DEFAULT_SWAP_MANAGER_IMPLEMENTATION);
if (implementationClassName == null) {
return null;
}
try {
final FlowFileSwapManager swapManager = NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName, FlowFileSwapManager.class, nifiProperties);
final EventReporter eventReporter = createEventReporter();
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
final SwapManagerInitializationContext initializationContext = new SwapManagerInitializationContext() {
@Override
public ResourceClaimManager getResourceClaimManager() {
return resourceClaimManager;
}
@Override
public FlowFileRepository getFlowFileRepository() {
return flowFileRepository;
}
@Override
public EventReporter getEventReporter() {
return eventReporter;
}
};
swapManager.initialize(initializationContext);
}
return swapManager;
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
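/**
* Returns an EventReporter that turns each reported event into a Bulletin in the
* Bulletin Repository. A hedged usage sketch (the category and message below are
* illustrative):
* <pre>{@code
* final EventReporter reporter = controller.createEventReporter();
* reporter.reportEvent(Severity.WARNING, "Load Balancing", "Connection to node timed out");
* }</pre>
*/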
public EventReporter createEventReporter() {
return new EventReporter() {
private static final long serialVersionUID = 1L;
@Override
public void reportEvent(final Severity severity, final String category, final String message) {
final Bulletin bulletin = BulletinFactory.createBulletin(category, severity.name(), message);
bulletinRepository.addBulletin(bulletin);
}
};
}
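/**
* Purges all components from the flow and clears any components that were queued
* to start after initialization.
*/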
public void purge() {
getFlowManager().purge();
writeLock.lock();
try {
startConnectablesAfterInitialization.clear();
startRemoteGroupPortsAfterInitialization.clear();
} finally {
writeLock.unlock("purge");
}
}
public void initializeFlow() throws IOException {
initializeFlow(new StandardQueueProvider(getFlowManager()));
}
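/**
* Initializes the flow: loads FlowFiles from the FlowFile Repository, recovers
* (or, for volatile repositories, purges) swap files for every queue, schedules
* the FlowFile-expiration, Remote Process Group refresh, and Flow Registry
* synchronization tasks, and starts the site-to-site listeners and the
* load-balance server.
*/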
public void initializeFlow(final QueueProvider queueProvider) throws IOException {
writeLock.lock();
try {
// get all connections/queues and recover from swap files.
final Set<Connection> connections = flowManager.findAllConnections();
flowFileRepository.loadFlowFiles(queueProvider);
long maxIdFromSwapFiles = -1L;
if (flowFileRepository.isVolatile()) {
for (final Connection connection : connections) {
final FlowFileQueue queue = connection.getFlowFileQueue();
queue.purgeSwapFiles();
}
} else {
for (final Connection connection : connections) {
final FlowFileQueue queue = connection.getFlowFileQueue();
final SwapSummary swapSummary = queue.recoverSwappedFlowFiles();
if (swapSummary != null) {
final Long maxFlowFileId = swapSummary.getMaxFlowFileId();
if (maxFlowFileId != null && maxFlowFileId > maxIdFromSwapFiles) {
maxIdFromSwapFiles = maxFlowFileId;
}
for (final ResourceClaim resourceClaim : swapSummary.getResourceClaims()) {
resourceClaimManager.incrementClaimantCount(resourceClaim);
}
}
}
}
flowFileRepository.updateMaxFlowFileIdentifier(maxIdFromSwapFiles + 1);
// Begin expiring FlowFiles that are old
final RepositoryContextFactory contextFactory = new RepositoryContextFactory(contentRepository, flowFileRepository,
flowFileEventRepository, counterRepositoryRef.get(), provenanceRepository, stateManagerProvider);
processScheduler.scheduleFrameworkTask(new ExpireFlowFiles(this, contextFactory), "Expire FlowFiles", 30L, 30L, TimeUnit.SECONDS);
// now that we've loaded the FlowFiles, this has restored our ContentClaims' states, so we can tell the
// ContentRepository to purge superfluous files
contentRepository.cleanup();
for (final RemoteSiteListener listener : externalSiteListeners) {
listener.start();
}
if (loadBalanceServer != null) {
loadBalanceServer.start();
}
notifyComponentsConfigurationRestored();
timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
try {
updateRemoteProcessGroups();
} catch (final Throwable t) {
LOG.warn("Unable to update Remote Process Groups due to " + t);
if (LOG.isDebugEnabled()) {
LOG.warn("", t);
}
}
}
}, 0L, 30L, TimeUnit.SECONDS);
timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
final ProcessGroup rootGroup = flowManager.getRootGroup();
final List<ProcessGroup> allGroups = rootGroup.findAllProcessGroups();
allGroups.add(rootGroup);
for (final ProcessGroup group : allGroups) {
try {
group.synchronizeWithFlowRegistry(flowRegistryClient);
} catch (final Exception e) {
LOG.error("Failed to synchronize {} with Flow Registry", group, e);
}
}
}
}, 5, 60, TimeUnit.SECONDS);
initialized.set(true);
} finally {
writeLock.unlock("initializeFlow");
}
}
private void notifyComponentsConfigurationRestored() {
for (final ProcessorNode procNode : flowManager.getRootGroup().findAllProcessors()) {
final Processor processor = procNode.getProcessor();
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, processor.getClass(), processor.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, processor);
}
}
for (final ControllerServiceNode serviceNode : flowManager.getAllControllerServices()) {
final ControllerService service = serviceNode.getControllerServiceImplementation();
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, service.getClass(), service.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, service);
}
}
for (final ReportingTaskNode taskNode : getAllReportingTasks()) {
final ReportingTask task = taskNode.getReportingTask();
try (final NarCloseable nc = NarCloseable.withComponentNarLoader(extensionManager, task.getClass(), task.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigurationRestored.class, task);
}
}
}
/**
* <p>
* Causes any components that were added to the flow with a 'delayStart'
* flag of true to start now
* </p>
*
* @param startDelayedComponents true if the delayed components should be started
*/
public void onFlowInitialized(final boolean startDelayedComponents) {
writeLock.lock();
try {
// Perform validation of all components before attempting to start them.
LOG.debug("Triggering initial validation of all components");
final long start = System.nanoTime();
final ValidationTrigger triggerIfValidating = new ValidationTrigger() {
@Override
public void triggerAsync(final ComponentNode component) {
final ValidationStatus status = component.getValidationStatus();
if (status == ValidationStatus.VALIDATING) {
LOG.debug("Will trigger async validation for {} because its status is VALIDATING", component);
validationTrigger.triggerAsync(component);
} else {
LOG.debug("Will not trigger async validation for {} because its status is {}", component, status);
}
}
@Override
public void trigger(final ComponentNode component) {
final ValidationStatus status = component.getValidationStatus();
if (status == ValidationStatus.VALIDATING) {
LOG.debug("Will trigger immediate validation for {} because its status is VALIDATING", component);
validationTrigger.trigger(component);
} else {
LOG.debug("Will not trigger immediate validation for {} because its status is {}", component, status);
}
}
};
new TriggerValidationTask(flowManager, triggerIfValidating).run();
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
LOG.info("Performed initial validation of all components in {} milliseconds", millis);
// Trigger component validation to occur every 5 seconds.
validationThreadPool.scheduleWithFixedDelay(new TriggerValidationTask(flowManager, validationTrigger), 5, 5, TimeUnit.SECONDS);
if (startDelayedComponents) {
LOG.info("Starting {} processors/ports/funnels", startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size());
for (final Connectable connectable : startConnectablesAfterInitialization) {
if (connectable.getScheduledState() == ScheduledState.DISABLED) {
continue;
}
try {
if (connectable instanceof ProcessorNode) {
connectable.getProcessGroup().startProcessor((ProcessorNode) connectable, true);
} else {
startConnectable(connectable);
}
} catch (final Throwable t) {
LOG.error("Unable to start {} due to {}", new Object[]{connectable, t.toString()});
if (LOG.isDebugEnabled()) {
LOG.error("", t);
}
}
}
startConnectablesAfterInitialization.clear();
int startedTransmitting = 0;
for (final RemoteGroupPort remoteGroupPort : startRemoteGroupPortsAfterInitialization) {
try {
remoteGroupPort.getRemoteProcessGroup().startTransmitting(remoteGroupPort);
startedTransmitting++;
} catch (final Throwable t) {
LOG.error("Unable to start transmitting with {} due to {}", new Object[]{remoteGroupPort, t});
}
}
LOG.info("Started {} Remote Group Ports transmitting", startedTransmitting);
startRemoteGroupPortsAfterInitialization.clear();
} else {
// We don't want to start all of the delayed components. However, funnels need to be started anyway
// because we don't provide users the ability to start or stop them - they are just notional.
for (final Connectable connectable : startConnectablesAfterInitialization) {
try {
if (connectable instanceof Funnel) {
startConnectable(connectable);
}
} catch (final Throwable t) {
LOG.error("Unable to start {} due to {}", new Object[]{connectable, t});
}
}
startConnectablesAfterInitialization.clear();
startRemoteGroupPortsAfterInitialization.clear();
}
for (final Connection connection : flowManager.findAllConnections()) {
connection.getFlowFileQueue().startLoadBalancing();
}
scheduleLongRunningTaskMonitor();
} finally {
writeLock.unlock("onFlowInitialized");
}
}
private void scheduleLongRunningTaskMonitor() {
longRunningTaskMonitorThreadPool.ifPresent(flowEngine -> {
try {
final long scheduleMillis = parseDurationPropertyToMillis(NiFiProperties.MONITOR_LONG_RUNNING_TASK_SCHEDULE);
final long thresholdMillis = parseDurationPropertyToMillis(NiFiProperties.MONITOR_LONG_RUNNING_TASK_THRESHOLD);
LongRunningTaskMonitor longRunningTaskMonitor = new LongRunningTaskMonitor(getFlowManager(), createEventReporter(), thresholdMillis);
longRunningTaskMonitorThreadPool.get().scheduleWithFixedDelay(longRunningTaskMonitor, scheduleMillis, scheduleMillis, TimeUnit.MILLISECONDS);
} catch (Exception e) {
LOG.warn("Could not initialize LongRunningTaskMonitor.", e);
}
});
}
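/**
* Parses the named duration property into milliseconds via
* {@link FormatUtils#getPreciseTimeDuration(String, TimeUnit)}. For example, a
* value of "60 secs" or "1 min" (the formats named in the warning below) both
* yield 60000.
*/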
private long parseDurationPropertyToMillis(String propertyName) {
try {
final String duration = nifiProperties.getProperty(propertyName);
return (long) FormatUtils.getPreciseTimeDuration(duration, TimeUnit.MILLISECONDS);
} catch (final Exception e) {
LOG.warn("Could not retrieve value for {}. Valid values e.g. 60 secs or 1 min.", propertyName);
throw e;
}
}
private boolean isLongRunningTaskMonitorEnabled() {
return StringUtils.isNotBlank(nifiProperties.getProperty(NiFiProperties.MONITOR_LONG_RUNNING_TASK_SCHEDULE))
&& StringUtils.isNotBlank(nifiProperties.getProperty(NiFiProperties.MONITOR_LONG_RUNNING_TASK_THRESHOLD));
}
public boolean isStartAfterInitialization(final Connectable component) {
return startConnectablesAfterInitialization.contains(component) || startRemoteGroupPortsAfterInitialization.contains(component);
}
private ContentRepository createContentRepository(final NiFiProperties properties) throws InstantiationException, IllegalAccessException, ClassNotFoundException {
final String implementationClassName = properties.getProperty(NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION, DEFAULT_CONTENT_REPO_IMPLEMENTATION);
if (implementationClassName == null) {
throw new RuntimeException("Cannot create Content Repository because the NiFi Properties is missing the following property: "
+ NiFiProperties.CONTENT_REPOSITORY_IMPLEMENTATION);
}
try {
final ContentRepository contentRepo = NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName, ContentRepository.class, properties);
synchronized (contentRepo) {
contentRepo.initialize(resourceClaimManager);
}
return contentRepo;
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
private ProvenanceRepository createProvenanceRepository(final NiFiProperties properties) throws InstantiationException, IllegalAccessException, ClassNotFoundException {
final String implementationClassName = properties.getProperty(NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS, DEFAULT_PROVENANCE_REPO_IMPLEMENTATION);
if (implementationClassName == null) {
throw new RuntimeException("Cannot create Provenance Repository because the NiFi Properties is missing the following property: "
+ NiFiProperties.PROVENANCE_REPO_IMPLEMENTATION_CLASS);
}
try {
return NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName, ProvenanceRepository.class, properties);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
private StatusHistoryRepository createStatusHistoryRepository() {
final String implementationClassName = nifiProperties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION);
if (implementationClassName == null) {
throw new RuntimeException("Cannot create Status History Repository because the NiFi Properties is missing the following property: "
+ NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
}
try {
final StatusHistoryRepository repository = NarThreadContextClassLoader.createInstance(extensionManager, implementationClassName, StatusHistoryRepository.class, nifiProperties);
repository.start();
return repository;
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
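/**
* Builds a KerberosConfig from the service principal, keytab location, and
* Kerberos configuration file defined in the given NiFiProperties. If none of
* the three is set, {@link KerberosConfig#NOT_CONFIGURED} is returned.
*/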
public KerberosConfig createKerberosConfig(final NiFiProperties nifiProperties) {
final String principal = nifiProperties.getKerberosServicePrincipal();
final String keytabLocation = nifiProperties.getKerberosServiceKeytabLocation();
final File kerberosConfigFile = nifiProperties.getKerberosConfigurationFile();
if (principal == null && keytabLocation == null && kerberosConfigFile == null) {
return KerberosConfig.NOT_CONFIGURED;
}
final File keytabFile = keytabLocation == null ? null : new File(keytabLocation);
return new KerberosConfig(principal, keytabFile, kerberosConfigFile);
}
public ValidationTrigger getValidationTrigger() {
return validationTrigger;
}
public PropertyEncryptor getEncryptor() {
return encryptor;
}
public SensitiveValueEncoder getSensitiveValueEncoder() {
return sensitiveValueEncoder;
}
/**
* @return the ExtensionManager used for instantiating Processors,
* Prioritizers, etc.
*/
public ExtensionManager getExtensionManager() {
return extensionManager;
}
public String getInstanceId() {
readLock.lock();
try {
return instanceId;
} finally {
readLock.unlock("getInstanceId");
}
}
public Heartbeater getHeartbeater() {
return heartbeater;
}
/**
* @return the BulletinRepository for storing and retrieving Bulletins
*/
public BulletinRepository getBulletinRepository() {
return bulletinRepository;
}
public SnippetManager getSnippetManager() {
return snippetManager;
}
public StateManagerProvider getStateManagerProvider() {
return stateManagerProvider;
}
public Authorizer getAuthorizer() {
return authorizer;
}
/**
* @return <code>true</code> if the scheduling engine for this controller
* has been terminated.
*/
public boolean isTerminated() {
this.readLock.lock();
try {
return null == this.timerDrivenEngineRef.get() || this.timerDrivenEngineRef.get().isTerminated();
} finally {
this.readLock.unlock("isTerminated");
}
}
/**
* Triggers the controller to begin shutdown, stopping all processors and
* terminating the scheduling engine. After calling this method, the
* {@link #isTerminated()} method will indicate whether or not the shutdown
* has finished.
*
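* <p>A hedged usage sketch: {@code controller.shutdown(false)} initiates a graceful
* shutdown that waits up to the configured graceful-shutdown period, while
* {@code controller.shutdown(true)} attempts to stop active threads immediately.</p>
*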
* @param kill if <code>true</code>, attempts to stop all active threads,
* but makes no guarantee that this will happen
* @throws IllegalStateException if the controller is already stopped or
* currently in the process of stopping
*/
public void shutdown(final boolean kill) {
this.shutdown = true;
flowManager.getRootGroup().stopProcessing();
readLock.lock();
try {
if (isTerminated() || timerDrivenEngineRef.get().isTerminating()) {
throw new IllegalStateException("Controller already stopped or still stopping...");
}
if (leaderElectionManager != null) {
leaderElectionManager.stop();
}
if (heartbeatMonitor != null) {
heartbeatMonitor.stop();
}
if (kill) {
this.timerDrivenEngineRef.get().shutdownNow();
this.eventDrivenEngineRef.get().shutdownNow();
LOG.info("Initiated immediate shutdown of flow controller...");
} else {
this.timerDrivenEngineRef.get().shutdown();
this.eventDrivenEngineRef.get().shutdown();
LOG.info("Initiated graceful shutdown of flow controller...waiting up to " + gracefulShutdownSeconds + " seconds");
}
validationThreadPool.shutdown();
clusterTaskExecutor.shutdownNow();
if (zooKeeperStateServer != null) {
zooKeeperStateServer.shutdown();
}
if (loadBalanceClientThreadPool != null) {
loadBalanceClientThreadPool.shutdownNow();
}
loadBalanceClientTasks.forEach(NioAsyncLoadBalanceClientTask::stop);
// Trigger invocation of any processor methods annotated with @OnShutdown
flowManager.getRootGroup().shutdown();
stateManagerProvider.shutdown();
// invoke any methods annotated with @OnShutdown on Controller Services
for (final ControllerServiceNode serviceNode : flowManager.getAllControllerServices()) {
processScheduler.shutdownControllerService(serviceNode, controllerServiceProvider);
}
// invoke any methods annotated with @OnShutdown on Reporting Tasks
for (final ReportingTaskNode taskNode : getAllReportingTasks()) {
processScheduler.shutdownReportingTask(taskNode);
}
try {
this.timerDrivenEngineRef.get().awaitTermination(gracefulShutdownSeconds / 2, TimeUnit.SECONDS);
this.eventDrivenEngineRef.get().awaitTermination(gracefulShutdownSeconds / 2, TimeUnit.SECONDS);
} catch (final InterruptedException ie) {
LOG.info("Interrupted while waiting for controller termination.");
}
try {
flowFileRepository.close();
} catch (final Throwable t) {
LOG.warn("Unable to shut down FlowFileRepository due to {}", new Object[]{t});
}
if (this.timerDrivenEngineRef.get().isTerminated() && eventDrivenEngineRef.get().isTerminated()) {
LOG.info("Controller has been terminated successfully.");
} else {
LOG.warn("Controller hasn't terminated properly. There exists an uninterruptable thread that "
+ "will take an indeterminate amount of time to stop. Might need to kill the program manually.");
}
for (final RemoteSiteListener listener : externalSiteListeners) {
listener.stop();
listener.destroy();
}
if (loadBalanceServer != null) {
loadBalanceServer.stop();
}
if (loadBalanceClientRegistry != null) {
loadBalanceClientRegistry.stop();
}
if (processScheduler != null) {
processScheduler.shutdown();
}
if (contentRepository != null) {
contentRepository.shutdown();
}
if (provenanceRepository != null) {
try {
provenanceRepository.close();
} catch (final IOException ioe) {
LOG.warn("There was a problem shutting down the Provenance Repository: " + ioe.toString());
if (LOG.isDebugEnabled()) {
LOG.warn("", ioe);
}
}
}
if (statusHistoryRepository != null) {
statusHistoryRepository.shutdown();
}
} finally {
readLock.unlock("shutdown");
}
}
/**
* Serializes the current state of the controller to the given OutputStream
*
* @param serializer serializer
* @param os stream
* @throws FlowSerializationException if serialization of the flow fails for
* any reason
*/
public synchronized <T> void serialize(final FlowSerializer<T> serializer, final OutputStream os) throws FlowSerializationException {
T flowConfiguration;
readLock.lock();
try {
final ScheduledStateLookup scheduledStateLookup = new ScheduledStateLookup() {
@Override
public ScheduledState getScheduledState(final ProcessorNode procNode) {
if (startConnectablesAfterInitialization.contains(procNode)) {
return ScheduledState.RUNNING;
}
return procNode.getDesiredState();
}
@Override
public ScheduledState getScheduledState(final Port port) {
if (startConnectablesAfterInitialization.contains(port)) {
return ScheduledState.RUNNING;
}
if (startRemoteGroupPortsAfterInitialization.contains(port)) {
return ScheduledState.RUNNING;
}
return port.getScheduledState();
}
};
flowConfiguration = serializer.transform(this, scheduledStateLookup);
} finally {
readLock.unlock("serialize");
}
serializer.serialize(flowConfiguration, os);
}
/**
* Synchronizes this controller with the proposed flow.
* <p>
* For more details, see
* {@link FlowSynchronizer#sync(FlowController, DataFlow, PropertyEncryptor, FlowService)}.
*
* @param synchronizer synchronizer
* @param dataFlow the flow to load the controller with. If the flow is null
* or zero length, then the controller must not have a flow or else an
* UninheritableFlowException will be thrown.
* @param flowService the flow service
*
* @throws FlowSerializationException if the proposed flow is not a valid flow
* configuration file
* @throws UninheritableFlowException if the proposed flow cannot be loaded
* by the controller because doing so would risk orphaning FlowFiles
* @throws FlowSynchronizationException if updates to the controller failed.
* If this exception is thrown, then the controller should be considered
* unsafe to be used
* @throws MissingBundleException if the proposed flow cannot be loaded by the
* controller because it contains a bundle that does not exist in the controller
*/
public void synchronize(final FlowSynchronizer synchronizer, final DataFlow dataFlow, final FlowService flowService)
throws FlowSerializationException, FlowSynchronizationException, UninheritableFlowException, MissingBundleException {
writeLock.lock();
try {
LOG.debug("Synchronizing controller with proposed flow");
try {
synchronizer.sync(this, dataFlow, encryptor, flowService);
} catch (final UninheritableFlowException ufe) {
final NodeIdentifier localNodeId = getNodeId();
if (localNodeId != null) {
try {
clusterCoordinator.requestNodeDisconnect(localNodeId, DisconnectionCode.MISMATCHED_FLOWS, ufe.getMessage());
} catch (final Exception e) {
LOG.error("Failed to synchronize Controller with proposed flow and also failed to notify cluster that the flows do not match. Node's state may remain CONNECTING instead of " +
"transitioning to DISCONNECTED.", e);
}
}
throw ufe;
}
flowSynchronized.set(true);
LOG.info("Successfully synchronized controller with proposed flow. Flow contains the following number of components: {}", flowManager.getComponentCounts());
} finally {
writeLock.unlock("synchronize");
}
}
/**
* @return the currently configured maximum number of threads that can be
* used for executing processors at any given time.
*/
public int getMaxTimerDrivenThreadCount() {
return maxTimerDrivenThreads.get();
}
public int getMaxEventDrivenThreadCount() {
return maxEventDrivenThreads.get();
}
public int getActiveEventDrivenThreadCount() {
return eventDrivenEngineRef.get().getActiveCount();
}
public int getActiveTimerDrivenThreadCount() {
return timerDrivenEngineRef.get().getActiveCount();
}
public void setMaxTimerDrivenThreadCount(final int maxThreadCount) {
writeLock.lock();
try {
setMaxThreadCount(maxThreadCount, this.timerDrivenEngineRef.get(), this.maxTimerDrivenThreads);
} finally {
writeLock.unlock("setMaxTimerDrivenThreadCount");
}
}
public void setMaxEventDrivenThreadCount(final int maxThreadCount) {
writeLock.lock();
try {
setMaxThreadCount(maxThreadCount, this.eventDrivenEngineRef.get(), this.maxEventDrivenThreads);
processScheduler.setMaxThreadCount(SchedulingStrategy.EVENT_DRIVEN, maxThreadCount);
} finally {
writeLock.unlock("setMaxEventDrivenThreadCount");
}
}
/**
* Updates the number of threads that can be simultaneously used for executing processors.
* This method must be called while holding the write lock!
*
* @param maxThreadCount max number of threads
*/
private void setMaxThreadCount(final int maxThreadCount, final FlowEngine engine, final AtomicInteger maxThreads) {
if (maxThreadCount < 1) {
throw new IllegalArgumentException("Cannot set max number of threads to less than 1");
}
maxThreads.getAndSet(maxThreadCount);
if (null != engine && engine.getCorePoolSize() < maxThreadCount) {
engine.setCorePoolSize(maxThreads.intValue());
}
}
public UserAwareEventAccess getEventAccess() {
return eventAccess;
}
public StatusAnalyticsEngine getStatusAnalyticsEngine() {
return analyticsEngine;
}
/**
* Sets the root group to the given group
*
* @param group the ProcessGroup that is to become the new Root Group
* @throws IllegalArgumentException if the ProcessGroup has a parent
* @throws IllegalStateException if the FlowController does not know about
* the given process group
*/
void setRootGroup(final ProcessGroup group) {
if (requireNonNull(group).getParent() != null) {
throw new IllegalArgumentException("A ProcessGroup that has a parent cannot be the Root Group");
}
writeLock.lock();
try {
flowManager.setRootGroup(group);
for (final RemoteSiteListener listener : externalSiteListeners) {
listener.setRootGroup(group);
}
// update the heartbeat bean
this.heartbeatBeanRef.set(new HeartbeatBean(group, isPrimary()));
allProcessGroups.put(group.getIdentifier(), group);
allProcessGroups.put(FlowManager.ROOT_GROUP_ID_ALIAS, group);
} finally {
writeLock.unlock("setRootGroup");
}
}
public SystemDiagnostics getSystemDiagnostics() {
final SystemDiagnosticsFactory factory = new SystemDiagnosticsFactory();
return factory.create(flowFileRepository, contentRepository, provenanceRepository);
}
public String getContentRepoFileStoreName(final String containerName) {
return contentRepository.getContainerFileStoreName(containerName);
}
public String getFlowRepoFileStoreName() {
return flowFileRepository.getFileStoreName();
}
public String getProvenanceRepoFileStoreName(final String containerName) {
return provenanceRepository.getContainerFileStoreName(containerName);
}
//
// ProcessGroup access
//
private Position toPosition(final PositionDTO dto) {
return new Position(dto.getX(), dto.getY());
}
//
// Snippet
//
private void verifyBundleInVersionedFlow(final org.apache.nifi.registry.flow.Bundle requiredBundle, final Set<BundleCoordinate> supportedBundles) {
final BundleCoordinate requiredCoordinate = new BundleCoordinate(requiredBundle.getGroup(), requiredBundle.getArtifact(), requiredBundle.getVersion());
if (!supportedBundles.contains(requiredCoordinate)) {
throw new IllegalStateException("Unsupported bundle: " + requiredCoordinate);
}
}
private void verifyProcessorsInVersionedFlow(final VersionedProcessGroup versionedFlow, final Map<String, Set<BundleCoordinate>> supportedTypes) {
if (versionedFlow.getProcessors() != null) {
versionedFlow.getProcessors().forEach(processor -> {
if (processor.getBundle() == null) {
throw new IllegalArgumentException("Processor bundle must be specified.");
}
if (supportedTypes.containsKey(processor.getType())) {
verifyBundleInVersionedFlow(processor.getBundle(), supportedTypes.get(processor.getType()));
} else {
throw new IllegalStateException("Invalid Processor Type: " + processor.getType());
}
});
}
if (versionedFlow.getProcessGroups() != null) {
versionedFlow.getProcessGroups().forEach(processGroup -> {
verifyProcessorsInVersionedFlow(processGroup, supportedTypes);
});
}
}
private void verifyControllerServicesInVersionedFlow(final VersionedProcessGroup versionedFlow, final Map<String, Set<BundleCoordinate>> supportedTypes) {
if (versionedFlow.getControllerServices() != null) {
versionedFlow.getControllerServices().forEach(controllerService -> {
if (supportedTypes.containsKey(controllerService.getType())) {
if (controllerService.getBundle() == null) {
throw new IllegalArgumentException("Controller Service bundle must be specified.");
}
verifyBundleInVersionedFlow(controllerService.getBundle(), supportedTypes.get(controllerService.getType()));
} else {
throw new IllegalStateException("Invalid Controller Service Type: " + controllerService.getType());
}
});
}
if (versionedFlow.getProcessGroups() != null) {
versionedFlow.getProcessGroups().forEach(processGroup -> {
verifyControllerServicesInVersionedFlow(processGroup, supportedTypes);
});
}
}
public void verifyComponentTypesInSnippet(final VersionedProcessGroup versionedFlow) {
final Map<String, Set<BundleCoordinate>> processorClasses = new HashMap<>();
for (final ExtensionDefinition extensionDefinition : extensionManager.getExtensions(Processor.class)) {
final String name = extensionDefinition.getImplementationClassName();
processorClasses.put(name, extensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
}
verifyProcessorsInVersionedFlow(versionedFlow, processorClasses);
final Map<String, Set<BundleCoordinate>> controllerServiceClasses = new HashMap<>();
for (final ExtensionDefinition extensionDefinition : extensionManager.getExtensions(ControllerService.class)) {
final String name = extensionDefinition.getImplementationClassName();
controllerServiceClasses.put(name, extensionManager.getBundles(name).stream().map(bundle -> bundle.getBundleDetails().getCoordinate()).collect(Collectors.toSet()));
}
verifyControllerServicesInVersionedFlow(versionedFlow, controllerServiceClasses);
final Set<String> prioritizerClasses = new HashSet<>();
for (final ExtensionDefinition extensionDefinition : extensionManager.getExtensions(FlowFilePrioritizer.class)) {
final String name = extensionDefinition.getImplementationClassName();
prioritizerClasses.add(name);
}
final Set<VersionedConnection> allConns = new HashSet<>();
allConns.addAll(versionedFlow.getConnections());
for (final VersionedProcessGroup childGroup : versionedFlow.getProcessGroups()) {
allConns.addAll(findAllConnections(childGroup));
}
for (final VersionedConnection conn : allConns) {
final List<String> prioritizers = conn.getPrioritizers();
if (prioritizers != null) {
for (final String prioritizer : prioritizers) {
if (!prioritizerClasses.contains(prioritizer)) {
throw new IllegalStateException("Invalid FlowFile Prioritizer Type: " + prioritizer);
}
}
}
}
}
private Set<VersionedConnection> findAllConnections(final VersionedProcessGroup group) {
final Set<VersionedConnection> conns = new HashSet<>();
for (final VersionedConnection connection : group.getConnections()) {
conns.add(connection);
}
for (final VersionedProcessGroup childGroup : group.getProcessGroups()) {
conns.addAll(findAllConnections(childGroup));
}
return conns;
}
//
// Processor access
//
/**
* Returns the ProcessGroup with the given ID
*
* @param id the ID of the group
* @return the process group
* @throws IllegalStateException if no group with the given ID exists
*/
private ProcessGroup lookupGroup(final String id) {
final ProcessGroup group = flowManager.getGroup(id);
if (group == null) {
throw new IllegalStateException("No Group with ID " + id + " exists");
}
return group;
}
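/**
* @return a point-in-time snapshot of collection counts and times for each JVM
* garbage collector, as reported by the platform's GarbageCollectorMXBeans
*/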
public List<GarbageCollectionStatus> getGarbageCollectionStatus() {
final List<GarbageCollectionStatus> statuses = new ArrayList<>();
final Date now = new Date();
for (final GarbageCollectorMXBean mbean : ManagementFactory.getGarbageCollectorMXBeans()) {
final String managerName = mbean.getName();
final long count = mbean.getCollectionCount();
final long millis = mbean.getCollectionTime();
final GarbageCollectionStatus status = new StandardGarbageCollectionStatus(managerName, now, count, millis);
statuses.add(status);
}
return statuses;
}
public GarbageCollectionHistory getGarbageCollectionHistory() {
return statusHistoryRepository.getGarbageCollectionHistory(new Date(0L), new Date());
}
public ReloadComponent getReloadComponent() {
return reloadComponent;
}
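/**
* Starts the given Processor, failing if the Processor is currently in the process
* of stopping. Equivalent to calling {@link #startProcessor(String, String, boolean)}
* with failIfStopping set to true.
*/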
public void startProcessor(final String parentGroupId, final String processorId) {
startProcessor(parentGroupId, processorId, true);
}
public void startProcessor(final String parentGroupId, final String processorId, final boolean failIfStopping) {
final ProcessGroup group = lookupGroup(parentGroupId);
final ProcessorNode node = group.getProcessor(processorId);
if (node == null) {
throw new IllegalStateException("Cannot find ProcessorNode with ID " + processorId + " within ProcessGroup with ID " + parentGroupId);
}
writeLock.lock();
try {
if (initialized.get()) {
group.startProcessor(node, failIfStopping);
} else {
startConnectablesAfterInitialization.add(node);
}
} finally {
writeLock.unlock("startProcessor");
}
}
public boolean isInitialized() {
return initialized.get();
}
public boolean isFlowSynchronized() {
return flowSynchronized.get();
}
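/**
* Starts the given Funnel or Port. If the controller has not yet finished
* initializing, the component is instead queued to start once initialization
* completes.
*/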
public void startConnectable(final Connectable connectable) {
final ProcessGroup group = requireNonNull(connectable).getProcessGroup();
writeLock.lock();
try {
if (initialized.get()) {
switch (requireNonNull(connectable).getConnectableType()) {
case FUNNEL:
group.startFunnel((Funnel) connectable);
break;
case INPUT_PORT:
case REMOTE_INPUT_PORT:
group.startInputPort((Port) connectable);
break;
case OUTPUT_PORT:
case REMOTE_OUTPUT_PORT:
group.startOutputPort((Port) connectable);
break;
default:
throw new IllegalArgumentException();
}
} else {
startConnectablesAfterInitialization.add(connectable);
}
} finally {
writeLock.unlock("startConnectable");
}
}
public void stopConnectable(final Connectable connectable) {
final ProcessGroup group = requireNonNull(connectable).getProcessGroup();
writeLock.lock();
try {
switch (requireNonNull(connectable).getConnectableType()) {
case FUNNEL:
// Ignore. We don't support stopping funnels.
break;
case INPUT_PORT:
case REMOTE_INPUT_PORT:
startConnectablesAfterInitialization.remove(connectable);
group.stopInputPort((Port) connectable);
break;
case OUTPUT_PORT:
case REMOTE_OUTPUT_PORT:
startConnectablesAfterInitialization.remove(connectable);
group.stopOutputPort((Port) connectable);
break;
default:
throw new IllegalArgumentException();
}
} finally {
writeLock.unlock("stopConnectable");
}
}
public void startTransmitting(final RemoteGroupPort remoteGroupPort) {
writeLock.lock();
try {
if (initialized.get()) {
remoteGroupPort.getRemoteProcessGroup().startTransmitting(remoteGroupPort);
} else {
startRemoteGroupPortsAfterInitialization.add(remoteGroupPort);
}
} finally {
writeLock.unlock("startTransmitting");
}
}
public void stopTransmitting(final RemoteGroupPort remoteGroupPort) {
writeLock.lock();
try {
if (initialized.get()) {
remoteGroupPort.getRemoteProcessGroup().stopTransmitting(remoteGroupPort);
} else {
startRemoteGroupPortsAfterInitialization.remove(remoteGroupPort);
}
} finally {
writeLock.unlock("stopTransmitting");
}
}
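/**
* Stops the Processor with the given ID within the given ProcessGroup and removes it
* from the set of components queued to start once the controller finishes initializing.
*/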
public void stopProcessor(final String parentGroupId, final String processorId) {
final ProcessGroup group = lookupGroup(parentGroupId);
final ProcessorNode node = group.getProcessor(processorId);
if (node == null) {
throw new IllegalStateException("Cannot find ProcessorNode with ID " + processorId + " within ProcessGroup with ID " + parentGroupId);
}
group.stopProcessor(node);
// If the processor was queued to start upon initialization of the controller, cancel that start.
startConnectablesAfterInitialization.remove(node);
}
@Override
public void startReportingTask(final ReportingTaskNode reportingTaskNode) {
if (isTerminated()) {
throw new IllegalStateException("Cannot start reporting task " + reportingTaskNode.getIdentifier() + " because the controller is terminated");
}
reportingTaskNode.verifyCanStart();
reportingTaskNode.reloadAdditionalResourcesIfNecessary();
processScheduler.schedule(reportingTaskNode);
}
@Override
public void stopReportingTask(final ReportingTaskNode reportingTaskNode) {
if (isTerminated()) {
return;
}
reportingTaskNode.verifyCanStop();
processScheduler.unschedule(reportingTaskNode);
}
public FlowManager getFlowManager() {
return flowManager;
}
public GarbageCollectionLog getGarbageCollectionLog() {
return gcLog;
}
public RepositoryContextFactory getRepositoryContextFactory() {
return repositoryContextFactory;
}
public ClusterCoordinator getClusterCoordinator() {
return clusterCoordinator;
}
/**
* Creates a connection between two Connectable objects.
*
* @param id required ID of the connection
* @param name the name of the connection, or <code>null</code> to leave the
* connection unnamed
* @param source required source
* @param destination required destination
* @param relationshipNames required collection of relationship names
* @return the connection
* @throws NullPointerException if the ID, source, destination, or set of relationships is null.
* @throws IllegalArgumentException if <code>relationships</code> is an empty collection
*/
public Connection createConnection(final String id, final String name, final Connectable source, final Connectable destination, final Collection<String> relationshipNames) {
final StandardConnection.Builder builder = new StandardConnection.Builder(processScheduler);
final List<Relationship> relationships = new ArrayList<>();
for (final String relationshipName : requireNonNull(relationshipNames)) {
relationships.add(new Relationship.Builder().name(relationshipName).build());
}
// Create and initialize a FlowFileSwapManager for this connection
final FlowFileSwapManager swapManager = createSwapManager();
final EventReporter eventReporter = createEventReporter();
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
final SwapManagerInitializationContext initializationContext = new SwapManagerInitializationContext() {
@Override
public ResourceClaimManager getResourceClaimManager() {
return resourceClaimManager;
}
@Override
public FlowFileRepository getFlowFileRepository() {
return flowFileRepository;
}
@Override
public EventReporter getEventReporter() {
return eventReporter;
}
};
swapManager.initialize(initializationContext);
}
final FlowFileQueueFactory flowFileQueueFactory = new FlowFileQueueFactory() {
@Override
public FlowFileQueue createFlowFileQueue(final LoadBalanceStrategy loadBalanceStrategy, final String partitioningAttribute, final ConnectionEventListener eventListener,
final ProcessGroup processGroup) {
final FlowFileQueue flowFileQueue;
if (clusterCoordinator == null) {
flowFileQueue = new StandardFlowFileQueue(id, eventListener, flowFileRepository, provenanceRepository, resourceClaimManager, processScheduler, swapManager,
eventReporter, nifiProperties.getQueueSwapThreshold(),
processGroup.getDefaultFlowFileExpiration(), processGroup.getDefaultBackPressureObjectThreshold(), processGroup.getDefaultBackPressureDataSizeThreshold());
} else {
flowFileQueue = new SocketLoadBalancedFlowFileQueue(id, eventListener, processScheduler, flowFileRepository, provenanceRepository, contentRepository, resourceClaimManager,
clusterCoordinator, loadBalanceClientRegistry, swapManager, nifiProperties.getQueueSwapThreshold(), eventReporter);
flowFileQueue.setFlowFileExpiration(processGroup.getDefaultFlowFileExpiration());
flowFileQueue.setBackPressureObjectThreshold(processGroup.getDefaultBackPressureObjectThreshold());
flowFileQueue.setBackPressureDataSizeThreshold(processGroup.getDefaultBackPressureDataSizeThreshold());
}
return flowFileQueue;
}
};
final Connection connection = builder.id(requireNonNull(id).intern())
.name(name == null ? null : name.intern())
.processGroup(destination.getProcessGroup())
.relationships(relationships)
.source(requireNonNull(source))
.destination(destination)
.flowFileQueueFactory(flowFileQueueFactory)
.build();
return connection;
}
@Override
public ReportingTaskNode getReportingTaskNode(final String identifier) {
return flowManager.getReportingTaskNode(identifier);
}
@Override
public ReportingTaskNode createReportingTask(final String type, final String id, final BundleCoordinate bundleCoordinate, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
return flowManager.createReportingTask(type, id, bundleCoordinate, firstTimeAdded);
}
@Override
public Set<ReportingTaskNode> getAllReportingTasks() {
return flowManager.getAllReportingTasks();
}
@Override
public void removeReportingTask(final ReportingTaskNode reportingTaskNode) {
flowManager.removeReportingTask(reportingTaskNode);
}
public FlowRegistryClient getFlowRegistryClient() {
return flowRegistryClient;
}
public ControllerServiceProvider getControllerServiceProvider() {
return controllerServiceProvider;
}
public VariableRegistry getVariableRegistry() {
return variableRegistry;
}
public ProvenanceAuthorizableFactory getProvenanceAuthorizableFactory() {
return provenanceAuthorizableFactory;
}
@Override
public void enableReportingTask(final ReportingTaskNode reportingTaskNode) {
reportingTaskNode.verifyCanEnable();
reportingTaskNode.reloadAdditionalResourcesIfNecessary();
processScheduler.enableReportingTask(reportingTaskNode);
}
@Override
public void disableReportingTask(final ReportingTaskNode reportingTaskNode) {
reportingTaskNode.verifyCanDisable();
processScheduler.disableReportingTask(reportingTaskNode);
}
//
// Counters
//
public List<Counter> getCounters() {
final List<Counter> counters = new ArrayList<>();
final CounterRepository counterRepo = counterRepositoryRef.get();
for (final Counter counter : counterRepo.getCounters()) {
counters.add(counter);
}
return counters;
}
public Counter resetCounter(final String identifier) {
final CounterRepository counterRepo = counterRepositoryRef.get();
final Counter resetValue = counterRepo.resetCounter(identifier);
return resetValue;
}
//
// Access to controller status
//
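/**
* Recursively totals the number of FlowFiles and their combined content size across
* all Connections in the given ProcessGroup and all of its descendant groups.
*/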
public QueueSize getTotalFlowFileCount(final ProcessGroup group) {
int count = 0;
long contentSize = 0L;
for (final Connection connection : group.getConnections()) {
final QueueSize size = connection.getFlowFileQueue().size();
count += size.getObjectCount();
contentSize += size.getByteCount();
}
for (final ProcessGroup childGroup : group.getProcessGroups()) {
final QueueSize size = getTotalFlowFileCount(childGroup);
count += size.getObjectCount();
contentSize += size.getByteCount();
}
return new QueueSize(count, contentSize);
}
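/**
* Aggregates queued FlowFile counts and sizes along with active and terminated thread
* counts for a ProcessGroup and all of its descendant groups.
*/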
public class GroupStatusCounts {
private int queuedCount = 0;
private long queuedContentSize = 0;
private int activeThreadCount = 0;
private int terminatedThreadCount = 0;
public GroupStatusCounts(final ProcessGroup group) {
calculateCounts(group);
}
private void calculateCounts(final ProcessGroup group) {
for (final Connection connection : group.getConnections()) {
final QueueSize size = connection.getFlowFileQueue().size();
queuedCount += size.getObjectCount();
queuedContentSize += size.getByteCount();
final Connectable source = connection.getSource();
if (ConnectableType.REMOTE_OUTPUT_PORT.equals(source.getConnectableType())) {
final RemoteGroupPort remoteOutputPort = (RemoteGroupPort) source;
activeThreadCount += processScheduler.getActiveThreadCount(remoteOutputPort);
}
final Connectable destination = connection.getDestination();
if (ConnectableType.REMOTE_INPUT_PORT.equals(destination.getConnectableType())) {
final RemoteGroupPort remoteInputPort = (RemoteGroupPort) destination;
activeThreadCount += processScheduler.getActiveThreadCount(remoteInputPort);
}
}
for (final ProcessorNode processor : group.getProcessors()) {
activeThreadCount += processScheduler.getActiveThreadCount(processor);
terminatedThreadCount += processor.getTerminatedThreadCount();
}
for (final Port port : group.getInputPorts()) {
activeThreadCount += processScheduler.getActiveThreadCount(port);
}
for (final Port port : group.getOutputPorts()) {
activeThreadCount += processScheduler.getActiveThreadCount(port);
}
for (final Funnel funnel : group.getFunnels()) {
activeThreadCount += processScheduler.getActiveThreadCount(funnel);
}
for (final ProcessGroup childGroup : group.getProcessGroups()) {
calculateCounts(childGroup);
}
}
public int getQueuedCount() {
return queuedCount;
}
public long getQueuedContentSize() {
return queuedContentSize;
}
public int getActiveThreadCount() {
return activeThreadCount;
}
public int getTerminatedThreadCount() {
return terminatedThreadCount;
}
}
public GroupStatusCounts getGroupStatusCounts(final ProcessGroup group) {
return new GroupStatusCounts(group);
}
public int getActiveThreadCount() {
final int timerDrivenCount = timerDrivenEngineRef.get().getActiveCount();
final int eventDrivenCount = eventDrivenSchedulingAgent.getActiveThreadCount();
return timerDrivenCount + eventDrivenCount;
}
//
// Clustering methods
//
/**
* Starts heartbeating to the cluster. May only be called if the instance
* was constructed for a clustered environment.
*
* @throws IllegalStateException if not configured for clustering
*/
public void startHeartbeating() throws IllegalStateException {
if (!isConfiguredForClustering()) {
throw new IllegalStateException("Unable to start heartbeating because heartbeating is not configured.");
}
writeLock.lock();
try {
stopHeartbeating();
final HeartbeatSendTask sendTask = new HeartbeatSendTask();
this.heartbeatSendTask.set(sendTask);
heartbeatSenderFuture = clusterTaskExecutor.scheduleWithFixedDelay(sendTask, 0, heartbeatDelaySeconds, TimeUnit.SECONDS);
} finally {
writeLock.unlock("startHeartbeating");
}
}
/**
* Notifies controller that the sending of heartbeats should be temporarily
* suspended. Unlike {@link #stopHeartbeating()}, this method does not cancel
* any background tasks and does not require any lock on the
* FlowController. Background tasks will still generate heartbeat messages,
* and any background task currently in the process of sending a Heartbeat
* to the cluster will continue.
*/
public void suspendHeartbeats() {
heartbeatsSuspended.set(true);
}
/**
* Notifies controller that the sending of heartbeats should be re-enabled.
* Unlike {@link #startHeartbeating()}, this method does not submit any
* background tasks and does not require any lock on the
* FlowController.
*/
public void resumeHeartbeats() {
heartbeatsSuspended.set(false);
}
/**
* Stops heartbeating to the cluster. May only be called if the instance was
* constructed for a clustered environment. If the controller was not
* heartbeating, then this method has no effect.
*
* @throws IllegalStateException if not configured for clustering
*/
public void stopHeartbeating() throws IllegalStateException {
if (!isConfiguredForClustering()) {
throw new IllegalStateException("Unable to stop heartbeating because heartbeating is not configured.");
}
LOG.info("Will no longer send heartbeats");
writeLock.lock();
try {
if (!isHeartbeating()) {
return;
}
if (heartbeatSenderFuture != null) {
LOG.info("FlowController will stop sending heartbeats to Cluster Coordinator");
heartbeatSenderFuture.cancel(false);
}
} finally {
writeLock.unlock("stopHeartbeating");
}
}
/**
* @return true if the instance is heartbeating; false otherwise
*/
public boolean isHeartbeating() {
readLock.lock();
try {
return heartbeatSenderFuture != null && !heartbeatSenderFuture.isCancelled();
} finally {
readLock.unlock("isHeartbeating");
}
}
/**
* @return the number of seconds to wait between successive heartbeats
*/
public int getHeartbeatDelaySeconds() {
readLock.lock();
try {
return heartbeatDelaySeconds;
} finally {
readLock.unlock("getHeartbeatDelaySeconds");
}
}
/**
* The node identifier of this instance.
*
* @return the node identifier or null if no identifier is set
*/
public NodeIdentifier getNodeId() {
return nodeId;
}
/**
* Sets the node identifier for this instance.
*
* @param nodeId the node identifier, which may be null
*/
public void setNodeId(final NodeIdentifier nodeId) {
this.nodeId = nodeId;
}
/**
* @return true if this instance is clustered; false otherwise. Clustered
* means that a node is either connected or trying to connect to the
* cluster.
*/
@Override
public boolean isClustered() {
readLock.lock();
try {
return clustered;
} finally {
readLock.unlock("isClustered");
}
}
@Override
public Set<String> getClusterMembers() {
if (isClustered()) {
return clusterCoordinator.getConnectionStatuses().stream().map(s -> s.getNodeIdentifier().getApiAddress()).collect(Collectors.toSet());
} else {
return Collections.emptySet();
}
}
@Override
public Optional<String> getCurrentNode() {
if (isClustered() && getNodeId() != null) {
return Optional.of(getNodeId().getApiAddress());
} else {
return Optional.empty();
}
}
@Override
public boolean isConfiguredForClustering() {
return configuredForClustering;
}
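// Registers this node for the Cluster Coordinator role. When participate is false, the
// participant ID is null, so the node observes leadership changes without contending
// for the role itself.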
void registerForClusterCoordinator(final boolean participate) {
final String participantId = participate ? heartbeatMonitor.getHeartbeatAddress() : null;
leaderElectionManager.register(ClusterRoles.CLUSTER_COORDINATOR, new LeaderElectionStateChangeListener() {
@Override
public synchronized void onLeaderRelinquish() {
LOG.info("This node is no longer the elected Active Cluster Coordinator");
bulletinRepository.addBulletin(BulletinFactory.createBulletin("Cluster Coordinator", Severity.INFO.name(), participantId + " is no longer the Cluster Coordinator"));
// We do not want to stop the heartbeat monitor. This is because even though ZooKeeper offers guarantees
// that watchers will see changes on a ZNode in the order they happened, there does not seem to be any
// guarantee that Curator will notify us that our leadership was gained or lost in the order that it happened.
// As a result, if nodes connect to or disconnect from the cluster quickly, we could invoke stop() then start() or
// start() then stop() in the wrong order, which can cause the cluster to behave improperly. Instead, we simply
// call start() when we become the leader, which ensures that initialization is handled. The heartbeat monitor
// will then check the ZooKeeper ZNode to verify that this node is the Cluster Coordinator before kicking any
// nodes out of the cluster.
}
@Override
public synchronized void onLeaderElection() {
LOG.info("This node elected Active Cluster Coordinator");
bulletinRepository.addBulletin(BulletinFactory.createBulletin("Cluster Coordinator", Severity.INFO.name(), participantId + " has been elected the Cluster Coordinator"));
// Purge any heartbeats that we already have. If we don't do this, we can have a scenario where we receive heartbeats
// from a node, and then another node becomes Cluster Coordinator. As a result, we stop receiving heartbeats. Now that
// we are again the Cluster Coordinator, we will detect that there are old heartbeat messages and start disconnecting
// nodes due to a lack of heartbeat. By purging the heartbeats here, we remove any old heartbeat messages so that this
// does not happen.
FlowController.this.heartbeatMonitor.purgeHeartbeats();
}
}, participantId);
}
void registerForPrimaryNode() {
final String participantId = heartbeatMonitor.getHeartbeatAddress();
leaderElectionManager.register(ClusterRoles.PRIMARY_NODE, new LeaderElectionStateChangeListener() {
@Override
public void onLeaderElection() {
setPrimary(true);
}
@Override
public void onLeaderRelinquish() {
setPrimary(false);
}
}, participantId);
}
/**
* Sets whether this instance is clustered. Clustered means that a node is
* either connected or trying to connect to the cluster.
*
* @param clustered true if clustered
* @param clusterInstanceId if clustered is true, indicates the InstanceID
* of the Cluster Manager
*/
public void setClustered(final boolean clustered, final String clusterInstanceId) {
writeLock.lock();
try {
// verify whether this node's clustered status is changing
boolean isChanging = false;
if (this.clustered != clustered) {
isChanging = true;
if (clustered) {
LOG.info("Cluster State changed from Not Clustered to Clustered");
} else {
LOG.info("Cluster State changed from Clustered to Not Clustered");
}
}
// mark the new cluster status
this.clustered = clustered;
eventDrivenWorkerQueue.setClustered(clustered);
if (clusterInstanceId != null) {
this.instanceId = clusterInstanceId;
}
// update the bulletin repository
if (isChanging) {
if (clustered) {
onClusterConnect();
leaderElectionManager.start();
stateManagerProvider.enableClusterProvider();
loadBalanceClientRegistry.start();
heartbeat();
} else {
stateManagerProvider.disableClusterProvider();
setPrimary(false);
}
final List<RemoteProcessGroup> remoteGroups = flowManager.getRootGroup().findAllRemoteProcessGroups();
for (final RemoteProcessGroup remoteGroup : remoteGroups) {
remoteGroup.reinitialize(clustered);
}
}
if (!clustered) {
onClusterDisconnect();
}
// update the heartbeat bean
this.heartbeatBeanRef.set(new HeartbeatBean(flowManager.getRootGroup(), isPrimary()));
} finally {
writeLock.unlock("setClustered");
}
}
public void onClusterConnect() {
registerForPrimaryNode();
// Participate in Leader Election for Heartbeat Monitor. Start the heartbeat monitor
// if/when we become the leader and stop it when we lose the leader role.
registerForClusterCoordinator(true);
resumeHeartbeats();
}
public void onClusterDisconnect() {
leaderElectionManager.unregister(ClusterRoles.PRIMARY_NODE);
leaderElectionManager.unregister(ClusterRoles.CLUSTER_COORDINATOR);
}
public LeaderElectionManager getLeaderElectionManager() {
return leaderElectionManager;
}
/**
* @return true if this instance is the primary node in the cluster; false
* otherwise
*/
@Override
public boolean isPrimary() {
return isClustered() && leaderElectionManager != null && leaderElectionManager.isLeader(ClusterRoles.PRIMARY_NODE);
}
public boolean isClusterCoordinator() {
return isClustered() && leaderElectionManager != null && leaderElectionManager.isLeader(ClusterRoles.CLUSTER_COORDINATOR);
}
public void setPrimary(final boolean primary) {
final PrimaryNodeState nodeState = primary ? PrimaryNodeState.ELECTED_PRIMARY_NODE : PrimaryNodeState.PRIMARY_NODE_REVOKED;
final ProcessGroup rootGroup = flowManager.getRootGroup();
for (final ProcessorNode procNode : rootGroup.findAllProcessors()) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, procNode.getProcessor().getClass(), procNode.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, procNode.getProcessor(), nodeState);
}
}
for (final ControllerServiceNode serviceNode : flowManager.getAllControllerServices()) {
final Class<?> serviceImplClass = serviceNode.getControllerServiceImplementation().getClass();
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, serviceImplClass, serviceNode.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, serviceNode.getControllerServiceImplementation(), nodeState);
}
}
for (final ReportingTaskNode reportingTaskNode : getAllReportingTasks()) {
try (final NarCloseable narCloseable = NarCloseable.withComponentNarLoader(extensionManager, reportingTaskNode.getReportingTask().getClass(), reportingTaskNode.getIdentifier())) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnPrimaryNodeStateChange.class, reportingTaskNode.getReportingTask(), nodeState);
}
}
// update primary
eventDrivenWorkerQueue.setPrimary(primary);
// update the heartbeat bean
final HeartbeatBean oldBean = this.heartbeatBeanRef.getAndSet(new HeartbeatBean(rootGroup, primary));
// Emit a bulletin detailing the fact that the primary node state has changed
if (oldBean == null || oldBean.isPrimary() != primary) {
final String message = primary ? "This node has been elected Primary Node" : "This node is no longer Primary Node";
final Bulletin bulletin = BulletinFactory.createBulletin("Primary Node", Severity.INFO.name(), message);
bulletinRepository.addBulletin(bulletin);
LOG.info(message);
}
}
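// Null-safe equality helpers: two nulls are considered equal, one null is never equal to
// a non-null value, and otherwise standard equality applies.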
static boolean areEqual(final String a, final String b) {
if (a == null && b == null) {
return true;
}
if (a == b) {
return true;
}
if (a == null || b == null) {
return false;
}
return a.equals(b);
}
static boolean areEqual(final Long a, final Long b) {
if (a == null && b == null) {
return true;
}
if (a == b) {
return true;
}
if (a == null || b == null) {
return false;
}
return a.compareTo(b) == 0;
}
public ContentAvailability getContentAvailability(final ProvenanceEventRecord event) {
final String replayFailure = getReplayFailureReason(event);
return new ContentAvailability() {
@Override
public String getReasonNotReplayable() {
return replayFailure;
}
@Override
public boolean isContentSame() {
return areEqual(event.getPreviousContentClaimContainer(), event.getContentClaimContainer())
&& areEqual(event.getPreviousContentClaimSection(), event.getContentClaimSection())
&& areEqual(event.getPreviousContentClaimIdentifier(), event.getContentClaimIdentifier())
&& areEqual(event.getPreviousContentClaimOffset(), event.getContentClaimOffset())
&& areEqual(event.getPreviousFileSize(), event.getFileSize());
}
@Override
public boolean isInputAvailable() {
try {
return contentRepository.isAccessible(createClaim(event.getPreviousContentClaimContainer(), event.getPreviousContentClaimSection(),
event.getPreviousContentClaimIdentifier(), event.getPreviousContentClaimOffset()));
} catch (final IOException e) {
return false;
}
}
@Override
public boolean isOutputAvailable() {
try {
return contentRepository.isAccessible(createClaim(event.getContentClaimContainer(), event.getContentClaimSection(),
event.getContentClaimIdentifier(), event.getContentClaimOffset()));
} catch (final IOException e) {
return false;
}
}
private ContentClaim createClaim(final String container, final String section, final String identifier, final Long offset) {
if (container == null || section == null || identifier == null) {
return null;
}
final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(container, section, identifier, false, false);
return new StandardContentClaim(resourceClaim, offset == null ? 0L : offset.longValue());
}
@Override
public boolean isReplayable() {
return replayFailure == null;
}
};
}
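/**
* Returns a stream of the input or output content referenced by the given Provenance
* Event, limited to the size recorded in the event, and registers a DOWNLOAD
* Provenance Event to record that the content was accessed.
*/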
public InputStream getContent(final ProvenanceEventRecord provEvent, final ContentDirection direction, final String requestor, final String requestUri) throws IOException {
requireNonNull(provEvent);
requireNonNull(direction);
requireNonNull(requestor);
requireNonNull(requestUri);
final ContentClaim claim;
final long size;
final long offset;
if (direction == ContentDirection.INPUT) {
if (provEvent.getPreviousContentClaimContainer() == null || provEvent.getPreviousContentClaimSection() == null || provEvent.getPreviousContentClaimIdentifier() == null) {
throw new IllegalArgumentException("Input Content Claim not specified");
}
final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(),
provEvent.getPreviousContentClaimIdentifier(), false, false);
claim = new StandardContentClaim(resourceClaim, provEvent.getPreviousContentClaimOffset());
offset = provEvent.getPreviousContentClaimOffset() == null ? 0L : provEvent.getPreviousContentClaimOffset();
size = provEvent.getPreviousFileSize();
} else {
if (provEvent.getContentClaimContainer() == null || provEvent.getContentClaimSection() == null || provEvent.getContentClaimIdentifier() == null) {
throw new IllegalArgumentException("Output Content Claim not specified");
}
final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(),
provEvent.getContentClaimIdentifier(), false, false);
claim = new StandardContentClaim(resourceClaim, provEvent.getContentClaimOffset());
offset = provEvent.getContentClaimOffset() == null ? 0L : provEvent.getContentClaimOffset();
size = provEvent.getFileSize();
}
final InputStream rawStream = contentRepository.read(claim);
final ResourceClaim resourceClaim = claim.getResourceClaim();
// Register a Provenance Event to indicate that the content was downloaded.
final ProvenanceEventRecord sendEvent = new StandardProvenanceEventRecord.Builder()
.setEventType(ProvenanceEventType.DOWNLOAD)
.setFlowFileUUID(provEvent.getFlowFileUuid())
.setAttributes(provEvent.getAttributes(), Collections.emptyMap())
.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(), offset, size)
.setTransitUri(requestUri)
.setEventTime(System.currentTimeMillis())
.setFlowFileEntryDate(provEvent.getFlowFileEntryDate())
.setLineageStartDate(provEvent.getLineageStartDate())
.setComponentType(flowManager.getRootGroup().getName())
.setComponentId(flowManager.getRootGroupId())
.setDetails("Download of " + (direction == ContentDirection.INPUT ? "Input" : "Output") + " Content requested by " + requestor + " for Provenance Event " + provEvent.getEventId())
.build();
provenanceRepository.registerEvent(sendEvent);
return new LimitedInputStream(rawStream, size);
}
public InputStream getContent(final FlowFileRecord flowFile, final String requestor, final String requestUri) throws IOException {
requireNonNull(flowFile);
requireNonNull(requestor);
requireNonNull(requestUri);
InputStream stream;
final ResourceClaim resourceClaim;
final ContentClaim contentClaim = flowFile.getContentClaim();
if (contentClaim == null) {
resourceClaim = null;
stream = new ByteArrayInputStream(new byte[0]);
} else {
resourceClaim = flowFile.getContentClaim().getResourceClaim();
stream = contentRepository.read(flowFile.getContentClaim());
final long contentClaimOffset = flowFile.getContentClaimOffset();
if (contentClaimOffset > 0L) {
StreamUtils.skip(stream, contentClaimOffset);
}
stream = new LimitingInputStream(stream, flowFile.getSize());
}
// Register a Provenance Event to indicate that the content was downloaded.
final StandardProvenanceEventRecord.Builder sendEventBuilder = new StandardProvenanceEventRecord.Builder()
.setEventType(ProvenanceEventType.DOWNLOAD)
.setFlowFileUUID(flowFile.getAttribute(CoreAttributes.UUID.key()))
.setAttributes(flowFile.getAttributes(), Collections.emptyMap())
.setTransitUri(requestUri)
.setEventTime(System.currentTimeMillis())
.setFlowFileEntryDate(flowFile.getEntryDate())
.setLineageStartDate(flowFile.getLineageStartDate())
.setComponentType(flowManager.getRootGroup().getName())
.setComponentId(flowManager.getRootGroupId())
.setDetails("Download of Content requested by " + requestor + " for " + flowFile);
if (contentClaim != null) {
sendEventBuilder.setCurrentContentClaim(resourceClaim.getContainer(), resourceClaim.getSection(), resourceClaim.getId(),
contentClaim.getOffset() + flowFile.getContentClaimOffset(), flowFile.getSize());
}
final ProvenanceEventRecord sendEvent = sendEventBuilder.build();
provenanceRepository.registerEvent(sendEvent);
return stream;
}
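// Mirrors the validation performed by replayFlowFile, but returns a human-readable
// reason that the event cannot be replayed (or null if replay is possible) rather
// than throwing.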
private String getReplayFailureReason(final ProvenanceEventRecord event) {
// Check that the event is a valid type.
final ProvenanceEventType type = event.getEventType();
if (type == ProvenanceEventType.JOIN) {
return "Cannot replay events that are created from multiple parents";
}
// Make sure event has the Content Claim info
final Long contentSize = event.getPreviousFileSize();
final String contentClaimId = event.getPreviousContentClaimIdentifier();
final String contentClaimSection = event.getPreviousContentClaimSection();
final String contentClaimContainer = event.getPreviousContentClaimContainer();
if (contentSize == null || contentClaimId == null || contentClaimSection == null || contentClaimContainer == null) {
return "Cannot replay data from Provenance Event because the event does not contain the required Content Claim";
}
try {
final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId, false, false);
final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, event.getPreviousContentClaimOffset());
if (!contentRepository.isAccessible(contentClaim)) {
return "Content is no longer available in Content Repository";
}
} catch (final IOException ioe) {
return "Failed to determine whether or not content was available in Content Repository due to " + ioe.toString();
}
// Make sure that the source queue exists
if (event.getSourceQueueIdentifier() == null) {
return "Cannot replay data from Provenance Event because the event does not specify the Source FlowFile Queue";
}
final Set<Connection> connections = flowManager.findAllConnections();
FlowFileQueue queue = null;
for (final Connection connection : connections) {
if (event.getSourceQueueIdentifier().equals(connection.getIdentifier())) {
queue = connection.getFlowFileQueue();
break;
}
}
if (queue == null) {
return "Cannot replay data from Provenance Event because the Source FlowFile Queue with ID " + event.getSourceQueueIdentifier() + " no longer exists";
}
return null;
}
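/**
* Replays the FlowFile described by the Provenance Event with the given ID by
* re-enqueueing a copy of its input content onto the event's source queue.
*
* @throws IllegalStateException if no Provenance Event exists with the given ID
*/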
public ProvenanceEventRecord replayFlowFile(final long provenanceEventRecordId, final NiFiUser user) throws IOException {
final ProvenanceEventRecord record = provenanceRepository.getEvent(provenanceEventRecordId, user);
if (record == null) {
throw new IllegalStateException("Cannot find Provenance Event with ID " + provenanceEventRecordId);
}
return replayFlowFile(record, user);
}
public ProvenanceEventRecord replayFlowFile(final ProvenanceEventRecord event, final NiFiUser user) throws IOException {
if (event == null) {
throw new NullPointerException();
}
// Check that the event is a valid type.
final ProvenanceEventType type = event.getEventType();
if (type == ProvenanceEventType.JOIN) {
throw new IllegalArgumentException("Cannot replay events that are created from multiple parents");
}
// Make sure event has the Content Claim info
final Long contentSize = event.getPreviousFileSize();
final String contentClaimId = event.getPreviousContentClaimIdentifier();
final String contentClaimSection = event.getPreviousContentClaimSection();
final String contentClaimContainer = event.getPreviousContentClaimContainer();
if (contentSize == null || contentClaimId == null || contentClaimSection == null || contentClaimContainer == null) {
throw new IllegalArgumentException("Cannot replay data from Provenance Event because the event does not contain the required Content Claim");
}
// Make sure that the source queue exists
if (event.getSourceQueueIdentifier() == null) {
throw new IllegalArgumentException("Cannot replay data from Provenance Event because the event does not specify the Source FlowFile Queue");
}
final Set<Connection> connections = flowManager.findAllConnections();
FlowFileQueue queue = null;
for (final Connection connection : connections) {
if (event.getSourceQueueIdentifier().equals(connection.getIdentifier())) {
queue = connection.getFlowFileQueue();
break;
}
}
if (queue == null) {
throw new IllegalStateException("Cannot replay data from Provenance Event because the Source FlowFile Queue with ID " + event.getSourceQueueIdentifier() + " no longer exists");
}
// Create the ContentClaim. To do so, we first need the appropriate Resource Claim. Because we don't know whether or
// not the Resource Claim is still active, we first call ResourceClaimManager.getResourceClaim. If this returns
// null, then we know that the Resource Claim is no longer active and can just create a new one that is not writable.
// It's critical though that we first call getResourceClaim because otherwise, if the Resource Claim is active and we
// create a new one that is not writable, we could end up archiving or destroying the Resource Claim while it's still
// being written to by the Content Repository. This is important only because we are creating a FlowFile with this Resource
// Claim. If, for instance, we are simply creating the claim to request its content, as in #getContentAvailability, etc.
// then this is not necessary.
ResourceClaim resourceClaim = resourceClaimManager.getResourceClaim(event.getPreviousContentClaimContainer(),
event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier());
if (resourceClaim == null) {
resourceClaim = resourceClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false, false);
}
// Increment Claimant Count, since we will now be referencing the Content Claim
resourceClaimManager.incrementClaimantCount(resourceClaim);
final long claimOffset = event.getPreviousContentClaimOffset() == null ? 0L : event.getPreviousContentClaimOffset().longValue();
final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, claimOffset);
contentClaim.setLength(event.getPreviousFileSize() == null ? -1L : event.getPreviousFileSize());
if (!contentRepository.isAccessible(contentClaim)) {
resourceClaimManager.decrementClaimantCount(resourceClaim);
throw new IllegalStateException("Cannot replay data from Provenance Event because the data is no longer available in the Content Repository");
}
final String parentUUID = event.getFlowFileUuid();
final String newFlowFileUUID = UUID.randomUUID().toString();
// We need to create a new FlowFile by populating it with information from the
// Provenance Event. Particularly of note here is that we are setting the FlowFile's
// contentClaimOffset to 0. This is done for backward compatibility reasons. ContentClaim
// originally had no concept of an offset, and the offset was tied only to the FlowFile. This
// was later refactored so that the offset became part of the ContentClaim. If we set the offset
// in both places, we'll end up skipping over that many bytes twice instead of once (once to get
// to the beginning of the Content Claim and again to get to the offset within that Content Claim).
// To avoid this, we just always set the offset in the Content Claim itself and set the
// FlowFileRecord's contentClaimOffset to 0.
final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
// Copy relevant info from source FlowFile
.addAttributes(event.getPreviousAttributes())
.contentClaim(contentClaim)
.contentClaimOffset(0L) // use 0 because we used the content claim offset in the Content Claim itself
.entryDate(System.currentTimeMillis())
.id(flowFileRepository.getNextFlowFileSequence())
.lineageStart(event.getLineageStartDate(), 0L)
.size(contentSize.longValue())
// Create a new UUID and add attributes indicating that this is a replay
.addAttribute("flowfile.replay", "true")
.addAttribute("flowfile.replay.timestamp", String.valueOf(new Date()))
.addAttribute(CoreAttributes.UUID.key(), newFlowFileUUID)
// remove attributes that may have existed on the source FlowFile that we don't want to exist on the new FlowFile
.removeAttributes(CoreAttributes.DISCARD_REASON.key(), CoreAttributes.ALTERNATE_IDENTIFIER.key())
// build the record
.build();
// Register a Provenance Event to indicate that we replayed the data.
final ProvenanceEventRecord replayEvent = new StandardProvenanceEventRecord.Builder()
.setEventType(ProvenanceEventType.REPLAY)
.addChildUuid(newFlowFileUUID)
.addParentUuid(parentUUID)
.setFlowFileUUID(parentUUID)
.setAttributes(Collections.emptyMap(), flowFileRecord.getAttributes())
.setCurrentContentClaim(event.getContentClaimContainer(), event.getContentClaimSection(), event.getContentClaimIdentifier(), event.getContentClaimOffset(), event.getFileSize())
.setDetails("Replay requested by " + user.getIdentity())
.setEventTime(System.currentTimeMillis())
.setFlowFileEntryDate(System.currentTimeMillis())
.setLineageStartDate(event.getLineageStartDate())
.setComponentType(event.getComponentType())
.setComponentId(event.getComponentId())
.build();
provenanceRepository.registerEvent(replayEvent);
// Update the FlowFile Repository to indicate that we have added the FlowFile to the flow
final StandardRepositoryRecord record = new StandardRepositoryRecord(queue);
record.setWorking(flowFileRecord, false);
record.setDestination(queue);
flowFileRepository.updateRepository(Collections.singleton(record));
// Enqueue the data
queue.put(flowFileRecord);
return replayEvent;
}
public ResourceClaimManager getResourceClaimManager() {
return resourceClaimManager;
}
@Override
public boolean isConnected() {
rwLock.readLock().lock();
try {
return connectionStatus != null && connectionStatus.getState() == NodeConnectionState.CONNECTED;
} finally {
rwLock.readLock().unlock();
}
}
public void setConnectionStatus(final NodeConnectionStatus connectionStatus) {
rwLock.writeLock().lock();
try {
this.connectionStatus = connectionStatus;
// update the heartbeat bean
this.heartbeatBeanRef.set(new HeartbeatBean(flowManager.getRootGroup(), isPrimary()));
} finally {
rwLock.writeLock().unlock();
}
}
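/**
* Triggers an immediate heartbeat to the cluster, outside of the fixed-delay
* heartbeat schedule. Does nothing if this instance is not clustered or is
* shutting down.
*/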
public void heartbeat() {
if (!isClustered()) {
return;
}
if (this.shutdown) {
return;
}
final HeartbeatSendTask task = heartbeatSendTask.get();
if (task != null) {
clusterTaskExecutor.submit(task);
}
}
private class HeartbeatSendTask implements Runnable {
@Override
public void run() {
try (final NarCloseable narCloseable = NarCloseable.withFrameworkNar()) {
if (heartbeatsSuspended.get()) {
return;
}
final HeartbeatMessage message = createHeartbeatMessage();
if (message == null) {
LOG.debug("No heartbeat to send");
return;
}
heartbeater.send(message);
} catch (final UnknownServiceAddressException usae) {
if (LOG.isDebugEnabled()) {
LOG.debug(usae.getMessage());
}
} catch (final Throwable ex) {
LOG.warn("Failed to send heartbeat due to: " + ex);
if (LOG.isDebugEnabled()) {
LOG.warn("", ex);
}
}
}
}
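// Builds the heartbeat message from the current HeartbeatBean, including thread counts,
// queued FlowFile totals, revision update count, and cluster connection statuses. Returns
// null if the node's identifier is not yet known or the payload cannot be created.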
HeartbeatMessage createHeartbeatMessage() {
try {
HeartbeatBean bean = heartbeatBeanRef.get();
if (bean == null) {
readLock.lock();
try {
bean = new HeartbeatBean(flowManager.getRootGroup(), isPrimary());
} finally {
readLock.unlock("createHeartbeatMessage");
}
}
// Create the heartbeat payload
final HeartbeatPayload hbPayload = new HeartbeatPayload();
hbPayload.setSystemStartTime(systemStartTime);
hbPayload.setActiveThreadCount(getActiveThreadCount());
hbPayload.setRevisionUpdateCount(revisionManager.getRevisionUpdateCount());
final QueueSize queueSize = getTotalFlowFileCount(bean.getRootGroup());
hbPayload.setTotalFlowFileCount(queueSize.getObjectCount());
hbPayload.setTotalFlowFileBytes(queueSize.getByteCount());
hbPayload.setClusterStatus(clusterCoordinator.getConnectionStatuses());
// Create the heartbeat message
final NodeIdentifier nodeId = getNodeId();
if (nodeId == null) {
LOG.warn("Cannot create Heartbeat Message because node's identifier is not known at this time");
return null;
}
final Heartbeat heartbeat = new Heartbeat(nodeId, connectionStatus, hbPayload.marshal());
final HeartbeatMessage message = new HeartbeatMessage();
message.setHeartbeat(heartbeat);
LOG.debug("Generated heartbeat");
return message;
} catch (final Throwable ex) {
LOG.warn("Failed to create heartbeat due to: " + ex, ex);
return null;
}
}
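/**
 * Refreshes the flow contents of every Remote Process Group in the flow. A communication
 * failure with one remote instance is logged, and the remaining groups are still refreshed.
 */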
private void updateRemoteProcessGroups() {
final List<RemoteProcessGroup> remoteGroups = flowManager.getRootGroup().findAllRemoteProcessGroups();
for (final RemoteProcessGroup remoteGroup : remoteGroups) {
try {
remoteGroup.refreshFlowContents();
} catch (final CommunicationsException e) {
LOG.warn("Unable to communicate with remote instance {} due to {}", remoteGroup, e.toString());
if (LOG.isDebugEnabled()) {
LOG.warn("", e);
}
}
}
}
public Integer getRemoteSiteListeningPort() {
return remoteInputSocketPort;
}
public Integer getRemoteSiteListeningHttpPort() {
return remoteInputHttpPort;
}
public Boolean isRemoteSiteCommsSecure() {
return isSiteToSiteSecure;
}
public StandardProcessScheduler getProcessScheduler() {
return processScheduler;
}
public AuditService getAuditService() {
return auditService;
}
public ProvenanceRepository getProvenanceRepository() {
return provenanceRepository;
}
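// Status history accessors. The overloads without time bounds delegate to the full
// versions with no start/end time and Integer.MAX_VALUE preferred data points.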
public StatusHistoryDTO getConnectionStatusHistory(final String connectionId) {
return getConnectionStatusHistory(connectionId, null, null, Integer.MAX_VALUE);
}
public StatusHistoryDTO getConnectionStatusHistory(final String connectionId, final Date startTime, final Date endTime, final int preferredDataPoints) {
return StatusHistoryUtil.createStatusHistoryDTO(statusHistoryRepository.getConnectionStatusHistory(connectionId, startTime, endTime, preferredDataPoints));
}
public StatusHistoryDTO getProcessorStatusHistory(final String processorId, final boolean includeCounters) {
return getProcessorStatusHistory(processorId, null, null, Integer.MAX_VALUE, includeCounters);
}
public StatusHistoryDTO getProcessorStatusHistory(final String processorId, final Date startTime, final Date endTime, final int preferredDataPoints, final boolean includeCounters) {
return StatusHistoryUtil.createStatusHistoryDTO(statusHistoryRepository.getProcessorStatusHistory(processorId, startTime, endTime, preferredDataPoints, includeCounters));
}
public StatusHistoryDTO getProcessGroupStatusHistory(final String processGroupId) {
return getProcessGroupStatusHistory(processGroupId, null, null, Integer.MAX_VALUE);
}
public StatusHistoryDTO getProcessGroupStatusHistory(final String processGroupId, final Date startTime, final Date endTime, final int preferredDataPoints) {
return StatusHistoryUtil.createStatusHistoryDTO(statusHistoryRepository.getProcessGroupStatusHistory(processGroupId, startTime, endTime, preferredDataPoints));
}
public StatusHistoryDTO getRemoteProcessGroupStatusHistory(final String remoteGroupId) {
return getRemoteProcessGroupStatusHistory(remoteGroupId, null, null, Integer.MAX_VALUE);
}
public StatusHistoryDTO getRemoteProcessGroupStatusHistory(final String remoteGroupId, final Date startTime, final Date endTime, final int preferredDataPoints) {
return StatusHistoryUtil.createStatusHistoryDTO(statusHistoryRepository.getRemoteProcessGroupStatusHistory(remoteGroupId, startTime, endTime, preferredDataPoints));
}
public StatusHistoryDTO getNodeStatusHistory() {
return StatusHistoryUtil.createStatusHistoryDTO(statusHistoryRepository.getNodeStatusHistory(null, null));
}
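/**
 * Captures a point-in-time snapshot of this node's status (heap and non-heap usage,
 * open file handles, load average, thread counts, and per-repository storage usage)
 * from the current system diagnostics.
 */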
private NodeStatus getNodeStatusSnapshot() {
final SystemDiagnostics systemDiagnostics = getSystemDiagnostics();
final NodeStatus result = new NodeStatus();
result.setCreatedAtInMs(systemDiagnostics.getCreationTimestamp());
result.setFreeHeap(systemDiagnostics.getFreeHeap());
result.setUsedHeap(systemDiagnostics.getUsedHeap());
result.setHeapUtilization(systemDiagnostics.getHeapUtilization());
result.setFreeNonHeap(systemDiagnostics.getFreeNonHeap());
result.setUsedNonHeap(systemDiagnostics.getUsedNonHeap());
result.setOpenFileHandlers(systemDiagnostics.getOpenFileHandles());
result.setProcessorLoadAverage(systemDiagnostics.getProcessorLoadAverage());
result.setTotalThreads(systemDiagnostics.getTotalThreads());
result.setEventDrivenThreads(getActiveEventDrivenThreadCount());
result.setTimerDrivenThreads(getActiveTimerDrivenThreadCount());
result.setFlowFileRepositoryFreeSpace(systemDiagnostics.getFlowFileRepositoryStorageUsage().getFreeSpace());
result.setFlowFileRepositoryUsedSpace(systemDiagnostics.getFlowFileRepositoryStorageUsage().getUsedSpace());
result.setContentRepositories(systemDiagnostics.getContentRepositoryStorageUsage().entrySet().stream().map(e -> getStorageStatus(e)).collect(Collectors.toList()));
result.setProvenanceRepositories(systemDiagnostics.getProvenanceRepositoryStorageUsage().entrySet().stream().map(e -> getStorageStatus(e)).collect(Collectors.toList()));
return result;
}
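/**
 * Converts a single repository storage usage entry into a StorageStatus, keyed by the
 * storage location's name.
 */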
private static StorageStatus getStorageStatus(final Map.Entry<String, StorageUsage> storageUsage) {
final StorageStatus result = new StorageStatus();
result.setName(storageUsage.getKey());
result.setFreeSpace(storageUsage.getValue().getFreeSpace());
result.setUsedSpace(storageUsage.getValue().getUsedSpace());
return result;
}
public FlowFileEventRepository getFlowFileEventRepository() {
return flowFileEventRepository;
}
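/**
 * Immutable snapshot of the root Process Group and this node's primary-node flag,
 * cached so that heartbeat creation typically does not need to acquire locks.
 */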
private static class HeartbeatBean {
private final ProcessGroup rootGroup;
private final boolean primary;
public HeartbeatBean(final ProcessGroup rootGroup, final boolean primary) {
this.rootGroup = rootGroup;
this.primary = primary;
}
public ProcessGroup getRootGroup() {
return rootGroup;
}
public boolean isPrimary() {
return primary;
}
}
}