blob: 8c266beedf05f24edea2d98bc4e161678a51248d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cluster.manager.impl;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Pattern;
import javax.net.ssl.SSLContext;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.StreamingOutput;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.admin.service.AuditService;
import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.cluster.BulletinsPayload;
import org.apache.nifi.cluster.HeartbeatPayload;
import org.apache.nifi.cluster.context.ClusterContext;
import org.apache.nifi.cluster.context.ClusterContextImpl;
import org.apache.nifi.cluster.event.Event;
import org.apache.nifi.cluster.event.EventManager;
import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
import org.apache.nifi.cluster.flow.ClusterDataFlow;
import org.apache.nifi.cluster.flow.DaoException;
import org.apache.nifi.cluster.flow.DataFlowManagementService;
import org.apache.nifi.cluster.flow.PersistedFlowState;
import org.apache.nifi.cluster.manager.HttpClusterManager;
import org.apache.nifi.cluster.manager.HttpRequestReplicator;
import org.apache.nifi.cluster.manager.HttpResponseMapper;
import org.apache.nifi.cluster.manager.NodeResponse;
import org.apache.nifi.cluster.manager.exception.ConnectingNodeMutableRequestException;
import org.apache.nifi.cluster.manager.exception.DisconnectedNodeMutableRequestException;
import org.apache.nifi.cluster.manager.exception.IllegalClusterStateException;
import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException;
import org.apache.nifi.cluster.manager.exception.IllegalNodeDisconnectionException;
import org.apache.nifi.cluster.manager.exception.IllegalNodeReconnectionException;
import org.apache.nifi.cluster.manager.exception.IneligiblePrimaryNodeException;
import org.apache.nifi.cluster.manager.exception.NoConnectedNodesException;
import org.apache.nifi.cluster.manager.exception.NoResponseFromNodesException;
import org.apache.nifi.cluster.manager.exception.NodeDisconnectionException;
import org.apache.nifi.cluster.manager.exception.NodeReconnectionException;
import org.apache.nifi.cluster.manager.exception.PrimaryRoleAssignmentException;
import org.apache.nifi.cluster.manager.exception.SafeModeMutableRequestException;
import org.apache.nifi.cluster.manager.exception.UnknownNodeException;
import org.apache.nifi.cluster.manager.exception.UriConstructionException;
import org.apache.nifi.cluster.node.Node;
import org.apache.nifi.cluster.node.Node.Status;
import org.apache.nifi.cluster.protocol.ConnectionRequest;
import org.apache.nifi.cluster.protocol.ConnectionResponse;
import org.apache.nifi.cluster.protocol.Heartbeat;
import org.apache.nifi.cluster.protocol.NodeBulletins;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.cluster.protocol.impl.ClusterManagerProtocolSenderListener;
import org.apache.nifi.cluster.protocol.impl.ClusterServicesBroadcaster;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.ControllerStartupFailureMessage;
import org.apache.nifi.cluster.protocol.message.DisconnectMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
import org.apache.nifi.cluster.protocol.message.PrimaryRoleAssignmentMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage.MessageType;
import org.apache.nifi.cluster.protocol.message.ReconnectionFailureMessage;
import org.apache.nifi.cluster.protocol.message.ReconnectionRequestMessage;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.Heartbeater;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.StandardFlowSerializer;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
import org.apache.nifi.controller.reporting.ReportingTaskProvider;
import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
import org.apache.nifi.controller.scheduling.QuartzSchedulingAgent;
import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
import org.apache.nifi.controller.service.ControllerServiceLoader;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.controller.status.history.ComponentStatusRepository;
import org.apache.nifi.controller.status.history.MetricDescriptor;
import org.apache.nifi.controller.status.history.StatusHistory;
import org.apache.nifi.controller.status.history.StatusHistoryUtil;
import org.apache.nifi.controller.status.history.StatusSnapshot;
import org.apache.nifi.diagnostics.GarbageCollection;
import org.apache.nifi.diagnostics.StorageUsage;
import org.apache.nifi.diagnostics.SystemDiagnostics;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.events.VolatileBulletinRepository;
import org.apache.nifi.framework.security.util.SslContextFactory;
import org.apache.nifi.io.socket.multicast.DiscoverableService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.NiFiLog;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.nar.NarThreadContextClassLoader;
import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.processor.StandardValidationContextFactory;
import org.apache.nifi.remote.RemoteResourceManager;
import org.apache.nifi.remote.RemoteSiteListener;
import org.apache.nifi.remote.SocketRemoteSiteListener;
import org.apache.nifi.remote.cluster.ClusterNodeInformation;
import org.apache.nifi.remote.cluster.NodeInformation;
import org.apache.nifi.remote.protocol.socket.ClusterManagerServerProtocol;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.DomUtils;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.web.OptimisticLockingManager;
import org.apache.nifi.web.Revision;
import org.apache.nifi.web.UpdateRevision;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.NodeDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceRequestDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceResultsDTO;
import org.apache.nifi.web.api.dto.status.ClusterStatusHistoryDTO;
import org.apache.nifi.web.api.dto.status.NodeStatusHistoryDTO;
import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
import org.apache.nifi.web.api.entity.FlowSnippetEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProcessorsEntity;
import org.apache.nifi.web.api.entity.ProvenanceEntity;
import org.apache.nifi.web.api.entity.ProvenanceEventEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity;
import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity;
import org.apache.nifi.web.util.WebUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.DOMException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
import org.xml.sax.SAXParseException;
import com.sun.jersey.api.client.ClientResponse;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ControllerServiceReferencingComponentsEntity;
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
import org.apache.nifi.web.api.entity.ReportingTaskEntity;
import org.apache.nifi.web.api.entity.ReportingTasksEntity;
/**
* Provides a cluster manager implementation. The manager federates incoming
* HTTP client requests to the nodes' external API using the HTTP protocol. The
* manager also communicates with nodes using the nodes' internal socket
* protocol.
*
* The manager's socket address may broadcasted using multicast if a
* MulticastServiceBroadcaster instance is set on this instance. The manager
* instance must be started after setting the broadcaster.
*
* The manager may be configured with an EventManager for recording noteworthy
* lifecycle events (e.g., first heartbeat received, node status change).
*
* The start() and stop() methods must be called to initialize and stop the
* instance.
*
* @author unattributed
*/
public class WebClusterManager implements HttpClusterManager, ProtocolHandler, ControllerServiceProvider, ReportingTaskProvider {
public static final String ROOT_GROUP_ID_ALIAS = "root";
public static final String BULLETIN_CATEGORY = "Clustering";
private static final Logger logger = new NiFiLog(LoggerFactory.getLogger(WebClusterManager.class));
private static final Logger heartbeatLogger = new NiFiLog(LoggerFactory.getLogger("org.apache.nifi.cluster.heartbeat"));
/**
* The HTTP header to store a cluster context. An example of what may be
* stored in the context is a node's auditable actions in response to a
* cluster request. The cluster context is serialized using Java's
* serialization mechanism and hex encoded.
*/
public static final String CLUSTER_CONTEXT_HTTP_HEADER = "X-ClusterContext";
/**
* HTTP Header that stores a unique ID for each request that is replicated
* to the nodes. This is used for logging purposes so that request
* information, such as timing, can be correlated between the NCM and the
* nodes
*/
public static final String REQUEST_ID_HEADER = "X-RequestID";
/**
* The HTTP header that the NCM specifies to ask a node if they are able to
* process a given request. The value is always 150-NodeContinue. The node
* will respond with 150 CONTINUE if it is able to process the request, 417
* EXPECTATION_FAILED otherwise.
*/
public static final String NCM_EXPECTS_HTTP_HEADER = "X-NcmExpects";
public static final int NODE_CONTINUE_STATUS_CODE = 150;
/**
* The HTTP header that the NCM specifies to indicate that a node should
* invalidate the specified user group. This is done to ensure that user
* cache is not stale when an administrator modifies a group through the UI.
*/
public static final String CLUSTER_INVALIDATE_USER_GROUP_HEADER = "X-ClusterInvalidateUserGroup";
/**
* The HTTP header that the NCM specifies to indicate that a node should
* invalidate the specified user. This is done to ensure that user cache is
* not stale when an administrator modifies a user through the UI.
*/
public static final String CLUSTER_INVALIDATE_USER_HEADER = "X-ClusterInvalidateUser";
/**
* The default number of seconds to respond to a connecting node if the
* manager cannot provide it with a current data flow.
*/
private static final int DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS = 5;
public static final String DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION = "org.apache.nifi.controller.status.history.VolatileComponentStatusRepository";
public static final Pattern PROCESSORS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors");
public static final Pattern PROCESSOR_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/processors/[a-f0-9\\-]{36}");
public static final Pattern REMOTE_PROCESS_GROUPS_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups");
public static final Pattern REMOTE_PROCESS_GROUP_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/remote-process-groups/[a-f0-9\\-]{36}");
public static final Pattern PROCESS_GROUP_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))");
public static final Pattern TEMPLATE_INSTANCE_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/template-instance");
public static final Pattern FLOW_SNIPPET_INSTANCE_URI_PATTERN = Pattern.compile("/nifi-api/controller/process-groups/(?:(?:root)|(?:[a-f0-9\\-]{36}))/snippet-instance");
public static final String PROVENANCE_URI = "/nifi-api/controller/provenance";
public static final Pattern PROVENANCE_QUERY_URI = Pattern.compile("/nifi-api/controller/provenance/[a-f0-9\\-]{36}");
public static final Pattern PROVENANCE_EVENT_URI = Pattern.compile("/nifi-api/controller/provenance/events/[0-9]+");
public static final String CONTROLLER_SERVICES_URI = "/nifi-api/controller/controller-services/node";
public static final Pattern CONTROLLER_SERVICE_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}");
public static final Pattern CONTROLLER_SERVICE_REFERENCES_URI_PATTERN = Pattern.compile("/nifi-api/controller/controller-services/node/[a-f0-9\\-]{36}/references");
public static final String REPORTING_TASKS_URI = "/nifi-api/controller/reporting-tasks/node";
public static final Pattern REPORTING_TASK_URI_PATTERN = Pattern.compile("/nifi-api/controller/reporting-tasks/node/[a-f0-9\\-]{36}");
private final NiFiProperties properties;
private final HttpRequestReplicator httpRequestReplicator;
private final HttpResponseMapper httpResponseMapper;
private final DataFlowManagementService dataFlowManagementService;
private final ClusterManagerProtocolSenderListener senderListener;
private final OptimisticLockingManager optimisticLockingManager;
private final StringEncryptor encryptor;
private final Queue<Heartbeat> pendingHeartbeats = new ConcurrentLinkedQueue<>();
private final ReentrantReadWriteLock resourceRWLock = new ReentrantReadWriteLock();
private final ClusterManagerLock readLock = new ClusterManagerLock(resourceRWLock.readLock(), "Read");
private final ClusterManagerLock writeLock = new ClusterManagerLock(resourceRWLock.writeLock(), "Write");
private final Set<Node> nodes = new HashSet<>();
private final ConcurrentMap<String, ReportingTaskNode> reportingTasks = new ConcurrentHashMap<>();
// null means the dataflow should be read from disk
private StandardDataFlow cachedDataFlow = null;
private NodeIdentifier primaryNodeId = null;
private Timer heartbeatMonitor;
private Timer heartbeatProcessor;
private volatile ClusterServicesBroadcaster servicesBroadcaster = null;
private volatile EventManager eventManager = null;
private volatile ClusterNodeFirewall clusterFirewall = null;
private volatile AuditService auditService = null;
private volatile ControllerServiceProvider controllerServiceProvider = null;
private final RemoteSiteListener remoteSiteListener;
private final Integer remoteInputPort;
private final Boolean remoteCommsSecure;
private final BulletinRepository bulletinRepository;
private final String instanceId;
private final FlowEngine reportingTaskEngine;
private final Map<NodeIdentifier, ComponentStatusRepository> componentMetricsRepositoryMap = new HashMap<>();
private final StandardProcessScheduler processScheduler;
private final long componentStatusSnapshotMillis;
public WebClusterManager(final HttpRequestReplicator httpRequestReplicator, final HttpResponseMapper httpResponseMapper,
final DataFlowManagementService dataFlowManagementService, final ClusterManagerProtocolSenderListener senderListener,
final NiFiProperties properties, final StringEncryptor encryptor, final OptimisticLockingManager optimisticLockingManager) {
if (httpRequestReplicator == null) {
throw new IllegalArgumentException("HttpRequestReplicator may not be null.");
} else if (httpResponseMapper == null) {
throw new IllegalArgumentException("HttpResponseMapper may not be null.");
} else if (dataFlowManagementService == null) {
throw new IllegalArgumentException("DataFlowManagementService may not be null.");
} else if (senderListener == null) {
throw new IllegalArgumentException("ClusterManagerProtocolSenderListener may not be null.");
} else if (properties == null) {
throw new IllegalArgumentException("NiFiProperties may not be null.");
}
// Ensure that our encryptor/decryptor is properly initialized
this.httpRequestReplicator = httpRequestReplicator;
this.httpResponseMapper = httpResponseMapper;
this.dataFlowManagementService = dataFlowManagementService;
this.properties = properties;
this.bulletinRepository = new VolatileBulletinRepository();
this.instanceId = UUID.randomUUID().toString();
this.senderListener = senderListener;
this.encryptor = encryptor;
this.optimisticLockingManager = optimisticLockingManager;
senderListener.addHandler(this);
senderListener.setBulletinRepository(bulletinRepository);
final String snapshotFrequency = properties.getProperty(NiFiProperties.COMPONENT_STATUS_SNAPSHOT_FREQUENCY, NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY);
long snapshotMillis;
try {
snapshotMillis = FormatUtils.getTimeDuration(snapshotFrequency, TimeUnit.MILLISECONDS);
} catch (final Exception e) {
snapshotMillis = FormatUtils.getTimeDuration(NiFiProperties.DEFAULT_COMPONENT_STATUS_SNAPSHOT_FREQUENCY, TimeUnit.MILLISECONDS);
}
componentStatusSnapshotMillis = snapshotMillis;
remoteInputPort = properties.getRemoteInputPort();
if (remoteInputPort == null) {
remoteSiteListener = null;
remoteCommsSecure = null;
} else {
// Register the ClusterManagerServerProtocol as the appropriate resource for site-to-site Server Protocol
RemoteResourceManager.setServerProtocolImplementation(ClusterManagerServerProtocol.RESOURCE_NAME, ClusterManagerServerProtocol.class);
remoteCommsSecure = properties.isSiteToSiteSecure();
if (remoteCommsSecure) {
final SSLContext sslContext = SslContextFactory.createSslContext(properties, false);
if (sslContext == null) {
throw new IllegalStateException("NiFi Configured to allow Secure Site-to-Site communications but the Keystore/Truststore properties are not configured");
}
remoteSiteListener = new SocketRemoteSiteListener(remoteInputPort.intValue(), sslContext, this);
} else {
remoteSiteListener = new SocketRemoteSiteListener(remoteInputPort.intValue(), null, this);
}
}
reportingTaskEngine = new FlowEngine(8, "Reporting Task Thread");
processScheduler = new StandardProcessScheduler(new Heartbeater() {
@Override
public void heartbeat() {
}
}, this, encryptor);
// When we construct the scheduling agents, we can pass null for a lot of the arguments because we are only
// going to be scheduling Reporting Tasks. Otherwise, it would not be okay.
processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, new TimerDrivenSchedulingAgent(null, reportingTaskEngine, null, encryptor));
processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, new QuartzSchedulingAgent(null, reportingTaskEngine, null, encryptor));
processScheduler.setMaxThreadCount(SchedulingStrategy.TIMER_DRIVEN, 10);
processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 10);
controllerServiceProvider = new StandardControllerServiceProvider(processScheduler);
}
public void start() throws IOException {
writeLock.lock();
try {
if (isRunning()) {
throw new IllegalStateException("Instance is already started.");
}
try {
// setup heartbeat monitoring
heartbeatMonitor = new Timer("Heartbeat Monitor", /* is daemon */ true);
heartbeatMonitor.scheduleAtFixedRate(new HeartbeatMonitoringTimerTask(), 0, getHeartbeatMonitoringIntervalSeconds() * 1000);
heartbeatProcessor = new Timer("Process Pending Heartbeats", true);
final int processPendingHeartbeatDelay = 1000 * Math.max(1, getClusterProtocolHeartbeatSeconds() / 2);
heartbeatProcessor.schedule(new ProcessPendingHeartbeatsTask(), processPendingHeartbeatDelay, processPendingHeartbeatDelay);
// start request replication service
httpRequestReplicator.start();
// start protocol service
senderListener.start();
// start flow management service
dataFlowManagementService.start();
if (remoteSiteListener != null) {
remoteSiteListener.start();
}
// load flow
final ClusterDataFlow clusterDataFlow;
if (dataFlowManagementService.isFlowCurrent()) {
clusterDataFlow = dataFlowManagementService.loadDataFlow();
cachedDataFlow = clusterDataFlow.getDataFlow();
primaryNodeId = clusterDataFlow.getPrimaryNodeId();
} else {
throw new IOException("Flow is not current.");
}
final byte[] serializedServices = clusterDataFlow.getControllerServices();
if ( serializedServices != null && serializedServices.length > 0 ) {
ControllerServiceLoader.loadControllerServices(this, new ByteArrayInputStream(serializedServices), encryptor, bulletinRepository, properties.getAutoResumeState());
}
// start multicast broadcasting service, if configured
if (servicesBroadcaster != null) {
servicesBroadcaster.start();
}
// start in safe mode
executeSafeModeTask();
// Load and start running Reporting Tasks
final byte[] serializedReportingTasks = clusterDataFlow.getReportingTasks();
if ( serializedReportingTasks != null && serializedReportingTasks.length > 0 ) {
loadReportingTasks(serializedReportingTasks);
}
} catch (final IOException ioe) {
logger.warn("Failed to initialize cluster services due to: " + ioe, ioe);
stop();
throw ioe;
}
} finally {
writeLock.unlock("START");
}
}
public void stop() throws IOException {
writeLock.lock();
try {
// returns true if any service is running
if (isRunning() == false) {
throw new IllegalArgumentException("Instance is already stopped.");
}
boolean encounteredException = false;
// stop the heartbeat monitoring
if (isHeartbeatMonitorRunning()) {
heartbeatMonitor.cancel();
heartbeatMonitor = null;
}
if (heartbeatProcessor != null) {
heartbeatProcessor.cancel();
heartbeatProcessor = null;
}
// stop the HTTP request replicator service
if (httpRequestReplicator.isRunning()) {
httpRequestReplicator.stop();
}
// stop the flow management service
if (dataFlowManagementService.isRunning()) {
dataFlowManagementService.stop();
}
if (remoteSiteListener != null) {
remoteSiteListener.stop();
}
// stop the protocol listener service
if (senderListener.isRunning()) {
try {
senderListener.stop();
} catch (final IOException ioe) {
encounteredException = true;
logger.warn("Failed to shutdown protocol service due to: " + ioe, ioe);
}
}
// stop the service broadcaster
if (isBroadcasting()) {
servicesBroadcaster.stop();
}
if ( processScheduler != null ) {
processScheduler.shutdown();
}
if (encounteredException) {
throw new IOException("Failed to shutdown Cluster Manager because one or more cluster services failed to shutdown. Check the logs for details.");
}
} finally {
writeLock.unlock("STOP");
}
}
public boolean isRunning() {
readLock.lock();
try {
return isHeartbeatMonitorRunning()
|| httpRequestReplicator.isRunning()
|| senderListener.isRunning()
|| dataFlowManagementService.isRunning()
|| isBroadcasting();
} finally {
readLock.unlock("isRunning");
}
}
@Override
public boolean canHandle(ProtocolMessage msg) {
return MessageType.CONNECTION_REQUEST == msg.getType()
|| MessageType.HEARTBEAT == msg.getType()
|| MessageType.CONTROLLER_STARTUP_FAILURE == msg.getType()
|| MessageType.BULLETINS == msg.getType()
|| MessageType.RECONNECTION_FAILURE == msg.getType();
}
@Override
public ProtocolMessage handle(final ProtocolMessage protocolMessage) throws ProtocolException {
switch (protocolMessage.getType()) {
case CONNECTION_REQUEST:
return handleConnectionRequest((ConnectionRequestMessage) protocolMessage);
case HEARTBEAT:
final HeartbeatMessage heartbeatMessage = (HeartbeatMessage) protocolMessage;
final Heartbeat original = heartbeatMessage.getHeartbeat();
final NodeIdentifier originalNodeId = original.getNodeIdentifier();
final Heartbeat heartbeatWithDn = new Heartbeat(addRequestorDn(originalNodeId, heartbeatMessage.getRequestorDN()), original.isPrimary(), original.isConnected(), original.getPayload());
handleHeartbeat(heartbeatWithDn);
return null;
case CONTROLLER_STARTUP_FAILURE:
new Thread(new Runnable() {
@Override
public void run() {
handleControllerStartupFailure((ControllerStartupFailureMessage) protocolMessage);
}
}, "Handle Controller Startup Failure Message from " + ((ControllerStartupFailureMessage) protocolMessage).getNodeId()).start();
return null;
case RECONNECTION_FAILURE:
new Thread(new Runnable() {
@Override
public void run() {
handleReconnectionFailure((ReconnectionFailureMessage) protocolMessage);
}
}, "Handle Reconnection Failure Message from " + ((ReconnectionFailureMessage) protocolMessage).getNodeId()).start();
return null;
case BULLETINS:
final NodeBulletinsMessage bulletinsMessage = (NodeBulletinsMessage) protocolMessage;
handleBulletins(bulletinsMessage.getBulletins());
return null;
default:
throw new ProtocolException("No handler defined for message type: " + protocolMessage.getType());
}
}
/**
* Services connection requests. If the data flow management service is
* unable to provide a current copy of the data flow, then the returned
* connection response will indicate the node should try later. Otherwise,
* the connection response will contain the the flow and the node
* identifier.
*
* If this instance is configured with a firewall and the request is
* blocked, then the response will not contain a node identifier.
*
* @param request a connection request
*
* @return a connection response
*/
@Override
public ConnectionResponse requestConnection(final ConnectionRequest request) {
final boolean lockObtained = writeLock.tryLock(3, TimeUnit.SECONDS);
if (!lockObtained) {
// Create try-later response because we are too busy to service the request right now. We do not want
// to wait long because we want Node/NCM comms to be very responsive
final int tryAgainSeconds;
if (dataFlowManagementService.getRetrievalDelaySeconds() <= 0) {
tryAgainSeconds = DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS;
} else {
tryAgainSeconds = dataFlowManagementService.getRetrievalDelaySeconds();
}
// record event
final String msg = "Connection requested from node, but manager was too busy to service request. Instructing node to try again in " + tryAgainSeconds + " seconds.";
addEvent(request.getProposedNodeIdentifier(), msg);
addBulletin(request.getProposedNodeIdentifier(), Severity.INFO, msg);
// return try later response
return new ConnectionResponse(tryAgainSeconds);
}
try {
// resolve the proposed node identifier to a valid node identifier
final NodeIdentifier resolvedNodeIdentifier = resolveProposedNodeIdentifier(request.getProposedNodeIdentifier());
if (isBlockedByFirewall(resolvedNodeIdentifier.getSocketAddress())) {
// if the socket address is not listed in the firewall, then return a null response
logger.info("Firewall blocked connection request from node " + resolvedNodeIdentifier);
return ConnectionResponse.createBlockedByFirewallResponse();
}
// get a raw reference to the node (if it doesn't exist, node will be null)
Node node = getRawNode(resolvedNodeIdentifier.getId());
// create a new node if necessary and set status to connecting
if (node == null) {
node = new Node(resolvedNodeIdentifier, Status.CONNECTING);
addEvent(node.getNodeId(), "Connection requested from new node. Setting status to connecting.");
nodes.add(node);
} else {
node.setStatus(Status.CONNECTING);
addEvent(resolvedNodeIdentifier, "Connection requested from existing node. Setting status to connecting");
}
// record the time of the connection request
node.setConnectionRequestedTimestamp(new Date().getTime());
// clear out old heartbeat info
node.setHeartbeat(null);
// try to obtain a current flow
if (dataFlowManagementService.isFlowCurrent()) {
// if a cached copy does not exist, load it from disk
if (cachedDataFlow == null) {
final ClusterDataFlow clusterDataFlow = dataFlowManagementService.loadDataFlow();
cachedDataFlow = clusterDataFlow.getDataFlow();
primaryNodeId = clusterDataFlow.getPrimaryNodeId();
}
// determine if this node should be assigned the primary role
final boolean primaryRole;
if (primaryNodeId == null || primaryNodeId.logicallyEquals(node.getNodeId())) {
setPrimaryNodeId(node.getNodeId());
addEvent(node.getNodeId(), "Setting primary role in connection response.");
primaryRole = true;
} else {
primaryRole = false;
}
return new ConnectionResponse(node.getNodeId(), cachedDataFlow, primaryRole, remoteInputPort, remoteCommsSecure, instanceId);
}
/*
* The manager does not have a current copy of the data flow,
* so it will instruct the node to try connecting at a later
* time. Meanwhile, the flow will be locked down from user
* changes because the node is marked as connecting.
*/
/*
* Create try-later response based on flow retrieval delay to give
* the flow management service a chance to retrieve a curren flow
*/
final int tryAgainSeconds;
if (dataFlowManagementService.getRetrievalDelaySeconds() <= 0) {
tryAgainSeconds = DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS;
} else {
tryAgainSeconds = dataFlowManagementService.getRetrievalDelaySeconds();
}
// record event
addEvent(node.getNodeId(), "Connection requested from node, but manager was unable to obtain current flow. Instructing node to try again in " + tryAgainSeconds + " seconds.");
// return try later response
return new ConnectionResponse(tryAgainSeconds);
} finally {
writeLock.unlock("requestConnection");
}
}
/**
* Services reconnection requests for a given node. If the node indicates
* reconnection failure, then the node will be set to disconnected and if
* the node has primary role, then the role will be revoked. Otherwise, a
* reconnection request will be sent to the node, initiating the connection
* handshake.
*
* @param nodeId a node identifier
*
* @throws UnknownNodeException if the node does not exist
* @throws IllegalNodeReconnectionException if the node cannot be
* reconnected because the node is not disconnected
* @throws NodeReconnectionException if the reconnection message failed to
* be sent or the cluster could not provide a current data flow for the
* reconnection request
*/
@Override
public void requestReconnection(final String nodeId, final String userDn) throws UnknownNodeException, IllegalNodeReconnectionException {
Node node = null;
final boolean primaryRole;
final int tryAgainSeconds;
writeLock.lock();
try {
// check if we know about this node and that it is disconnected
node = getRawNode(nodeId);
logger.info("Request was made by {} to reconnect node {} to cluster", userDn, node == null ? nodeId : node);
if (node == null) {
throw new UnknownNodeException("Node does not exist.");
} else if (Status.DISCONNECTED != node.getStatus()) {
throw new IllegalNodeReconnectionException("Node must be disconnected before it can reconnect.");
}
// clear out old heartbeat info
node.setHeartbeat(null);
// get the dataflow to send with the reconnection request
if (!dataFlowManagementService.isFlowCurrent()) {
/* node remains disconnected */
final String msg = "Reconnection requested for node, but manager was unable to obtain current flow. Setting node to disconnected.";
addEvent(node.getNodeId(), msg);
addBulletin(node, Severity.WARNING, msg);
throw new NodeReconnectionException("Manager was unable to obtain current flow to provide in reconnection request to node. Try again in a few seconds.");
}
// if a cached copy does not exist, load it from disk
if (cachedDataFlow == null) {
final ClusterDataFlow clusterDataFlow = dataFlowManagementService.loadDataFlow();
cachedDataFlow = clusterDataFlow.getDataFlow();
primaryNodeId = clusterDataFlow.getPrimaryNodeId();
}
node.setStatus(Status.CONNECTING);
addEvent(node.getNodeId(), "Reconnection requested for node. Setting status to connecting.");
// determine if this node should be assigned the primary role
if (primaryNodeId == null || primaryNodeId.logicallyEquals(node.getNodeId())) {
setPrimaryNodeId(node.getNodeId());
addEvent(node.getNodeId(), "Setting primary role in reconnection request.");
primaryRole = true;
} else {
primaryRole = false;
}
if (dataFlowManagementService.getRetrievalDelaySeconds() <= 0) {
tryAgainSeconds = DEFAULT_CONNECTION_REQUEST_TRY_AGAIN_SECONDS;
} else {
tryAgainSeconds = dataFlowManagementService.getRetrievalDelaySeconds();
}
} catch (final UnknownNodeException | IllegalNodeReconnectionException | NodeReconnectionException une) {
throw une;
} catch (final Exception ex) {
logger.warn("Problem encountered issuing reconnection request to node " + node.getNodeId() + " due to: " + ex, ex);
node.setStatus(Status.DISCONNECTED);
final String eventMsg = "Problem encountered issuing reconnection request. Node will remain disconnected: " + ex;
addEvent(node.getNodeId(), eventMsg);
addBulletin(node, Severity.WARNING, eventMsg);
// Exception thrown will include node ID but event/bulletin do not because the node/id is passed along with the message
throw new NodeReconnectionException("Problem encountered issuing reconnection request to " + node.getNodeId() + ". Node will remain disconnected: " + ex, ex);
} finally {
writeLock.unlock("requestReconnection");
}
// Asynchronously start attempting reconnection. This is not completely thread-safe, as
// we do this by releasing the write lock and then obtaining a read lock for each attempt,
// so we suffer from the ABA problem. However, we are willing to accept the consequences of
// this situation in order to avoid holding a lock for the entire duration. "The consequences"
// are that a second thread could potentially be doing the same thing, issuing a reconnection request.
// However, this is very unlikely to happen, based on the conditions under which we issue a reconnection
// request. And if we do, the node will simply reconnect multiple times, which is not a big deal.
requestReconnectionAsynchronously(node, primaryRole, 10, tryAgainSeconds);
}
private void requestReconnectionAsynchronously(final Node node, final boolean primaryRole, final int reconnectionAttempts, final int retrySeconds) {
final Thread reconnectionThread = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < reconnectionAttempts; i++) {
final ReconnectionRequestMessage request = new ReconnectionRequestMessage();
try {
readLock.lock();
try {
if (Status.CONNECTING != node.getStatus()) {
// the node status has changed. It's no longer appropriate to attempt reconnection.
return;
}
// create the request
request.setNodeId(node.getNodeId());
request.setDataFlow(cachedDataFlow);
request.setPrimary(primaryRole);
request.setManagerRemoteSiteCommsSecure(remoteCommsSecure);
request.setManagerRemoteSiteListeningPort(remoteInputPort);
request.setInstanceId(instanceId);
} finally {
readLock.unlock("Reconnect " + node.getNodeId());
}
// Issue a reconnection request to the node.
senderListener.requestReconnection(request);
node.setConnectionRequestedTimestamp(System.currentTimeMillis());
// successfully told node to reconnect -- we're done!
return;
} catch (final Exception e) {
logger.warn("Problem encountered issuing reconnection request to node " + node.getNodeId() + " due to: " + e);
if (logger.isDebugEnabled()) {
logger.warn("", e);
}
addBulletin(node, Severity.WARNING, "Problem encountered issuing reconnection request to node " + node.getNodeId() + " due to: " + e);
}
try {
Thread.sleep(1000L * retrySeconds);
} catch (final InterruptedException ie) {
break;
}
}
// We failed to reconnect 10 times. We must now mark node as disconnected.
writeLock.lock();
try {
if (Status.CONNECTING == node.getStatus()) {
requestDisconnectionQuietly(node.getNodeId(), "Failed to issue Reconnection Request " + reconnectionAttempts + " times");
}
} finally {
writeLock.unlock("Mark node as Disconnected as a result of reconnection failure");
}
}
}, "Reconnect " + node.getNodeId());
reconnectionThread.start();
}
private Map<String, ReportingTaskNode> loadReportingTasks(final byte[] serialized) {
final Map<String, ReportingTaskNode> tasks = new HashMap<>();
try {
final Document document = parse(serialized);
final NodeList tasksNodes = document.getElementsByTagName("tasks");
final Element tasksElement = (Element) tasksNodes.item(0);
//optional properties for all ReportingTasks
for (final Element taskElement : DomUtils.getChildElementsByTagName(tasksElement, "task")) {
//add global properties common to all tasks
Map<String, String> properties = new HashMap<>();
//get properties for the specific reporting task - id, name, class,
//and schedulingPeriod must be set
final String taskId = DomUtils.getChild(taskElement, "id").getTextContent().trim();
final String taskName = DomUtils.getChild(taskElement, "name").getTextContent().trim();
final List<Element> schedulingStrategyNodeList = DomUtils.getChildElementsByTagName(taskElement, "schedulingStrategy");
String schedulingStrategyValue = SchedulingStrategy.TIMER_DRIVEN.name();
if (schedulingStrategyNodeList.size() == 1) {
final String specifiedValue = schedulingStrategyNodeList.get(0).getTextContent();
try {
schedulingStrategyValue = SchedulingStrategy.valueOf(specifiedValue).name();
} catch (final Exception e) {
throw new RuntimeException("Cannot start Reporting Task with id " + taskId + " because its Scheduling Strategy does not have a valid value", e);
}
}
final SchedulingStrategy schedulingStrategy = SchedulingStrategy.valueOf(schedulingStrategyValue);
final String taskSchedulingPeriod = DomUtils.getChild(taskElement, "schedulingPeriod").getTextContent().trim();
final String taskClass = DomUtils.getChild(taskElement, "class").getTextContent().trim();
//optional task-specific properties
for (final Element optionalProperty : DomUtils.getChildElementsByTagName(taskElement, "property")) {
final String name = optionalProperty.getAttribute("name");
final String value = optionalProperty.getTextContent().trim();
properties.put(name, value);
}
//set the class to be used for the configured reporting task
final ReportingTaskNode reportingTaskNode;
try {
reportingTaskNode = createReportingTask(taskClass, taskId, false);
} catch (final ReportingTaskInstantiationException e) {
logger.error("Unable to load reporting task {} due to {}", new Object[]{taskId, e});
if (logger.isDebugEnabled()) {
logger.error("", e);
}
continue;
}
final ReportingTask reportingTask = reportingTaskNode.getReportingTask();
final ComponentLog componentLog = new SimpleProcessLogger(taskId, reportingTask);
final ReportingInitializationContext config = new StandardReportingInitializationContext(taskId, taskName,
schedulingStrategy, taskSchedulingPeriod, componentLog, this);
reportingTask.initialize(config);
final Map<PropertyDescriptor, String> resolvedProps;
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
resolvedProps = new HashMap<>();
for (final Map.Entry<String, String> entry : properties.entrySet()) {
final PropertyDescriptor descriptor = reportingTask.getPropertyDescriptor(entry.getKey());
resolvedProps.put(descriptor, entry.getValue());
}
}
for (final Map.Entry<PropertyDescriptor, String> entry : resolvedProps.entrySet()) {
reportingTaskNode.setProperty(entry.getKey().getName(), entry.getValue());
}
processScheduler.schedule(reportingTaskNode);
tasks.put(reportingTaskNode.getIdentifier(), reportingTaskNode);
}
} catch (final SAXException | ParserConfigurationException | IOException | DOMException | NumberFormatException | InitializationException t) {
logger.error("Unable to load reporting tasks due to {}", new Object[]{t});
if (logger.isDebugEnabled()) {
logger.error("", t);
}
}
return tasks;
}
@Override
public ReportingTaskNode createReportingTask(final String type, final String id, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
if (type == null) {
throw new NullPointerException();
}
ReportingTask task = null;
final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
try {
final ClassLoader detectedClassLoader = ExtensionManager.getClassLoader(type);
final Class<?> rawClass;
if (detectedClassLoader == null) {
rawClass = Class.forName(type);
} else {
rawClass = Class.forName(type, false, detectedClassLoader);
}
Thread.currentThread().setContextClassLoader(detectedClassLoader);
final Class<? extends ReportingTask> reportingTaskClass = rawClass.asSubclass(ReportingTask.class);
final Object reportingTaskObj = reportingTaskClass.newInstance();
task = reportingTaskClass.cast(reportingTaskObj);
} catch (final ClassNotFoundException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException t) {
throw new ReportingTaskInstantiationException(type, t);
} finally {
if (ctxClassLoader != null) {
Thread.currentThread().setContextClassLoader(ctxClassLoader);
}
}
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this);
final ReportingTaskNode taskNode = new ClusteredReportingTaskNode(task, id, processScheduler,
new ClusteredEventAccess(this), bulletinRepository, controllerServiceProvider, validationContextFactory);
taskNode.setName(task.getClass().getSimpleName());
reportingTasks.put(id, taskNode);
if ( firstTimeAdded ) {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, task);
} catch (final Exception e) {
throw new ProcessorLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + task, e);
}
}
return taskNode;
}
private Document parse(final byte[] serialized) throws SAXException, ParserConfigurationException, IOException {
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
final DocumentBuilder builder = docFactory.newDocumentBuilder();
builder.setErrorHandler(new org.xml.sax.ErrorHandler() {
@Override
public void fatalError(final SAXParseException err) throws SAXException {
logger.error("Config file line " + err.getLineNumber() + ", col " + err.getColumnNumber() + ", uri " + err.getSystemId() + " :message: " + err.getMessage());
if (logger.isDebugEnabled()) {
logger.error("Error Stack Dump", err);
}
throw err;
}
@Override
public void error(final SAXParseException err) throws SAXParseException {
logger.error("Config file line " + err.getLineNumber() + ", col " + err.getColumnNumber() + ", uri " + err.getSystemId() + " :message: " + err.getMessage());
if (logger.isDebugEnabled()) {
logger.error("Error Stack Dump", err);
}
throw err;
}
@Override
public void warning(final SAXParseException err) throws SAXParseException {
logger.warn(" Config file line " + err.getLineNumber() + ", uri " + err.getSystemId() + " : message : " + err.getMessage());
if (logger.isDebugEnabled()) {
logger.warn("Warning stack dump", err);
}
throw err;
}
});
// build the docuemnt
final Document document = builder.parse(new ByteArrayInputStream(serialized));
return document;
}
private void addBulletin(final Node node, final Severity severity, final String msg) {
addBulletin(node.getNodeId(), severity, msg);
}
private void addBulletin(final NodeIdentifier nodeId, final Severity severity, final String msg) {
bulletinRepository.addBulletin(BulletinFactory.createBulletin(BULLETIN_CATEGORY, severity.toString(),
nodeId.getApiAddress() + ":" + nodeId.getApiPort() + " -- " + msg));
}
/**
* Services a disconnection request.
*
* @param nodeId a node identifier
* @param userDn the DN of the user requesting the disconnection
*
* @throws UnknownNodeException if the node does not exist
* @throws IllegalNodeDisconnectionException if the node cannot be
* disconnected due to the cluster's state (e.g., node is last connected
* node or node is primary)
* @throws NodeDisconnectionException if the disconnection message fails to
* be sent.
*/
@Override
public void requestDisconnection(final String nodeId, final String userDn) throws UnknownNodeException, IllegalNodeDisconnectionException, NodeDisconnectionException {
writeLock.lock();
try {
// check that the node is known
final Node node = getNode(nodeId);
if (node == null) {
throw new UnknownNodeException("Node does not exist.");
}
requestDisconnection(node.getNodeId(), /* ignore last node */ false, "User " + userDn + " Disconnected Node");
} finally {
writeLock.unlock("requestDisconnection(String)");
}
}
/**
* Requests a disconnection to the node with the given node ID, but any
* exception thrown is suppressed.
*
* @param nodeId the node ID
*/
private void requestDisconnectionQuietly(final NodeIdentifier nodeId, final String explanation) {
try {
requestDisconnection(nodeId, /* ignore node check */ true, explanation);
} catch (final IllegalNodeDisconnectionException | NodeDisconnectionException ex) { /* suppress exception */ }
}
/**
* Issues a disconnection message to the node identified by the given node
* ID. If the node is not known, then a UnknownNodeException is thrown. If
* the node cannot be disconnected due to the cluster's state and
* ignoreLastNodeCheck is false, then a IllegalNodeDisconnectionException is
* thrown. Otherwise, a disconnection message is issued to the node.
*
* Whether the disconnection message is successfully sent to the node, the
* node is marked as disconnected and if the node is the primary node, then
* the primary role is revoked.
*
* @param nodeId the ID of the node
* @param ignoreNodeChecks if false, checks will be made to ensure the
* cluster supports the node's disconnection (e.g., the node is not the last
* connected node in the cluster; the node is not the primary); otherwise,
* the request is made regardless of the cluster state
* @param explanation
*
* @throws IllegalNodeDisconnectionException if the node cannot be
* disconnected due to the cluster's state (e.g., node is last connected
* node or node is primary). Not thrown if ignoreNodeChecks is true.
* @throws NodeDisconnectionException if the disconnection message fails to
* be sent.
*/
private void requestDisconnection(final NodeIdentifier nodeId, final boolean ignoreNodeChecks, final String explanation)
throws IllegalNodeDisconnectionException, NodeDisconnectionException {
writeLock.lock();
try {
// check that the node is known
final Node node = getRawNode(nodeId.getId());
if (node == null) {
if (ignoreNodeChecks) {
// issue the disconnection
final DisconnectMessage request = new DisconnectMessage();
request.setNodeId(nodeId);
request.setExplanation(explanation);
addEvent(nodeId, "Disconnection requested due to " + explanation);
senderListener.disconnect(request);
addEvent(nodeId, "Node disconnected due to " + explanation);
addBulletin(nodeId, Severity.INFO, "Node disconnected due to " + explanation);
return;
} else {
throw new UnknownNodeException("Node does not exist");
}
}
// if necessary, check that the node may be disconnected
if (!ignoreNodeChecks) {
final Set<NodeIdentifier> connectedNodes = getNodeIds(Status.CONNECTED);
// cannot disconnect the last connected node in the cluster
if (connectedNodes.size() == 1 && connectedNodes.iterator().next().equals(nodeId)) {
throw new IllegalNodeDisconnectionException("Node may not be disconnected because it is the only connected node in the cluster.");
} else if (isPrimaryNode(nodeId)) {
// cannot disconnect the primary node in the cluster
throw new IllegalNodeDisconnectionException("Node may not be disconnected because it is the primary node in the cluster.");
}
}
// update status
node.setStatus(Status.DISCONNECTED);
notifyDataFlowManagementServiceOfNodeStatusChange();
// issue the disconnection
final DisconnectMessage request = new DisconnectMessage();
request.setNodeId(nodeId);
request.setExplanation(explanation);
addEvent(nodeId, "Disconnection requested due to " + explanation);
senderListener.disconnect(request);
addEvent(nodeId, "Node disconnected due to " + explanation);
addBulletin(node, Severity.INFO, "Node disconnected due to " + explanation);
} finally {
writeLock.unlock("requestDisconnection(NodeIdentifier, boolean)");
}
}
/**
* Messages the node to have the primary role. If the messaging fails, then
* the node is marked as disconnected.
*
* @param nodeId the node ID to assign primary role
*
* @return true if primary role assigned; false otherwise
*/
private boolean assignPrimaryRole(final NodeIdentifier nodeId) {
writeLock.lock();
try {
// create primary role message
final PrimaryRoleAssignmentMessage msg = new PrimaryRoleAssignmentMessage();
msg.setNodeId(nodeId);
msg.setPrimary(true);
logger.info("Attempting to assign primary role to node: " + nodeId);
// message
senderListener.assignPrimaryRole(msg);
logger.info("Assigned primary role to node: " + nodeId);
addBulletin(nodeId, Severity.INFO, "Node assigned primary role");
// true indicates primary role assigned
return true;
} catch (final ProtocolException ex) {
logger.warn("Failed attempt to assign primary role to node " + nodeId + " due to " + ex);
addBulletin(nodeId, Severity.ERROR, "Failed to assign primary role to node due to: " + ex);
// mark node as disconnected and log/record the event
final Node node = getRawNode(nodeId.getId());
node.setStatus(Status.DISCONNECTED);
addEvent(node.getNodeId(), "Disconnected because of failed attempt to assign primary role.");
addBulletin(nodeId, Severity.WARNING, "Node disconnected because of failed attempt to assign primary role");
// false indicates primary role failed to be assigned
return false;
} finally {
writeLock.unlock("assignPrimaryRole");
}
}
/**
* Messages the node with the given node ID to no longer have the primary
* role. If the messaging fails, then the node is marked as disconnected.
*
* @return true if the primary role was revoked from the node; false
* otherwise
*/
private boolean revokePrimaryRole(final NodeIdentifier nodeId) {
writeLock.lock();
try {
// create primary role message
final PrimaryRoleAssignmentMessage msg = new PrimaryRoleAssignmentMessage();
msg.setNodeId(nodeId);
msg.setPrimary(false);
logger.info("Attempting to revoke primary role from node: " + nodeId);
// send message
senderListener.assignPrimaryRole(msg);
logger.info("Revoked primary role from node: " + nodeId);
addBulletin(nodeId, Severity.INFO, "Primary Role revoked from node");
// true indicates primary role was revoked
return true;
} catch (final ProtocolException ex) {
logger.warn("Failed attempt to revoke primary role from node " + nodeId + " due to " + ex);
// mark node as disconnected and log/record the event
final Node node = getRawNode(nodeId.getId());
node.setStatus(Status.DISCONNECTED);
addEvent(node.getNodeId(), "Disconnected because of failed attempt to revoke primary role.");
addBulletin(node, Severity.ERROR, "Node disconnected because of failed attempt to revoke primary role");
// false indicates primary role failed to be revoked
return false;
} finally {
writeLock.unlock("revokePrimaryRole");
}
}
private NodeIdentifier addRequestorDn(final NodeIdentifier nodeId, final String dn) {
return new NodeIdentifier(nodeId.getId(), nodeId.getApiAddress(),
nodeId.getApiPort(), nodeId.getSocketAddress(), nodeId.getSocketPort(), dn);
}
private ConnectionResponseMessage handleConnectionRequest(final ConnectionRequestMessage requestMessage) {
final NodeIdentifier proposedIdentifier = requestMessage.getConnectionRequest().getProposedNodeIdentifier();
final ConnectionRequest requestWithDn = new ConnectionRequest(addRequestorDn(proposedIdentifier, requestMessage.getRequestorDN()));
final ConnectionResponse response = requestConnection(requestWithDn);
final ConnectionResponseMessage responseMessage = new ConnectionResponseMessage();
responseMessage.setConnectionResponse(response);
return responseMessage;
}
private void handleControllerStartupFailure(final ControllerStartupFailureMessage msg) {
writeLock.lock();
try {
final Node node = getRawNode(msg.getNodeId().getId());
if (node != null) {
node.setStatus(Status.DISCONNECTED);
addEvent(msg.getNodeId(), "Node could not join cluster because it failed to start up properly. Setting node to Disconnected. Node reported the following error: " + msg.getExceptionMessage());
addBulletin(node, Severity.ERROR, "Node could not join cluster because it failed to start up properly. Setting node to Disconnected. Node reported the following error: " + msg.getExceptionMessage());
}
} finally {
writeLock.unlock("handleControllerStartupFailure");
}
}
private void handleReconnectionFailure(final ReconnectionFailureMessage msg) {
writeLock.lock();
try {
final Node node = getRawNode(msg.getNodeId().getId());
if (node != null) {
node.setStatus(Status.DISCONNECTED);
final String errorMsg = "Node could not rejoin cluster. Setting node to Disconnected. Node reported the following error: " + msg.getExceptionMessage();
addEvent(msg.getNodeId(), errorMsg);
addBulletin(node, Severity.ERROR, errorMsg);
}
} finally {
writeLock.unlock("handleControllerStartupFailure");
}
}
/**
* Adds an instance of a specified controller service.
*
* @param type
* @param id
* @param properties
* @return
*/
@Override
public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
return controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
}
@Override
public ControllerService getControllerService(String serviceIdentifier) {
return controllerServiceProvider.getControllerService(serviceIdentifier);
}
@Override
public ControllerServiceNode getControllerServiceNode(final String id) {
return controllerServiceProvider.getControllerServiceNode(id);
}
@Override
public boolean isControllerServiceEnabled(final ControllerService service) {
return controllerServiceProvider.isControllerServiceEnabled(service);
}
@Override
public boolean isControllerServiceEnabled(final String serviceIdentifier) {
return controllerServiceProvider.isControllerServiceEnabled(serviceIdentifier);
}
@Override
public String getControllerServiceName(final String serviceIdentifier) {
return controllerServiceProvider.getControllerServiceName(serviceIdentifier);
}
@Override
public void removeControllerService(final ControllerServiceNode serviceNode) {
controllerServiceProvider.removeControllerService(serviceNode);
}
@Override
public void enableControllerService(final ControllerServiceNode serviceNode) {
controllerServiceProvider.enableControllerService(serviceNode);
}
@Override
public void disableControllerService(final ControllerServiceNode serviceNode) {
controllerServiceProvider.disableControllerService(serviceNode);
}
@Override
public Set<ControllerServiceNode> getAllControllerServices() {
return controllerServiceProvider.getAllControllerServices();
}
@Override
public void disableReferencingServices(final ControllerServiceNode serviceNode) {
controllerServiceProvider.disableReferencingServices(serviceNode);
}
@Override
public void enableReferencingServices(final ControllerServiceNode serviceNode) {
controllerServiceProvider.enableReferencingServices(serviceNode);
}
@Override
public void scheduleReferencingComponents(final ControllerServiceNode serviceNode) {
controllerServiceProvider.scheduleReferencingComponents(serviceNode);
}
@Override
public void unscheduleReferencingComponents(final ControllerServiceNode serviceNode) {
controllerServiceProvider.unscheduleReferencingComponents(serviceNode);
}
@Override
public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) {
controllerServiceProvider.verifyCanEnableReferencingServices(serviceNode);
}
@Override
public void verifyCanScheduleReferencingComponents(final ControllerServiceNode serviceNode) {
controllerServiceProvider.verifyCanScheduleReferencingComponents(serviceNode);
}
@Override
public void verifyCanDisableReferencingServices(final ControllerServiceNode serviceNode) {
controllerServiceProvider.verifyCanDisableReferencingServices(serviceNode);
}
@Override
public void verifyCanStopReferencingComponents(final ControllerServiceNode serviceNode) {
controllerServiceProvider.verifyCanStopReferencingComponents(serviceNode);
}
private byte[] serialize(final Document doc) throws TransformerException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DOMSource domSource = new DOMSource(doc);
final StreamResult streamResult = new StreamResult(baos);
// configure the transformer and convert the DOM
final TransformerFactory transformFactory = TransformerFactory.newInstance();
final Transformer transformer = transformFactory.newTransformer();
transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
transformer.setOutputProperty(OutputKeys.INDENT, "yes");
// transform the document to byte stream
transformer.transform(domSource, streamResult);
return baos.toByteArray();
}
private byte[] serializeControllerServices() throws ParserConfigurationException, TransformerException {
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
final Document document = docBuilder.newDocument();
final Element rootElement = document.createElement("controllerServices");
document.appendChild(rootElement);
for ( final ControllerServiceNode serviceNode : getAllControllerServices() ) {
StandardFlowSerializer.addControllerService(rootElement, serviceNode, encryptor);
}
return serialize(document);
}
private byte[] serializeReportingTasks() throws ParserConfigurationException, TransformerException {
final DocumentBuilderFactory docFactory = DocumentBuilderFactory.newInstance();
final DocumentBuilder docBuilder = docFactory.newDocumentBuilder();
final Document document = docBuilder.newDocument();
final Element rootElement = document.createElement("reportingTasks");
document.appendChild(rootElement);
for ( final ReportingTaskNode taskNode : getAllReportingTasks() ) {
StandardFlowSerializer.addReportingTask(rootElement, taskNode, encryptor);
}
return serialize(document);
}
public void saveControllerServices() {
try {
dataFlowManagementService.updateControllerServices(serializeControllerServices());
} catch (final Exception e) {
logger.error("Failed to save changes to NCM's Controller Services; changes may be lost on restart due to " + e);
if ( logger.isDebugEnabled() ) {
logger.error("", e);
}
getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Controller Services", Severity.ERROR.name(),
"Failed to save changes to NCM's Controller Services; changes may be lost on restart. See logs for more details."));
}
}
public void saveReportingTasks() {
try {
dataFlowManagementService.updateReportingTasks(serializeReportingTasks());
} catch (final Exception e) {
logger.error("Failed to save changes to NCM's Reporting Tasks; changes may be lost on restart due to " + e);
if ( logger.isDebugEnabled() ) {
logger.error("", e);
}
getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Reporting Tasks", Severity.ERROR.name(),
"Failed to save changes to NCM's Reporting Tasks; changes may be lost on restart. See logs for more details."));
}
}
@Override
public Set<ReportingTaskNode> getAllReportingTasks() {
readLock.lock();
try {
return new HashSet<>(reportingTasks.values());
} finally {
readLock.unlock("getReportingTasks");
}
}
@Override
public ReportingTaskNode getReportingTaskNode(final String taskId) {
readLock.lock();
try {
return reportingTasks.get(taskId);
} finally {
readLock.unlock("getReportingTaskNode");
}
}
@Override
public void startReportingTask(final ReportingTaskNode reportingTaskNode) {
reportingTaskNode.verifyCanStart();
processScheduler.schedule(reportingTaskNode);
}
@Override
public void stopReportingTask(final ReportingTaskNode reportingTaskNode) {
reportingTaskNode.verifyCanStop();
processScheduler.unschedule(reportingTaskNode);
}
@Override
public void removeReportingTask(final ReportingTaskNode reportingTaskNode) {
writeLock.lock();
try {
final ReportingTaskNode existing = reportingTasks.get(reportingTaskNode.getIdentifier());
if ( existing == null || existing != reportingTaskNode ) {
throw new IllegalStateException("Reporting Task " + reportingTaskNode + " does not exist in this Flow");
}
reportingTaskNode.verifyCanDelete();
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, reportingTaskNode.getReportingTask(), reportingTaskNode.getConfigurationContext());
}
for ( final Map.Entry<PropertyDescriptor, String> entry : reportingTaskNode.getProperties().entrySet() ) {
final PropertyDescriptor descriptor = entry.getKey();
if (descriptor.getControllerServiceDefinition() != null ) {
final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue();
if ( value != null ) {
final ControllerServiceNode serviceNode = controllerServiceProvider.getControllerServiceNode(value);
if ( serviceNode != null ) {
serviceNode.removeReference(reportingTaskNode);
}
}
}
}
reportingTasks.remove(reportingTaskNode.getIdentifier());
} finally {
writeLock.unlock("removeReportingTask");
}
}
@Override
public void disableReportingTask(final ReportingTaskNode reportingTask) {
reportingTask.verifyCanDisable();
processScheduler.disableReportingTask(reportingTask);
}
@Override
public void enableReportingTask(final ReportingTaskNode reportingTask) {
reportingTask.verifyCanEnable();
processScheduler.enableReportingTask(reportingTask);
}
/**
* Handle a bulletins message.
*
* @param bulletins
*/
public void handleBulletins(final NodeBulletins bulletins) {
final NodeIdentifier nodeIdentifier = bulletins.getNodeIdentifier();
final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + nodeIdentifier.getApiPort();
// unmarshal the message
BulletinsPayload payload = BulletinsPayload.unmarshal(bulletins.getPayload());
for (final Bulletin bulletin : payload.getBulletins()) {
bulletin.setNodeAddress(nodeAddress);
bulletinRepository.addBulletin(bulletin);
}
}
/**
* Handles a node's heartbeat. If this heartbeat is a node's first heartbeat
* since its connection request, then the manager will mark the node as
* connected. If the node was previously disconnected due to a lack of
* heartbeat, then a reconnection request is issued. If the node was
* disconnected for other reasons, then a disconnection request is issued.
* If this instance is configured with a firewall and the heartbeat is
* blocked, then a disconnection request is issued.
*
* @param heartbeat
*/
@Override
public void handleHeartbeat(final Heartbeat heartbeat) {
// sanity check heartbeat
if (heartbeat == null) {
throw new IllegalArgumentException("Heartbeat may not be null.");
} else if (heartbeat.getNodeIdentifier() == null) {
throw new IllegalArgumentException("Heartbeat does not contain a node ID.");
}
/*
* Processing a heartbeat requires a write lock, which may take a while
* to obtain. Only the last heartbeat is necessary to process per node.
* Futhermore, since many could pile up, heartbeats are processed in
* bulk.
*
* The below queue stores the pending heartbeats.
*/
pendingHeartbeats.add(heartbeat);
}
private void processPendingHeartbeats() {
Node node;
writeLock.lock();
try {
/*
* Get the most recent heartbeats for the nodes in the cluster. This
* is achieved by "draining" the pending heartbeats queue, populating
* a map that associates a node identifier with its latest heartbeat, and
* finally, getting the values of the map.
*/
final Map<NodeIdentifier, Heartbeat> mostRecentHeartbeatsMap = new HashMap<>();
Heartbeat aHeartbeat;
while ((aHeartbeat = pendingHeartbeats.poll()) != null) {
mostRecentHeartbeatsMap.put(aHeartbeat.getNodeIdentifier(), aHeartbeat);
}
final Collection<Heartbeat> mostRecentHeartbeats = new ArrayList<>(mostRecentHeartbeatsMap.values());
// return fast if no work to do
if (mostRecentHeartbeats.isEmpty()) {
return;
}
logNodes("Before Heartbeat Processing", heartbeatLogger);
final int numPendingHeartbeats = mostRecentHeartbeats.size();
if (heartbeatLogger.isDebugEnabled()) {
heartbeatLogger.debug(String.format("Handling %s heartbeat%s", numPendingHeartbeats, (numPendingHeartbeats > 1) ? "s" : ""));
}
for (final Heartbeat mostRecentHeartbeat : mostRecentHeartbeats) {
try {
// resolve the proposed node identifier to valid node identifier
final NodeIdentifier resolvedNodeIdentifier = resolveProposedNodeIdentifier(mostRecentHeartbeat.getNodeIdentifier());
// get a raw reference to the node (if it doesn't exist, node will be null)
node = getRawNode(resolvedNodeIdentifier.getId());
// if the node thinks it has the primary role, but the manager has assigned the role to a different node, then revoke the role
if (mostRecentHeartbeat.isPrimary() && !isPrimaryNode(resolvedNodeIdentifier)) {
addEvent(resolvedNodeIdentifier, "Heartbeat indicates node is running as primary node. Revoking primary role because primary role is assigned to a different node.");
revokePrimaryRole(resolvedNodeIdentifier);
}
final boolean heartbeatIndicatesNotYetConnected = !mostRecentHeartbeat.isConnected();
if (isBlockedByFirewall(resolvedNodeIdentifier.getSocketAddress())) {
if (node == null) {
logger.info("Firewall blocked heartbeat received from unknown node " + resolvedNodeIdentifier + ". Issuing disconnection request.");
} else {
// record event
addEvent(resolvedNodeIdentifier, "Firewall blocked received heartbeat. Issuing disconnection request.");
}
// request node to disconnect
requestDisconnectionQuietly(resolvedNodeIdentifier, "Blocked By Firewall");
} else if (node == null) { // unknown node, so issue reconnect request
// create new node and add to node set
final Node newNode = new Node(resolvedNodeIdentifier, Status.DISCONNECTED);
nodes.add(newNode);
// record event
addEvent(newNode.getNodeId(), "Received heartbeat from unknown node. Issuing reconnection request.");
// record heartbeat
newNode.setHeartbeat(mostRecentHeartbeat);
requestReconnection(resolvedNodeIdentifier.getId(), "NCM Heartbeat Processing");
} else if (heartbeatIndicatesNotYetConnected) {
if (Status.CONNECTED == node.getStatus()) {
// record event
addEvent(node.getNodeId(), "Received heartbeat from node that thinks it is not yet part of the cluster, though the Manager thought it was. Marking as Disconnected and issuing reconnection request.");
// record heartbeat
node.setHeartbeat(null);
node.setStatus(Status.DISCONNECTED);
requestReconnection(resolvedNodeIdentifier.getId(), "NCM Heartbeat Processing");
}
} else if (Status.DISCONNECTED == node.getStatus()) {
// ignore heartbeats from nodes disconnected by means other than lack of heartbeat, unless it is
// the only node. We allow it if it is the only node because if we have a one-node cluster, then
// we cannot manually reconnect it.
if (node.isHeartbeatDisconnection() || nodes.size() == 1) {
// record event
if (node.isHeartbeatDisconnection()) {
addEvent(resolvedNodeIdentifier, "Received heartbeat from node previously disconnected due to lack of heartbeat. Issuing reconnection request.");
} else {
addEvent(resolvedNodeIdentifier, "Received heartbeat from node previously disconnected, but it is the only known node, so issuing reconnection request.");
}
// record heartbeat
node.setHeartbeat(mostRecentHeartbeat);
// request reconnection
requestReconnection(resolvedNodeIdentifier.getId(), "NCM Heartbeat Processing");
} else {
// disconnected nodes should not heartbeat, so we need to issue a disconnection request
heartbeatLogger.info("Ignoring received heartbeat from disconnected node " + resolvedNodeIdentifier + ". Issuing disconnection request.");
// request node to disconnect
requestDisconnectionQuietly(resolvedNodeIdentifier, "Received Heartbeat from Node, but Manager has already marked Node as Disconnected");
}
} else if (Status.DISCONNECTING == node.getStatus()) {
/* ignore spurious heartbeat */
} else { // node is either either connected or connecting
// first heartbeat causes status change from connecting to connected
if (Status.CONNECTING == node.getStatus()) {
if (mostRecentHeartbeat.getCreatedTimestamp() < node.getConnectionRequestedTimestamp()) {
heartbeatLogger.info("Received heartbeat for node " + resolvedNodeIdentifier + " but ignoring because it was generated before the node was last asked to reconnect.");
continue;
}
// set status to connected
node.setStatus(Status.CONNECTED);
// record event
addEvent(resolvedNodeIdentifier, "Received first heartbeat from connecting node. Setting node to connected.");
// notify service of updated node set
notifyDataFlowManagementServiceOfNodeStatusChange();
addBulletin(node, Severity.INFO, "Node Connected");
} else {
heartbeatLogger.info("Received heartbeat for node " + resolvedNodeIdentifier + ".");
}
// record heartbeat
node.setHeartbeat(mostRecentHeartbeat);
ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
if (statusRepository == null) {
statusRepository = createComponentStatusRepository();
componentMetricsRepositoryMap.put(node.getNodeId(), statusRepository);
}
// If it's been a while since we've captured, capture this metric.
final Date lastCaptureDate = statusRepository.getLastCaptureDate();
final long millisSinceLastCapture = (lastCaptureDate == null) ? Long.MAX_VALUE : (System.currentTimeMillis() - lastCaptureDate.getTime());
if (millisSinceLastCapture > componentStatusSnapshotMillis) {
statusRepository.capture(node.getHeartbeatPayload().getProcessGroupStatus());
}
}
} catch (final Exception e) {
logger.error("Failed to process heartbeat from {}:{} due to {}", mostRecentHeartbeat.getNodeIdentifier().getApiAddress(), mostRecentHeartbeat.getNodeIdentifier().getApiPort(), e.toString());
if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
}
logNodes("After Heartbeat Processing", heartbeatLogger);
} finally {
writeLock.unlock("processPendingHeartbeats");
}
}
private ComponentStatusRepository createComponentStatusRepository() {
final String implementationClassName = properties.getProperty(NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION, DEFAULT_COMPONENT_STATUS_REPO_IMPLEMENTATION);
if (implementationClassName == null) {
throw new RuntimeException("Cannot create Component Status Repository because the NiFi Properties is missing the following property: "
+ NiFiProperties.COMPONENT_STATUS_REPOSITORY_IMPLEMENTATION);
}
try {
return NarThreadContextClassLoader.createInstance(implementationClassName, ComponentStatusRepository.class);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
@Override
public Set<Node> getNodes(final Status... statuses) {
final Set<Status> desiredStatusSet = new HashSet<>();
for (final Status status : statuses) {
desiredStatusSet.add(status);
}
readLock.lock();
try {
final Set<Node> clonedNodes = new HashSet<>();
for (final Node node : nodes) {
if (desiredStatusSet.isEmpty() || desiredStatusSet.contains(node.getStatus())) {
clonedNodes.add(node.clone());
}
}
return Collections.unmodifiableSet(clonedNodes);
} finally {
readLock.unlock("getNodes(Status...)");
}
}
@Override
public Node getNode(final String nodeId) {
readLock.lock();
try {
for (final Node node : nodes) {
if (node.getNodeId().getId().equals(nodeId)) {
return node.clone();
}
}
return null;
} finally {
readLock.unlock("getNode(String)");
}
}
@Override
public Node getPrimaryNode() {
readLock.lock();
try {
if (primaryNodeId == null) {
return null;
} else {
return getNode(primaryNodeId.getId());
}
} finally {
readLock.unlock("getPrimaryNode");
}
}
@Override
public void deleteNode(final String nodeId, final String userDn) throws UnknownNodeException, IllegalNodeDeletionException {
writeLock.lock();
try {
final Node node = getNode(nodeId);
if (node == null) {
throw new UnknownNodeException("Node does not exist.");
} else if (Status.DISCONNECTED == node.getStatus()) {
nodes.remove(node);
if (eventManager != null) {
eventManager.clearEventHistory(node.getNodeId().getId());
}
logger.info("Removing node {} from cluster because this action was requested by {}", node, userDn);
} else {
throw new IllegalNodeDeletionException("Node may not be deleted because it is not disconnected.");
}
} finally {
writeLock.unlock("deleteNode");
}
}
@Override
public Set<NodeIdentifier> getNodeIds(final Status... statuses) {
readLock.lock();
try {
final Set<NodeIdentifier> nodeIds = new HashSet<>();
for (final Node node : nodes) {
if (statuses == null || statuses.length == 0) {
nodeIds.add(node.getNodeId());
} else {
for (final Node.Status status : statuses) {
if (node.getStatus() == status) {
nodeIds.add(node.getNodeId());
break;
}
}
}
}
return nodeIds;
} finally {
readLock.unlock("getNodeIds(Status...)");
}
}
@Override
public void setPrimaryNode(final String nodeId, final String userDn) throws UnknownNodeException, IneligiblePrimaryNodeException, PrimaryRoleAssignmentException {
writeLock.lock();
try {
final Node node = getNode(nodeId);
if (node == null) {
throw new UnknownNodeException("Node does not exist.");
} else if (Status.CONNECTED != node.getStatus()) {
throw new IneligiblePrimaryNodeException("Node must be connected before it can be assigned as the primary node.");
}
// revoke primary role
final Node primaryNode;
if ((primaryNode = getPrimaryNode()) != null) {
if (primaryNode.getStatus() == Status.DISCONNECTED) {
throw new PrimaryRoleAssignmentException("A disconnected, primary node exists. Delete the node before assigning the primary role to a different node.");
} else if (revokePrimaryRole(primaryNode.getNodeId())) {
addEvent(primaryNode.getNodeId(), "Role revoked from this node as part of primary role reassignment.");
} else {
throw new PrimaryRoleAssignmentException(
"Failed to revoke primary role from node. Primary node is now disconnected. Delete the node before assigning the primary role to a different node.");
}
}
// change the primary node ID to the given node
setPrimaryNodeId(node.getNodeId());
// assign primary role
if (assignPrimaryRole(node.getNodeId())) {
addEvent(node.getNodeId(), "Role assigned to this node as part of primary role reassignment. Action performed by " + userDn);
addBulletin(node, Severity.INFO, "Primary Role assigned to node by " + userDn);
} else {
throw new PrimaryRoleAssignmentException(
"Cluster manager assigned primary role to node, but the node failed to accept the assignment. Cluster manager disconnected node.");
}
} finally {
writeLock.unlock("setPrimaryNode");
}
}
private int getClusterProtocolHeartbeatSeconds() {
return (int) FormatUtils.getTimeDuration(properties.getClusterProtocolHeartbeatInterval(), TimeUnit.SECONDS);
}
@Override
public int getHeartbeatMonitoringIntervalSeconds() {
return 4 * getClusterProtocolHeartbeatSeconds();
}
@Override
public int getMaxHeartbeatGapSeconds() {
return 8 * getClusterProtocolHeartbeatSeconds();
}
@Override
public List<Event> getNodeEvents(final String nodeId) {
readLock.lock();
try {
List<Event> events = null;
final EventManager eventMgr = eventManager;
if (eventMgr != null) {
events = eventMgr.getEvents(nodeId);
}
if (events == null) {
return Collections.emptyList();
} else {
return Collections.unmodifiableList(events);
}
} finally {
readLock.unlock("getNodeEvents");
}
}
@Override
public NodeResponse applyRequest(final String method, final URI uri, final Map<String, List<String>> parameters, final Map<String, String> headers)
throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException, DisconnectedNodeMutableRequestException, SafeModeMutableRequestException {
return applyRequest(method, uri, parameters, headers, getNodeIds(Status.CONNECTED));
}
@Override
public NodeResponse applyRequest(final String method, final URI uri, final Map<String, List<String>> parameters, final Map<String, String> headers, final Set<NodeIdentifier> nodeIdentifiers)
throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException, DisconnectedNodeMutableRequestException, SafeModeMutableRequestException {
final boolean mutableRequest = canChangeNodeState(method, uri);
final ClusterManagerLock lock = mutableRequest ? writeLock : readLock;
lock.lock();
try {
// check that the request can be applied
if (mutableRequest) {
if (isInSafeMode()) {
throw new SafeModeMutableRequestException("Received a mutable request [" + method + " -- " + uri + "] while in safe mode");
} else if (!getNodeIds(Status.DISCONNECTED, Status.DISCONNECTING).isEmpty()) {
throw new DisconnectedNodeMutableRequestException("Received a mutable request [" + method + " -- " + uri + "] while a node is disconnected from the cluster");
} else if (!getNodeIds(Status.CONNECTING).isEmpty()) {
// if any node is connecting and a request can change the flow, then we throw an exception
throw new ConnectingNodeMutableRequestException("Received a mutable request [" + method + " -- " + uri + "] while a node is trying to connect to the cluster");
}
}
final NodeResponse clientResponse = federateRequest(method, uri, parameters, null, headers, nodeIdentifiers);
if (clientResponse == null) {
if (mutableRequest) {
throw new NoConnectedNodesException(String.format("All nodes were disconnected as a result of applying request %s %s", method, uri));
} else {
throw new NoResponseFromNodesException("No nodes were able to process this request.");
}
} else {
return clientResponse;
}
} finally {
lock.unlock("applyRequest(String, URI, Map<String, List<String>>, Map<String, String>, Set<NodeIdentifier>");
}
}
@Override
public NodeResponse applyRequest(final String method, final URI uri, final Object entity, final Map<String, String> headers)
throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException, DisconnectedNodeMutableRequestException, SafeModeMutableRequestException {
return applyRequest(method, uri, entity, headers, getNodeIds(Status.CONNECTED));
}
@Override
public NodeResponse applyRequest(final String method, final URI uri, final Object entity, final Map<String, String> headers, final Set<NodeIdentifier> nodeIdentifiers)
throws NoConnectedNodesException, NoResponseFromNodesException, UriConstructionException, ConnectingNodeMutableRequestException, DisconnectedNodeMutableRequestException, SafeModeMutableRequestException {
final boolean mutableRequest = canChangeNodeState(method, uri);
final ClusterManagerLock lock = mutableRequest ? writeLock : readLock;
lock.lock();
try {
// check that the request can be applied
if (mutableRequest) {
if (isInSafeMode()) {
throw new SafeModeMutableRequestException("Received a mutable request [" + method + " -- " + uri + "] while in safe mode");
} else if (!getNodeIds(Status.DISCONNECTED, Status.DISCONNECTING).isEmpty()) {
throw new DisconnectedNodeMutableRequestException("Received a mutable request [" + method + " -- " + uri + "] while a node is disconnected from the cluster");
} else if (!getNodeIds(Status.CONNECTING).isEmpty()) {
// if any node is connecting and a request can change the flow, then we throw an exception
throw new ConnectingNodeMutableRequestException("Received a mutable request [" + method + " -- " + uri + "] while a node is trying to connect to the cluster");
}
}
final NodeResponse clientResponse = federateRequest(method, uri, null, entity, headers, nodeIdentifiers);
if (clientResponse == null) {
if (mutableRequest) {
throw new NoConnectedNodesException(String.format("All nodes were disconnected as a result of applying request %s %s", method, uri));
} else {
throw new NoResponseFromNodesException("No nodes were able to process this request.");
}
} else {
return clientResponse;
}
} finally {
lock.unlock("applyRequest(String, URI, Object, Map<String, String>, Set<NodeIdentifier>");
}
}
public void setServicesBroadcaster(final ClusterServicesBroadcaster servicesBroadcaster) {
writeLock.lock();
try {
this.servicesBroadcaster = servicesBroadcaster;
} finally {
writeLock.unlock("setServicesBroadcaster");
}
}
public boolean addBroadcastedService(final DiscoverableService service) {
writeLock.lock();
try {
final ClusterServicesBroadcaster broadcaster = this.servicesBroadcaster;
if (broadcaster == null) {
throw new IllegalStateException("Service broadcasting is not configured.");
}
return broadcaster.addService(service);
} finally {
writeLock.unlock("addBroadcastedService");
}
}
public boolean removeBroadcastedService(final String serviceName) {
writeLock.lock();
try {
final ClusterServicesBroadcaster broadcaster = this.servicesBroadcaster;
if (broadcaster == null) {
throw new IllegalStateException("Service broadcasting is not configured.");
}
return broadcaster.removeService(serviceName);
} finally {
writeLock.unlock("removeBroadcastedService");
}
}
public boolean isBroadcastingConfigured() {
readLock.lock();
try {
return servicesBroadcaster != null;
} finally {
readLock.unlock("isBroadcastingConfigured");
}
}
public boolean isBroadcasting() {
readLock.lock();
try {
final ClusterServicesBroadcaster broadcaster = this.servicesBroadcaster;
return (broadcaster != null && broadcaster.isRunning());
} finally {
readLock.unlock("isBroadcasting");
}
}
public void addEvent(final NodeIdentifier nodeId, String eventMsg) {
writeLock.lock();
try {
final Event event = new Event(nodeId.getId(), eventMsg);
final EventManager eventMgr = eventManager;
if (eventMgr != null) {
eventMgr.addEvent(event);
}
logger.info(String.format("Node Event: %s -- '%s'", nodeId, eventMsg));
} finally {
writeLock.unlock("addEvent");
}
}
public void setEventManager(final EventManager eventManager) {
writeLock.lock();
try {
this.eventManager = eventManager;
} finally {
writeLock.unlock("setEventManager");
}
}
public void setClusterFirewall(final ClusterNodeFirewall clusterFirewall) {
writeLock.lock();
try {
this.clusterFirewall = clusterFirewall;
} finally {
writeLock.unlock("setClusterFirewall");
}
}
public boolean isFirewallConfigured() {
readLock.lock();
try {
return clusterFirewall != null;
} finally {
readLock.unlock("isFirewallConfigured");
}
}
public void setAuditService(final AuditService auditService) {
writeLock.lock();
try {
this.auditService = auditService;
} finally {
writeLock.unlock("setAuditService");
}
}
public boolean isAuditingConfigured() {
readLock.lock();
try {
return auditService != null;
} finally {
readLock.unlock("isAuditingConfigured");
}
}
private boolean isPrimaryNode(final NodeIdentifier nodeId) {
readLock.lock();
try {
return primaryNodeId != null && primaryNodeId.equals(nodeId);
} finally {
readLock.unlock("isPrimaryNode");
}
}
private boolean isInSafeMode() {
readLock.lock();
try {
return primaryNodeId == null || getRawNode(primaryNodeId.getId()) == null;
} finally {
readLock.unlock("isInSafeMode");
}
}
private void setPrimaryNodeId(final NodeIdentifier primaryNodeId) throws DaoException {
writeLock.lock();
try {
dataFlowManagementService.updatePrimaryNode(primaryNodeId);
// update the cached copy reference to minimize loading file from disk
this.primaryNodeId = primaryNodeId;
} finally {
writeLock.unlock("setPrimaryNodeId");
}
}
// requires write lock to already be acquired unless method cannot change node state
private NodeResponse federateRequest(final String method, final URI uri, final Map<String, List<String>> parameters, final Object entity, final Map<String, String> headers, final Set<NodeIdentifier> nodeIds) throws UriConstructionException {
// ensure some nodes are connected
if (nodeIds.isEmpty()) {
throw new NoConnectedNodesException("Cannot apply " + method + " request to " + uri + " because there are currently no connected Nodes");
}
logger.debug("Applying prototype request " + uri + " to nodes.");
// the starting state of the flow (current, stale, unknown)
final PersistedFlowState originalPersistedFlowState = dataFlowManagementService.getPersistedFlowState();
// check if this request can change the flow
final boolean mutableRequest = canChangeNodeState(method, uri);
final ObjectHolder<NodeResponse> holder = new ObjectHolder<>(null);
final UpdateRevision federateRequest = new UpdateRevision() {
@Override
public Revision execute(Revision currentRevision) {
// update headers to contain cluster contextual information to send to the node
final Map<String, String> updatedHeaders = new HashMap<>(headers);
final ClusterContext clusterCtx = new ClusterContextImpl();
clusterCtx.setRequestSentByClusterManager(true); // indicate request is sent from cluster manager
clusterCtx.setRevision(currentRevision);
// serialize cluster context and add to request header
final String serializedClusterCtx = WebUtils.serializeObjectToHex(clusterCtx);
updatedHeaders.put(CLUSTER_CONTEXT_HTTP_HEADER, serializedClusterCtx);
// if the request is mutable, we need to verify that it is a valid request for all nodes in the cluster.
if (mutableRequest) {
updatedHeaders.put(NCM_EXPECTS_HTTP_HEADER, "150-NodeContinue");
final Set<NodeResponse> nodeResponses;
if (entity == null) {
nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, parameters, updatedHeaders);
} else {
nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, entity, updatedHeaders);
}
updatedHeaders.remove(NCM_EXPECTS_HTTP_HEADER);
for (final NodeResponse response : nodeResponses) {
if (response.getStatus() != NODE_CONTINUE_STATUS_CODE) {
final String nodeDescription = response.getNodeId().getApiAddress() + ":" + response.getNodeId().getApiPort();
final ClientResponse clientResponse = response.getClientResponse();
if (clientResponse == null) {
throw new IllegalClusterStateException("Node " + nodeDescription + " is unable to fulfill this request due to: Unexpected Response Code " + response.getStatus());
}
final String nodeExplanation = clientResponse.getEntity(String.class);
throw new IllegalClusterStateException("Node " + nodeDescription + " is unable to fulfill this request due to: " + nodeExplanation, response.getThrowable());
}
}
// set flow state to unknown to denote a mutable request replication in progress
logger.debug("Setting Flow State to UNKNOWN due to mutable request to {} {}", method, uri);
notifyDataFlowManagmentServiceOfFlowStateChange(PersistedFlowState.UNKNOWN);
}
// replicate request
final Set<NodeResponse> nodeResponses;
try {
if (entity == null) {
nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, parameters, updatedHeaders);
} else {
nodeResponses = httpRequestReplicator.replicate(nodeIds, method, uri, entity, updatedHeaders);
}
} catch (final UriConstructionException uce) {
// request was not replicated, so mark the flow with its original state
if (mutableRequest) {
notifyDataFlowManagmentServiceOfFlowStateChange(originalPersistedFlowState);
}
throw uce;
}
// merge the response
final NodeResponse clientResponse = mergeResponses(uri, method, nodeResponses, mutableRequest);
holder.set(clientResponse);
// if we have a response get the updated cluster context for auditing and revision updating
Revision updatedRevision = null;
if (mutableRequest && clientResponse != null) {
try {
// get the cluster context from the response header
final String serializedClusterContext = clientResponse.getClientResponse().getHeaders().getFirst(CLUSTER_CONTEXT_HTTP_HEADER);
if (StringUtils.isNotBlank(serializedClusterContext)) {
// deserialize object
final Serializable clusterContextObj = WebUtils.deserializeHexToObject(serializedClusterContext);
// if we have a valid object, audit the actions
if (clusterContextObj instanceof ClusterContext) {
final ClusterContext clusterContext = (ClusterContext) clusterContextObj;
if (auditService != null) {
try {
auditService.addActions(clusterContext.getActions());
} catch (Throwable t) {
logger.warn("Unable to record actions: " + t.getMessage());
if (logger.isDebugEnabled()) {
logger.warn(StringUtils.EMPTY, t);
}
}
}
updatedRevision = clusterContext.getRevision();
}
}
} catch (final ClassNotFoundException cnfe) {
logger.warn("Classpath issue detected because failed to deserialize cluster context from node response due to: " + cnfe, cnfe);
}
}
return updatedRevision;
}
};
// federate the request and lock on the revision
if (mutableRequest) {
optimisticLockingManager.setRevision(federateRequest);
} else {
federateRequest.execute(optimisticLockingManager.getLastModification().getRevision());
}
return holder.get();
}
private static boolean isProcessorsEndpoint(final URI uri, final String method) {
return "GET".equalsIgnoreCase(method) && PROCESSORS_URI_PATTERN.matcher(uri.getPath()).matches();
}
private static boolean isProcessorEndpoint(final URI uri, final String method) {
if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && PROCESSOR_URI_PATTERN.matcher(uri.getPath()).matches()) {
return true;
} else if ("POST".equalsIgnoreCase(method) && PROCESSORS_URI_PATTERN.matcher(uri.getPath()).matches()) {
return true;
}
return false;
}
private static boolean isProcessGroupEndpoint(final URI uri, final String method) {
return ("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches();
}
private static boolean isTemplateEndpoint(final URI uri, final String method) {
return "POST".equalsIgnoreCase(method) && TEMPLATE_INSTANCE_URI_PATTERN.matcher(uri.getPath()).matches();
}
private static boolean isFlowSnippetEndpoint(final URI uri, final String method) {
return "POST".equalsIgnoreCase(method) && FLOW_SNIPPET_INSTANCE_URI_PATTERN.matcher(uri.getPath()).matches();
}
private static boolean isRemoteProcessGroupsEndpoint(final URI uri, final String method) {
return "GET".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUPS_URI_PATTERN.matcher(uri.getPath()).matches();
}
private static boolean isRemoteProcessGroupEndpoint(final URI uri, final String method) {
if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && REMOTE_PROCESS_GROUP_URI_PATTERN.matcher(uri.getPath()).matches()) {
return true;
} else if ("POST".equalsIgnoreCase(method) && REMOTE_PROCESS_GROUPS_URI_PATTERN.matcher(uri.getPath()).matches()) {
return true;
}
return false;
}
private static boolean isProvenanceQueryEndpoint(final URI uri, final String method) {
if ("POST".equalsIgnoreCase(method) && PROVENANCE_URI.equals(uri.getPath())) {
return true;
} else if ("GET".equalsIgnoreCase(method) && PROVENANCE_QUERY_URI.matcher(uri.getPath()).matches()) {
return true;
}
return false;
}
private static boolean isProvenanceEventEndpoint(final URI uri, final String method) {
return "GET".equalsIgnoreCase(method) && PROVENANCE_EVENT_URI.matcher(uri.getPath()).matches();
}
private static boolean isControllerServicesEndpoint(final URI uri, final String method) {
return "GET".equalsIgnoreCase(method) && CONTROLLER_SERVICES_URI.equals(uri.getPath());
}
private static boolean isControllerServiceEndpoint(final URI uri, final String method) {
if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_URI_PATTERN.matcher(uri.getPath()).matches()) {
return true;
} else if ("POST".equalsIgnoreCase(method) && CONTROLLER_SERVICES_URI.equals(uri.getPath())) {
return true;
}
return false;
}
private static boolean isControllerServiceReferenceEndpoint(final URI uri, final String method) {
if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && CONTROLLER_SERVICE_REFERENCES_URI_PATTERN.matcher(uri.getPath()).matches()) {
return true;
}
return false;
}
private static boolean isReportingTasksEndpoint(final URI uri, final String method) {
return "GET".equalsIgnoreCase(method) && REPORTING_TASKS_URI.equals(uri.getPath());
}
private static boolean isReportingTaskEndpoint(final URI uri, final String method) {
if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && REPORTING_TASK_URI_PATTERN.matcher(uri.getPath()).matches()) {
return true;
} else if ("POST".equalsIgnoreCase(method) && REPORTING_TASKS_URI.equals(uri.getPath())) {
return true;
}
return false;
}
static boolean isResponseInterpreted(final URI uri, final String method) {
return isProcessorsEndpoint(uri, method) || isProcessorEndpoint(uri, method)
|| isRemoteProcessGroupsEndpoint(uri, method) || isRemoteProcessGroupEndpoint(uri, method)
|| isProcessGroupEndpoint(uri, method)
|| isTemplateEndpoint(uri, method) || isFlowSnippetEndpoint(uri, method)
|| isProvenanceQueryEndpoint(uri, method) || isProvenanceEventEndpoint(uri, method)
|| isControllerServicesEndpoint(uri, method) || isControllerServiceEndpoint(uri, method) || isControllerServiceReferenceEndpoint(uri, method)
|| isReportingTasksEndpoint(uri, method) || isReportingTaskEndpoint(uri, method);
}
private void mergeProcessorValidationErrors(final ProcessorDTO processor, Map<NodeIdentifier, ProcessorDTO> processorMap) {
final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>();
for (final Map.Entry<NodeIdentifier, ProcessorDTO> nodeEntry : processorMap.entrySet()) {
final NodeIdentifier nodeId = nodeEntry.getKey();
final ProcessorDTO nodeProcessor = nodeEntry.getValue();
// merge the validation errors
mergeValidationErrors(validationErrorMap, nodeId, nodeProcessor.getValidationErrors());
}
// set the merged the validation errors
processor.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, processorMap.size()));
}
private void mergeProvenanceQueryResults(final ProvenanceDTO provenanceDto, final Map<NodeIdentifier, ProvenanceDTO> resultMap, final Set<NodeResponse> problematicResponses) {
final ProvenanceResultsDTO results = provenanceDto.getResults();
final ProvenanceRequestDTO request = provenanceDto.getRequest();
final List<ProvenanceEventDTO> allResults = new ArrayList<>(1024);
final Set<String> errors = new HashSet<>();
Date oldestEventDate = new Date();
int percentageComplete = 0;
boolean finished = true;
long totalRecords = 0;
for (final Map.Entry<NodeIdentifier, ProvenanceDTO> entry : resultMap.entrySet()) {
final NodeIdentifier nodeIdentifier = entry.getKey();
final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + nodeIdentifier.getApiPort();
final ProvenanceDTO nodeDto = entry.getValue();
final ProvenanceResultsDTO nodeResultDto = nodeDto.getResults();
if (nodeResultDto != null && nodeResultDto.getProvenanceEvents() != null) {
// increment the total number of records
totalRecords += nodeResultDto.getTotalCount();
// populate the cluster identifier
for (final ProvenanceEventDTO eventDto : nodeResultDto.getProvenanceEvents()) {
eventDto.setClusterNodeId(nodeIdentifier.getId());
eventDto.setClusterNodeAddress(nodeAddress);
// add node identifier to the event's id so that it is unique across cluster
eventDto.setId(nodeIdentifier.getId() + eventDto.getId());
allResults.add(eventDto);
}
}
if (nodeResultDto.getOldestEvent() != null && nodeResultDto.getOldestEvent().before(oldestEventDate)) {
oldestEventDate = nodeResultDto.getOldestEvent();
}
if (nodeResultDto.getErrors() != null) {
for (final String error : nodeResultDto.getErrors()) {
errors.add(nodeAddress + " -- " + error);
}
}
percentageComplete += nodeDto.getPercentCompleted();
if (!nodeDto.isFinished()) {
finished = false;
}
}
percentageComplete /= resultMap.size();
// consider any problematic responses as errors
for (final NodeResponse problematicResponse : problematicResponses) {
final NodeIdentifier problemNode = problematicResponse.getNodeId();
final String problemNodeAddress = problemNode.getApiAddress() + ":" + problemNode.getApiPort();
errors.add(String.format("%s -- Request did not complete successfully (Status code: %s)", problemNodeAddress, problematicResponse.getStatus()));
}
// Since we get back up to the maximum number of results from each node, we need to sort those values and then
// grab only the first X number of them. We do a sort based on time, such that the newest are included.
// If 2 events have the same timestamp, we do a secondary sort based on Cluster Node Identifier. If those are
// equal, we perform a terciary sort based on the the event id
Collections.sort(allResults, new Comparator<ProvenanceEventDTO>() {
@Override
public int compare(final ProvenanceEventDTO o1, final ProvenanceEventDTO o2) {
final int eventTimeComparison = o1.getEventTime().compareTo(o2.getEventTime());
if (eventTimeComparison != 0) {
return -eventTimeComparison;
}
final String nodeId1 = o1.getClusterNodeId();
final String nodeId2 = o2.getClusterNodeId();
final int nodeIdComparison;
if (nodeId1 == null && nodeId2 == null) {
nodeIdComparison = 0;
} else if (nodeId1 == null) {
nodeIdComparison = 1;
} else if (nodeId2 == null) {
nodeIdComparison = -1;
} else {
nodeIdComparison = -nodeId1.compareTo(nodeId2);
}
if (nodeIdComparison != 0) {
return nodeIdComparison;
}
return -Long.compare(o1.getEventId(), o2.getEventId());
}
});
final int maxResults = request.getMaxResults().intValue();
final List<ProvenanceEventDTO> selectedResults;
if (allResults.size() < maxResults) {
selectedResults = allResults;
} else {
selectedResults = allResults.subList(0, maxResults);
}
// include any errors
if (errors.size() > 0) {
results.setErrors(errors);
}
results.setTotalCount(totalRecords);
results.setTotal(FormatUtils.formatCount(totalRecords));
results.setProvenanceEvents(selectedResults);
results.setOldestEvent(oldestEventDate);
results.setGenerated(new Date());
provenanceDto.setPercentCompleted(percentageComplete);
provenanceDto.setFinished(finished);
}
private void mergeRemoteProcessGroup(final RemoteProcessGroupDTO remoteProcessGroup, final Map<NodeIdentifier, RemoteProcessGroupDTO> remoteProcessGroupMap) {
final RemoteProcessGroupContentsDTO remoteProcessGroupContents = remoteProcessGroup.getContents();
Boolean mergedIsTargetSecure = null;
final List<String> mergedAuthorizationIssues = new ArrayList<>();
final Set<RemoteProcessGroupPortDTO> mergedInputPorts = new HashSet<>();
final Set<RemoteProcessGroupPortDTO> mergedOutputPorts = new HashSet<>();
for (final Map.Entry<NodeIdentifier, RemoteProcessGroupDTO> nodeEntry : remoteProcessGroupMap.entrySet()) {
final NodeIdentifier nodeId = nodeEntry.getKey();
final RemoteProcessGroupDTO nodeRemoteProcessGroupDto = nodeEntry.getValue();
// merge the issues
final List<String> nodeAuthorizationIssues = nodeRemoteProcessGroupDto.getAuthorizationIssues();
if (nodeAuthorizationIssues != null && !nodeAuthorizationIssues.isEmpty()) {
for (final String nodeAuthorizationIssue : nodeAuthorizationIssues) {
mergedAuthorizationIssues.add(nodeId.getApiAddress() + ":" + nodeId.getApiPort() + " -- " + nodeAuthorizationIssue);
}
}
// use the first target secure flag since they will all be the same
final Boolean nodeIsTargetSecure = nodeRemoteProcessGroupDto.isTargetSecure();
if (mergedIsTargetSecure == null) {
mergedIsTargetSecure = nodeIsTargetSecure;
}
// merge the ports in the contents
final RemoteProcessGroupContentsDTO nodeRemoteProcessGroupContentsDto = nodeRemoteProcessGroupDto.getContents();
if (remoteProcessGroupContents != null && nodeRemoteProcessGroupContentsDto != null) {
if (nodeRemoteProcessGroupContentsDto.getInputPorts() != null) {
mergedInputPorts.addAll(nodeRemoteProcessGroupContentsDto.getInputPorts());
}
if (nodeRemoteProcessGroupContentsDto.getOutputPorts() != null) {
mergedOutputPorts.addAll(nodeRemoteProcessGroupContentsDto.getOutputPorts());
}
}
}
if (remoteProcessGroupContents != null) {
if (!mergedInputPorts.isEmpty()) {
remoteProcessGroupContents.setInputPorts(mergedInputPorts);
}
if (!mergedOutputPorts.isEmpty()) {
remoteProcessGroupContents.setOutputPorts(mergedOutputPorts);
}
}
if (mergedIsTargetSecure != null) {
remoteProcessGroup.setTargetSecure(mergedIsTargetSecure);
}
if (!mergedAuthorizationIssues.isEmpty()) {
remoteProcessGroup.setAuthorizationIssues(mergedAuthorizationIssues);
}
}
private void mergeControllerServiceReferences(final Set<ControllerServiceReferencingComponentDTO> referencingComponents, final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> referencingComponentMap) {
final Map<String, Integer> activeThreadCounts = new HashMap<>();
final Map<String, String> states = new HashMap<>();
for (final Map.Entry<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> nodeEntry : referencingComponentMap.entrySet()) {
final Set<ControllerServiceReferencingComponentDTO> nodeReferencingComponents = nodeEntry.getValue();
// go through all the nodes referencing components
for (final ControllerServiceReferencingComponentDTO nodeReferencingComponent : nodeReferencingComponents) {
// handle active thread counts
if (nodeReferencingComponent.getActiveThreadCount() != null && nodeReferencingComponent.getActiveThreadCount() > 0) {
final Integer current = activeThreadCounts.get(nodeReferencingComponent.getId());
if (current == null) {
activeThreadCounts.put(nodeReferencingComponent.getId(), nodeReferencingComponent.getActiveThreadCount());
} else {
activeThreadCounts.put(nodeReferencingComponent.getId(), nodeReferencingComponent.getActiveThreadCount() + current);
}
}
// handle controller service state
final String state = states.get(nodeReferencingComponent.getId());
if (state == null) {
if (ControllerServiceState.DISABLING.name().equals(nodeReferencingComponent.getState())) {
states.put(nodeReferencingComponent.getId(), ControllerServiceState.DISABLING.name());
} else if (ControllerServiceState.ENABLING.name().equals(nodeReferencingComponent.getState())) {
states.put(nodeReferencingComponent.getId(), ControllerServiceState.ENABLING.name());
}
}
}
}
// go through each referencing components
for (final ControllerServiceReferencingComponentDTO referencingComponent : referencingComponents) {
final Integer activeThreadCount = activeThreadCounts.get(referencingComponent.getId());
if (activeThreadCount != null) {
referencingComponent.setActiveThreadCount(activeThreadCount);
}
final String state = states.get(referencingComponent.getId());
if (state != null) {
referencingComponent.setState(state);
}
}
}
private void mergeControllerService(final ControllerServiceDTO controllerService, final Map<NodeIdentifier, ControllerServiceDTO> controllerServiceMap) {
final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>();
final Set<ControllerServiceReferencingComponentDTO> referencingComponents = controllerService.getReferencingComponents();
final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> nodeReferencingComponentsMap = new HashMap<>();
String state = null;
for (final Map.Entry<NodeIdentifier, ControllerServiceDTO> nodeEntry : controllerServiceMap.entrySet()) {
final NodeIdentifier nodeId = nodeEntry.getKey();
final ControllerServiceDTO nodeControllerService = nodeEntry.getValue();
if (state == null) {
if (ControllerServiceState.DISABLING.name().equals(nodeControllerService.getState())) {
state = ControllerServiceState.DISABLING.name();
} else if (ControllerServiceState.ENABLING.name().equals(nodeControllerService.getState())) {
state = ControllerServiceState.ENABLING.name();
}
}
for (final ControllerServiceReferencingComponentDTO nodeReferencingComponents : nodeControllerService.getReferencingComponents()) {
nodeReferencingComponentsMap.put(nodeId, nodeReferencingComponents.getReferencingComponents());
}
// merge the validation errors
mergeValidationErrors(validationErrorMap, nodeId, nodeControllerService.getValidationErrors());
}
// merge the referencing components
mergeControllerServiceReferences(referencingComponents, nodeReferencingComponentsMap);
// store the 'transition' state is applicable
if (state != null) {
controllerService.setState(state);
}
// set the merged the validation errors
controllerService.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, controllerServiceMap.size()));
}
private void mergeReportingTask(final ReportingTaskDTO reportingTask, final Map<NodeIdentifier, ReportingTaskDTO> reportingTaskMap) {
final Map<String, Set<NodeIdentifier>> validationErrorMap = new HashMap<>();
int activeThreadCount = 0;
for (final Map.Entry<NodeIdentifier, ReportingTaskDTO> nodeEntry : reportingTaskMap.entrySet()) {
final NodeIdentifier nodeId = nodeEntry.getKey();
final ReportingTaskDTO nodeReportingTask = nodeEntry.getValue();
if (nodeReportingTask.getActiveThreadCount() != null) {
activeThreadCount += nodeReportingTask.getActiveThreadCount();
}
// merge the validation errors
mergeValidationErrors(validationErrorMap, nodeId, nodeReportingTask.getValidationErrors());
}
// set the merged active thread counts
reportingTask.setActiveThreadCount(activeThreadCount);
// set the merged the validation errors
reportingTask.setValidationErrors(normalizedMergedValidationErrors(validationErrorMap, reportingTaskMap.size()));
}
/**
* Merges the validation errors into the specified map, recording the corresponding node identifier.
*
* @param validationErrorMap
* @param nodeId
* @param nodeValidationErrors
*/
public void mergeValidationErrors(final Map<String, Set<NodeIdentifier>> validationErrorMap, final NodeIdentifier nodeId, final Collection<String> nodeValidationErrors) {
if (nodeValidationErrors != null) {
for (final String nodeValidationError : nodeValidationErrors) {
Set<NodeIdentifier> nodeSet = validationErrorMap.get(nodeValidationError);
if (nodeSet == null) {
nodeSet = new HashSet<>();
validationErrorMap.put(nodeValidationError, nodeSet);
}
nodeSet.add(nodeId);
}
}
}
/**
* Normalizes the validation errors by prepending the corresponding nodes when the error does not exist across all nodes.
*
* @param validationErrorMap
* @param totalNodes
* @return
*/
public Set<String> normalizedMergedValidationErrors(final Map<String, Set<NodeIdentifier>> validationErrorMap, int totalNodes) {
final Set<String> normalizedValidationErrors = new HashSet<>();
for (final Map.Entry<String, Set<NodeIdentifier>> validationEntry : validationErrorMap.entrySet()) {
final String msg = validationEntry.getKey();
final Set<NodeIdentifier> nodeIds = validationEntry.getValue();
if (nodeIds.size() == totalNodes) {
normalizedValidationErrors.add(msg);
} else {
for (final NodeIdentifier nodeId : nodeIds) {
normalizedValidationErrors.add(nodeId.getApiAddress() + ":" + nodeId.getApiPort() + " -- " + msg);
}
}
}
return normalizedValidationErrors;
}
// requires write lock to be already acquired unless request is not mutable
private NodeResponse mergeResponses(final URI uri, final String method, final Set<NodeResponse> nodeResponses, final boolean mutableRequest) {
// holds the one response of all the node responses to return to the client
NodeResponse clientResponse = null;
// holds the set of node responses that did not result in a 2XX response
final Set<NodeResponse> problematicNodeResponses = new HashSet<>();
// map updated node to its response
final Map<Node, NodeResponse> updatedNodesMap = new HashMap<>();
for (final Map.Entry<NodeResponse, Status> entry : httpResponseMapper.map(uri, nodeResponses).entrySet()) {
final NodeResponse nodeResponse = entry.getKey();
final Status nodeStatus = entry.getValue();
// create new "updated" node by cloning old node and updating status
final Node currentNode = getRawNode(nodeResponse.getNodeId().getId());
final Node updatedNode = currentNode.clone();
updatedNode.setStatus(nodeStatus);
// map updated node to its response
updatedNodesMap.put(updatedNode, nodeResponse);
// record a client request and any requests that resulted in disconnection
if (nodeStatus == Status.CONNECTED) {
clientResponse = nodeResponse;
} else if (nodeStatus == Status.DISCONNECTED) {
problematicNodeResponses.add(nodeResponse);
}
}
// determine if we have at least one response
final boolean hasClientResponse = clientResponse != null;
final boolean hasSuccessfulClientResponse = hasClientResponse && clientResponse.is2xx();
// drain the responses from the socket for those responses not being sent to the client
final Set<NodeResponse> nodeResponsesToDrain = new HashSet<>(updatedNodesMap.values());
nodeResponsesToDrain.remove(clientResponse);
if (hasSuccessfulClientResponse && isProcessorEndpoint(uri, method)) {
final ProcessorEntity responseEntity = clientResponse.getClientResponse().getEntity(ProcessorEntity.class);
final ProcessorDTO processor = responseEntity.getProcessor();
final Map<NodeIdentifier, ProcessorDTO> processorMap = new HashMap<>();
for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
if (problematicNodeResponses.contains(nodeResponse)) {
continue;
}
final ProcessorEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessorEntity.class);
final ProcessorDTO nodeProcessor = nodeResponseEntity.getProcessor();
processorMap.put(nodeResponse.getNodeId(), nodeProcessor);
}
mergeProcessorValidationErrors(processor, processorMap);
clientResponse = new NodeResponse(clientResponse, responseEntity);
} else if (hasSuccessfulClientResponse && isProcessorsEndpoint(uri, method)) {
final ProcessorsEntity responseEntity = clientResponse.getClientResponse().getEntity(ProcessorsEntity.class);
final Set<ProcessorDTO> processors = responseEntity.getProcessors();
final Map<String, Map<NodeIdentifier, ProcessorDTO>> processorMap = new HashMap<>();
for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
if (problematicNodeResponses.contains(nodeResponse)) {
continue;
}
final ProcessorsEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessorsEntity.class);
final Set<ProcessorDTO> nodeProcessors = nodeResponseEntity.getProcessors();
for (final ProcessorDTO nodeProcessor : nodeProcessors) {
Map<NodeIdentifier, ProcessorDTO> innerMap = processorMap.get(nodeProcessor.getId());
if (innerMap == null) {
innerMap = new HashMap<>();
processorMap.put(nodeProcessor.getId(), innerMap);
}
innerMap.put(nodeResponse.getNodeId(), nodeProcessor);
}
}
for (final ProcessorDTO processor : processors) {
final String procId = processor.getId();
final Map<NodeIdentifier, ProcessorDTO> mergeMap = processorMap.get(procId);
mergeProcessorValidationErrors(processor, mergeMap);
}
// create a new client response
clientResponse = new NodeResponse(clientResponse, responseEntity);
} else if (hasSuccessfulClientResponse && isProcessGroupEndpoint(uri, method)) {
final ProcessGroupEntity responseEntity = clientResponse.getClientResponse().getEntity(ProcessGroupEntity.class);
final ProcessGroupDTO responseDto = responseEntity.getProcessGroup();
final FlowSnippetDTO contents = responseDto.getContents();
if (contents == null) {
if (!nodeResponsesToDrain.isEmpty()) {
drainResponses(nodeResponsesToDrain);
}
} else {
final Map<String, Map<NodeIdentifier, ProcessorDTO>> processorMap = new HashMap<>();
final Map<String, Map<NodeIdentifier, RemoteProcessGroupDTO>> remoteProcessGroupMap = new HashMap<>();
for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
if (problematicNodeResponses.contains(nodeResponse)) {
continue;
}
final ProcessGroupEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessGroupEntity.class);
final ProcessGroupDTO nodeProcessGroup = nodeResponseEntity.getProcessGroup();
for (final ProcessorDTO nodeProcessor : nodeProcessGroup.getContents().getProcessors()) {
Map<NodeIdentifier, ProcessorDTO> innerMap = processorMap.get(nodeProcessor.getId());
if (innerMap == null) {
innerMap = new HashMap<>();
processorMap.put(nodeProcessor.getId(), innerMap);
}
innerMap.put(nodeResponse.getNodeId(), nodeProcessor);
}
for (final RemoteProcessGroupDTO nodeRemoteProcessGroup : nodeProcessGroup.getContents().getRemoteProcessGroups()) {
Map<NodeIdentifier, RemoteProcessGroupDTO> innerMap = remoteProcessGroupMap.get(nodeRemoteProcessGroup.getId());
if (innerMap == null) {
innerMap = new HashMap<>();
remoteProcessGroupMap.put(nodeRemoteProcessGroup.getId(), innerMap);
}
innerMap.put(nodeResponse.getNodeId(), nodeRemoteProcessGroup);
}
}
for (final ProcessorDTO processor : contents.getProcessors()) {
final String procId = processor.getId();
final Map<NodeIdentifier, ProcessorDTO> mergeMap = processorMap.get(procId);
mergeProcessorValidationErrors(processor, mergeMap);
}
for (final RemoteProcessGroupDTO remoteProcessGroup : contents.getRemoteProcessGroups()) {
if (remoteProcessGroup.getContents() != null) {
final String remoteProcessGroupId = remoteProcessGroup.getId();
final Map<NodeIdentifier, RemoteProcessGroupDTO> mergeMap = remoteProcessGroupMap.get(remoteProcessGroupId);
mergeRemoteProcessGroup(remoteProcessGroup, mergeMap);
}
}
}
// create a new client response
clientResponse = new NodeResponse(clientResponse, responseEntity);
} else if (hasSuccessfulClientResponse && (isTemplateEndpoint(uri, method) || isFlowSnippetEndpoint(uri, method))) {
final FlowSnippetEntity responseEntity = clientResponse.getClientResponse().getEntity(FlowSnippetEntity.class);
final FlowSnippetDTO contents = responseEntity.getContents();
if (contents == null) {
if (!nodeResponsesToDrain.isEmpty()) {
drainResponses(nodeResponsesToDrain);
}
} else {
final Map<String, Map<NodeIdentifier, ProcessorDTO>> processorMap = new HashMap<>();
final Map<String, Map<NodeIdentifier, RemoteProcessGroupDTO>> remoteProcessGroupMap = new HashMap<>();
for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
if (problematicNodeResponses.contains(nodeResponse)) {
continue;
}
final FlowSnippetEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(FlowSnippetEntity.class);
final FlowSnippetDTO nodeContents = nodeResponseEntity.getContents();
for (final ProcessorDTO nodeProcessor : nodeContents.getProcessors()) {
Map<NodeIdentifier, ProcessorDTO> innerMap = processorMap.get(nodeProcessor.getId());
if (innerMap == null) {
innerMap = new HashMap<>();
processorMap.put(nodeProcessor.getId(), innerMap);
}
innerMap.put(nodeResponse.getNodeId(), nodeProcessor);
}
for (final RemoteProcessGroupDTO nodeRemoteProcessGroup : nodeContents.getRemoteProcessGroups()) {
Map<NodeIdentifier, RemoteProcessGroupDTO> innerMap = remoteProcessGroupMap.get(nodeRemoteProcessGroup.getId());
if (innerMap == null) {
innerMap = new HashMap<>();
remoteProcessGroupMap.put(nodeRemoteProcessGroup.getId(), innerMap);
}
innerMap.put(nodeResponse.getNodeId(), nodeRemoteProcessGroup);
}
}
for (final ProcessorDTO processor : contents.getProcessors()) {
final String procId = processor.getId();
final Map<NodeIdentifier, ProcessorDTO> mergeMap = processorMap.get(procId);
mergeProcessorValidationErrors(processor, mergeMap);
}
for (final RemoteProcessGroupDTO remoteProcessGroup : contents.getRemoteProcessGroups()) {
if (remoteProcessGroup.getContents() != null) {
final String remoteProcessGroupId = remoteProcessGroup.getId();
final Map<NodeIdentifier, RemoteProcessGroupDTO> mergeMap = remoteProcessGroupMap.get(remoteProcessGroupId);
mergeRemoteProcessGroup(remoteProcessGroup, mergeMap);
}
}
}
// create a new client response
clientResponse = new NodeResponse(clientResponse, responseEntity);
} else if (hasSuccessfulClientResponse && (isRemoteProcessGroupEndpoint(uri, method))) {
final RemoteProcessGroupEntity responseEntity = clientResponse.getClientResponse().getEntity(RemoteProcessGroupEntity.class);
final RemoteProcessGroupDTO remoteProcessGroup = responseEntity.getRemoteProcessGroup();
final Map<NodeIdentifier, RemoteProcessGroupDTO> remoteProcessGroupMap = new HashMap<>();
for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
if (problematicNodeResponses.contains(nodeResponse)) {
continue;
}
final RemoteProcessGroupEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(RemoteProcessGroupEntity.class);
final RemoteProcessGroupDTO nodeRemoteProcessGroup = nodeResponseEntity.getRemoteProcessGroup();
remoteProcessGroupMap.put(nodeResponse.getNodeId(), nodeRemoteProcessGroup);
}
mergeRemoteProcessGroup(remoteProcessGroup, remoteProcessGroupMap);
clientResponse = new NodeResponse(clientResponse, responseEntity);
} else if (hasSuccessfulClientResponse && (isRemoteProcessGroupsEndpoint(uri, method))) {
final RemoteProcessGroupsEntity responseEntity = clientResponse.getClientResponse().getEntity(RemoteProcessGroupsEntity.class);
final Set<RemoteProcessGroupDTO> remoteProcessGroups = responseEntity.getRemoteProcessGroups();
final Map<String, Map<NodeIdentifier, RemoteProcessGroupDTO>> remoteProcessGroupMap = new HashMap<>();
for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
if (problematicNodeResponses.contains(nodeResponse)) {
continue;
}
final RemoteProcessGroupsEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(RemoteProcessGroupsEntity.class);
final Set<RemoteProcessGroupDTO> nodeRemoteProcessGroups = nodeResponseEntity.getRemoteProcessGroups();
for (final RemoteProcessGroupDTO nodeRemoteProcessGroup : nodeRemoteProcessGroups) {
Map<NodeIdentifier, RemoteProcessGroupDTO> innerMap = remoteProcessGroupMap.get(nodeRemoteProcessGroup.getId());
if (innerMap == null) {
innerMap = new HashMap<>();
remoteProcessGroupMap.put(nodeRemoteProcessGroup.getId(), innerMap);
}
innerMap.put(nodeResponse.getNodeId(), nodeRemoteProcessGroup);
}
}
for (final RemoteProcessGroupDTO remoteProcessGroup : remoteProcessGroups) {
final String remoteProcessGroupId = remoteProcessGroup.getId();
final Map<NodeIdentifier, RemoteProcessGroupDTO> mergeMap = remoteProcessGroupMap.get(remoteProcessGroupId);
mergeRemoteProcessGroup(remoteProcessGroup, mergeMap);
}
// create a new client response
clientResponse = new NodeResponse(clientResponse, responseEntity);
} else if (hasSuccessfulClientResponse && isProvenanceQueryEndpoint(uri, method)) {
final ProvenanceEntity responseEntity = clientResponse.getClientResponse().getEntity(ProvenanceEntity.class);
final ProvenanceDTO query = responseEntity.getProvenance();
final Map<NodeIdentifier, ProvenanceDTO> resultsMap = new HashMap<>();
for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
if (problematicNodeResponses.contains(nodeResponse)) {
continue;
}
final ProvenanceEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ProvenanceEntity.class);
final ProvenanceDTO nodeQuery = nodeResponseEntity.getProvenance();
resultsMap.put(nodeResponse.getNodeId(), nodeQuery);
}
mergeProvenanceQueryResults(query, resultsMap, problematicNodeResponses);
clientResponse = new NodeResponse(clientResponse, responseEntity);
} else if (hasSuccessfulClientResponse && isProvenanceEventEndpoint(uri, method)) {
final ProvenanceEventEntity responseEntity = clientResponse.getClientResponse().getEntity(ProvenanceEventEntity.class);
final ProvenanceEventDTO event = responseEntity.getProvenanceEvent();
// this request was sent to a specific node... populate its details
final NodeIdentifier nodeId = clientResponse.getNodeId();
event.setClusterNodeId(nodeId.getId());
event.setClusterNodeAddress(nodeId.getApiAddress() + ":" + nodeId.getApiPort());
clientResponse = new NodeResponse(clientResponse, responseEntity);
} else if (hasSuccessfulClientResponse && isControllerServiceEndpoint(uri, method)) {
final ControllerServiceEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerServiceEntity.class);
final ControllerServiceDTO controllerService = responseEntity.getControllerService();
final Map<NodeIdentifier, ControllerServiceDTO> resultsMap = new HashMap<>();
for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
if (problematicNodeResponses.contains(nodeResponse)) {
continue;
}
final ControllerServiceEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceEntity.class);
final ControllerServiceDTO nodeControllerService = nodeResponseEntity.getControllerService();
resultsMap.put(nodeResponse.getNodeId(), nodeControllerService);
}
mergeControllerService(controllerService, resultsMap);
clientResponse = new NodeResponse(clientResponse, responseEntity);
} else if (hasSuccessfulClientResponse && isControllerServicesEndpoint(uri, method)) {
final ControllerServicesEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerServicesEntity.class);
final Set<ControllerServiceDTO> controllerServices = responseEntity.getControllerServices();
final Map<String, Map<NodeIdentifier, ControllerServiceDTO>> controllerServiceMap = new HashMap<>();
for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
if (problematicNodeResponses.contains(nodeResponse)) {
continue;
}
final ControllerServicesEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServicesEntity.class);
final Set<ControllerServiceDTO> nodeControllerServices = nodeResponseEntity.getControllerServices();
for (final ControllerServiceDTO nodeControllerService : nodeControllerServices) {
Map<NodeIdentifier, ControllerServiceDTO> innerMap = controllerServiceMap.get(nodeControllerService.getId());
if (innerMap == null) {
innerMap = new HashMap<>();
controllerServiceMap.put(nodeControllerService.getId(), innerMap);
}
innerMap.put(nodeResponse.getNodeId(), nodeControllerService);
}
}
for (final ControllerServiceDTO controllerService : controllerServices) {
final String procId = controllerService.getId();
final Map<NodeIdentifier, ControllerServiceDTO> mergeMap = controllerServiceMap.get(procId);
mergeControllerService(controllerService, mergeMap);
}
// create a new client response
clientResponse = new NodeResponse(clientResponse, responseEntity);
} else if (hasSuccessfulClientResponse && isControllerServiceReferenceEndpoint(uri, method)) {
final ControllerServiceReferencingComponentsEntity responseEntity = clientResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
final Set<ControllerServiceReferencingComponentDTO> referencingComponents = responseEntity.getControllerServiceReferencingComponents();
final Map<NodeIdentifier, Set<ControllerServiceReferencingComponentDTO>> resultsMap = new HashMap<>();
for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
if (problematicNodeResponses.contains(nodeResponse)) {
continue;
}
final ControllerServiceReferencingComponentsEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
final Set<ControllerServiceReferencingComponentDTO> nodeReferencingComponents = nodeResponseEntity.getControllerServiceReferencingComponents();
resultsMap.put(nodeResponse.getNodeId(), nodeReferencingComponents);
}
mergeControllerServiceReferences(referencingComponents, resultsMap);
clientResponse = new NodeResponse(clientResponse, responseEntity);
} else if (hasSuccessfulClientResponse && isReportingTaskEndpoint(uri, method)) {
final ReportingTaskEntity responseEntity = clientResponse.getClientResponse().getEntity(ReportingTaskEntity.class);
final ReportingTaskDTO reportingTask = responseEntity.getReportingTask();
final Map<NodeIdentifier, ReportingTaskDTO> resultsMap = new HashMap<>();
for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
if (problematicNodeResponses.contains(nodeResponse)) {
continue;
}
final ReportingTaskEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTaskEntity.class);
final ReportingTaskDTO nodeReportingTask = nodeResponseEntity.getReportingTask();
resultsMap.put(nodeResponse.getNodeId(), nodeReportingTask);
}
mergeReportingTask(reportingTask, resultsMap);
clientResponse = new NodeResponse(clientResponse, responseEntity);
} else if (hasSuccessfulClientResponse && isReportingTasksEndpoint(uri, method)) {
final ReportingTasksEntity responseEntity = clientResponse.getClientResponse().getEntity(ReportingTasksEntity.class);
final Set<ReportingTaskDTO> reportingTaskSet = responseEntity.getReportingTasks();
final Map<String, Map<NodeIdentifier, ReportingTaskDTO>> reportingTaskMap = new HashMap<>();
for (final NodeResponse nodeResponse : updatedNodesMap.values()) {
if (problematicNodeResponses.contains(nodeResponse)) {
continue;
}
final ReportingTasksEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTasksEntity.class);
final Set<ReportingTaskDTO> nodeReportingTasks = nodeResponseEntity.getReportingTasks();
for (final ReportingTaskDTO nodeReportingTask : nodeReportingTasks) {
Map<NodeIdentifier, ReportingTaskDTO> innerMap = reportingTaskMap.get(nodeReportingTask.getId());
if (innerMap == null) {
innerMap = new HashMap<>();
reportingTaskMap.put(nodeReportingTask.getId(), innerMap);
}
innerMap.put(nodeResponse.getNodeId(), nodeReportingTask);
}
}
for (final ReportingTaskDTO reportingTask : reportingTaskSet) {
final String procId = reportingTask.getId();
final Map<NodeIdentifier, ReportingTaskDTO> mergeMap = reportingTaskMap.get(procId);
mergeReportingTask(reportingTask, mergeMap);
}
// create a new client response
clientResponse = new NodeResponse(clientResponse, responseEntity);
} else {
if (!nodeResponsesToDrain.isEmpty()) {
drainResponses(nodeResponsesToDrain);
}
}
/*
* Nodes that encountered issues handling the request are marked as
* disconnected for mutable requests (e.g., post, put, delete). For
* other requests (e.g., get, head), the nodes remain in their current
* state even if they had problems handling the request.
*/
if (mutableRequest) {
// set the updated nodes
nodes.removeAll(updatedNodesMap.keySet());
nodes.addAll(updatedNodesMap.keySet());
// notify service of updated node set
notifyDataFlowManagementServiceOfNodeStatusChange();
// mark flow as stale since this request could have changed the flow
notifyDataFlowManagmentServiceOfFlowStateChange(PersistedFlowState.STALE);
// disconnect problematic nodes
if (!problematicNodeResponses.isEmpty()) {
if (problematicNodeResponses.size() < nodeResponses.size()) {
logger.warn(String.format("One or more nodes failed to process URI '%s'. Requesting each node to disconnect from cluster.", uri));
disconnectNodes(problematicNodeResponses, "Failed to process URI " + uri);
} else {
logger.warn("All nodes failed to process URI {}. As a result, no node will be disconnected from cluster", uri);
}
}
}
return clientResponse;
}
/**
* Drains the node responses off of the socket to ensure that the socket is
* appropriately cleaned-up.
*
* @param nodeResponses the collection of node responses
*/
private void drainResponses(final Collection<NodeResponse> nodeResponses) {
// fail fast if nothing to do
if (nodeResponses.isEmpty()) {
return;
}
final ExecutorService executorService = Executors.newFixedThreadPool(properties.getClusterManagerProtocolThreads());
final CompletionService<Void> completionService = new ExecutorCompletionService<>(executorService);
for (final NodeResponse nodeResponse : nodeResponses) {
// if we received a response, then clear out the response data
if (!nodeResponse.hasThrowable()) {
completionService.submit(new Runnable() {
@Override
public void run() {
try {
((StreamingOutput) nodeResponse.getResponse().getEntity()).write(
new OutputStream() {
@Override
public void write(final int b) { /* drain response */ }
}
);
} catch (final IOException | WebApplicationException ex) {
logger.info("Failed clearing out non-client response buffer due to: " + ex, ex);
}
}
}, null);
}
}
executorService.shutdown();
}
/**
* A helper method to disconnect nodes that returned unsuccessful HTTP
* responses because of a replicated request. Disconnection requests are
* sent concurrently.
*
* @param nodeResponses
*/
private void disconnectNodes(final Set<NodeResponse> nodeResponses, final String explanation) {
// return fast if nothing to do
if (nodeResponses == null || nodeResponses.isEmpty()) {
return;
}
final ExecutorService executorService = Executors.newFixedThreadPool(properties.getClusterManagerProtocolThreads());
final CompletionService<Void> completionService = new ExecutorCompletionService<>(executorService);
for (final NodeResponse nodeResponse : nodeResponses) {
completionService.submit(new Runnable() {
@Override
public void run() {
final NodeIdentifier nodeId = nodeResponse.getNodeId();
final int responseStatus = nodeResponse.getStatus();
final URI requestUri = nodeResponse.getRequestUri();
final StringBuilder msgBuilder = new StringBuilder();
msgBuilder
.append("Requesting disconnection for node ")
.append(nodeId)
.append(" for request URI ")
.append(requestUri);
if (nodeResponse.hasThrowable()) {
msgBuilder.append(" because manager encountered exception when issuing request: ")
.append(nodeResponse.getThrowable());
// log stack trace anytime we have a throwable
((NiFiLog) logger).getWrappedLog().info(msgBuilder.toString(), nodeResponse.getThrowable());
addEvent(nodeId, "Manager encountered exception when issuing request for URI " + requestUri);
addBulletin(nodeId, Severity.ERROR, "Manager encountered exception when issuing request for URI " + requestUri + "; node will be disconnected");
} else {
msgBuilder.append(" because HTTP response status was ")
.append(responseStatus);
logger.info(msgBuilder.toString());
addEvent(nodeId, "HTTP response status was unsuccessful (" + responseStatus + ") for request URI " + requestUri);
addBulletin(nodeId, Severity.ERROR, "HTTP response status was unsuccessful (" + responseStatus + ") for request URI " + requestUri);
}
requestDisconnectionQuietly(nodeId, explanation);
}
}, null);
}
executorService.shutdown();
}
/**
* Returns false if an internal protocol message was received by a node
* listed in the firewall. If no firewall is configured, then false is
* always returned.
*
* @param ip the IP of the remote machine
*
* @return false if the IP is listed in the firewall or if the firewall is
* not configured; true otherwise
*/
private boolean isBlockedByFirewall(final String ip) {
if (isFirewallConfigured()) {
return !clusterFirewall.isPermissible(ip);
} else {
return false;
}
}
private Set<Node> getRawNodes(final Status... statuses) {
readLock.lock();
try {
final Set<Node> result = new HashSet<>();
if (statuses == null || statuses.length == 0) {
result.addAll(nodes);
} else {
for (final Node node : nodes) {
for (final Node.Status status : statuses) {
if (node.getStatus() == status) {
result.add(node);
break;
}
}
}
}
return result;
} finally {
readLock.unlock("getRawNodes(Status...)");
}
}
private Node getRawNode(final String nodeId) {
readLock.lock();
try {
for (final Node node : nodes) {
if (node.getNodeId().getId().equals(nodeId)) {
return node;
}
}
return null;
} finally {
readLock.unlock("getRawNode(String)");
}
}
/**
* Resolves a proposed node identifier to a node identifier that the manager
* approves. If the proposed node identifier conflicts with an existing node
* identifier, then an approved node identifier is generated and returned to
* the caller.
*
* @param proposedNodeId a proposed identifier
*
* @return the node identifier that should be used
*/
private NodeIdentifier resolveProposedNodeIdentifier(final NodeIdentifier proposedNodeId) {
readLock.lock();
try {
for (final Node node : nodes) {
final NodeIdentifier nodeId = node.getNodeId();
// are the ids the same
final boolean sameId = nodeId.equals(proposedNodeId);
// are the service coordinates the same
final boolean sameServiceCoordinates = nodeId.logicallyEquals(proposedNodeId);
if (sameId && sameServiceCoordinates) {
// we know about this node and it has the same ID, so the proposal is fine
return proposedNodeId;
} else if (sameId && !sameServiceCoordinates) {
// proposed ID conflicts with existing node ID, so assign a new ID
final NodeIdentifier resolvedIdentifier = new NodeIdentifier(
UUID.randomUUID().toString(),
proposedNodeId.getApiAddress(),
proposedNodeId.getApiPort(),
proposedNodeId.getSocketAddress(),
proposedNodeId.getSocketPort());
logger.info(String.format("Using Node Identifier %s because proposed node identifier %s conflicts existing node identifiers",
resolvedIdentifier, proposedNodeId));
return resolvedIdentifier;
} else if (!sameId && sameServiceCoordinates) {
// we know about this node, so we'll use the existing ID
logger.debug(String.format("Using Node Identifier %s because proposed node identifier %s matches the service coordinates",
nodeId, proposedNodeId));
return nodeId;
}
}
// proposal does not conflict with existing nodes
return proposedNodeId;
} finally {
readLock.unlock("resolveProposedNodeIdentifier");
}
}
private boolean isHeartbeatMonitorRunning() {
readLock.lock();
try {
return heartbeatMonitor != null;
} finally {
readLock.unlock("isHeartbeatMonitorRunning");
}
}
private boolean canChangeNodeState(final String method, final URI uri) {
return (HttpMethod.DELETE.equalsIgnoreCase(method) || HttpMethod.POST.equalsIgnoreCase(method) || HttpMethod.PUT.equalsIgnoreCase(method));
}
private void notifyDataFlowManagementServiceOfNodeStatusChange() {
writeLock.lock();
try {
// tell service about the currently connected nodes
logger.debug("Notifying DataFlow Management Service of current set of connected nodes.");
dataFlowManagementService.setNodeIds(getNodeIds(Status.CONNECTED));
} finally {
writeLock.unlock("notifyDataFlowManagementServiceOfNodeStatusChange");
}
}
private void notifyDataFlowManagmentServiceOfFlowStateChange(final PersistedFlowState newState) {
writeLock.lock();
try {
logger.debug("Notifying DataFlow Management Service that flow state is " + newState);
dataFlowManagementService.setPersistedFlowState(newState);
if (newState != PersistedFlowState.CURRENT) {
cachedDataFlow = null;
/* do not reset primary node ID because only the data flow has changed */
}
} finally {
writeLock.unlock("notifyDataFlowManagementServiceOfFlowStateChange");
}
}
private void logNodes(final String header, final Logger logger) {
if (logger.isTraceEnabled()) {
if (StringUtils.isNotBlank(header)) {
logger.trace(header);
}
for (final Node node : getNodes()) {
logger.trace(node.getNodeId() + " : " + node.getStatus());
}
}
}
private void executeSafeModeTask() {
new Thread(new Runnable() {
private final long threadStartTime = System.currentTimeMillis();
@Override
public void run() {
logger.info("Entering safe mode...");
final int safeModeSeconds = (int) FormatUtils.getTimeDuration(properties.getClusterManagerSafeModeDuration(), TimeUnit.SECONDS);
final long timeToElect = (safeModeSeconds <= 0) ? Long.MAX_VALUE : threadStartTime + TimeUnit.MILLISECONDS.convert(safeModeSeconds, TimeUnit.SECONDS);
boolean exitSafeMode = false;
while (isRunning()) {
writeLock.lock();
try {
final long currentTime = System.currentTimeMillis();
if (timeToElect < currentTime) {
final Set<NodeIdentifier> connectedNodeIds = getNodeIds(Status.CONNECTED);
if (!connectedNodeIds.isEmpty()) {
// get first connected node ID
final NodeIdentifier connectedNodeId = connectedNodeIds.iterator().next();
if (assignPrimaryRole(connectedNodeId)) {
try {
setPrimaryNodeId(connectedNodeId);
exitSafeMode = true;
} catch (final DaoException de) {
final String message = String.format("Failed to persist primary node ID '%s' in cluster dataflow.", connectedNodeId);
logger.warn(message);
addBulletin(connectedNodeId, Severity.WARNING, message);
revokePrimaryRole(connectedNodeId);
}
}
}
}
if (!isInSafeMode()) {
// a primary node has been selected outside of this thread
exitSafeMode = true;
logger.info("Exiting safe mode because " + primaryNodeId + " has been assigned the primary role.");
break;
}
} finally {
writeLock.unlock("executeSafeModeTask");
}
if (!exitSafeMode) {
// sleep for a bit
try {
Thread.sleep(1000);
} catch (final InterruptedException ie) {
return;
}
}
}
}
}).start();
}
/**
* This timer task simply processes any pending heartbeats. This timer task
* is not strictly needed, as HeartbeatMonitoringTimerTask will do this.
* However, this task is scheduled much more frequently and by processing
* the heartbeats more frequently, the stats that we report have less of a
* delay.
*/
private class ProcessPendingHeartbeatsTask extends TimerTask {
@Override
public void run() {
writeLock.lock();
try {
processPendingHeartbeats();
} finally {
writeLock.unlock("Process Pending Heartbeats Task");
}
}
}
/**
* A timer task to detect nodes that have not sent a heartbeat in a while.
* The "problem" nodes are marked as disconnected due to lack of heartbeat
* by the task. No disconnection request is sent to the node. This is
* because either the node is not functioning in which case sending the
* request is futile or the node is running a bit slow. In the latter case,
* we'll wait for the next heartbeat and send a reconnection request when we
* process the heartbeat in the heartbeatHandler() method.
*/
private class HeartbeatMonitoringTimerTask extends TimerTask {
@Override
public void run() {
// keep track of any status changes
boolean statusChanged = false;
writeLock.lock();
try {
// process all of the heartbeats before we decided to kick anyone out of the cluster.
logger.debug("Processing pending heartbeats...");
processPendingHeartbeats();
logger.debug("Executing heartbeat monitoring");
// check for any nodes that have not heartbeated in a long time
for (final Node node : getRawNodes(Status.CONNECTED)) {
// return prematurely if we were interrupted
if (Thread.currentThread().isInterrupted()) {
return;
}
// check if we received a recent heartbeat, changing status to disconnected if necessary
final long lastHeardTimestamp = node.getHeartbeat().getCreatedTimestamp();
final int heartbeatGapSeconds = (int) (new Date().getTime() - lastHeardTimestamp) / 1000;
if (heartbeatGapSeconds > getMaxHeartbeatGapSeconds()) {
node.setHeartbeatDisconnection();
addEvent(node.getNodeId(), "Node disconnected due to lack of heartbeat.");
addBulletin(node, Severity.WARNING, "Node disconnected due to lack of heartbeat");
statusChanged = true;
}
}
// if a status change occurred, make the necessary updates
if (statusChanged) {
logNodes("Heartbeat Monitoring disconnected node(s)", logger);
// notify service of updated node set
notifyDataFlowManagementServiceOfNodeStatusChange();
} else {
logNodes("Heartbeat Monitoring determined all nodes are healthy", logger);
}
} catch (final Exception ex) {
logger.warn("Heartbeat monitor experienced exception while monitoring: " + ex, ex);
} finally {
writeLock.unlock("HeartbeatMonitoringTimerTask");
}
}
}
@Override
public ClusterNodeInformation getNodeInformation() {
readLock.lock();
try {
final Collection<NodeInformation> nodeInfos = new ArrayList<>();
for (final Node node : getRawNodes(Status.CONNECTED)) {
final NodeIdentifier id = node.getNodeId();
final HeartbeatPayload heartbeat = node.getHeartbeatPayload();
if (heartbeat == null) {
continue;
}
final Integer siteToSitePort = heartbeat.getSiteToSitePort();
if (siteToSitePort == null) {
continue;
}
final int flowFileCount = (int) heartbeat.getTotalFlowFileCount();
final NodeInformation nodeInfo = new NodeInformation(id.getApiAddress(), siteToSitePort, id.getApiPort(),
heartbeat.isSiteToSiteSecure(), flowFileCount);
nodeInfos.add(nodeInfo);
}
final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation();
clusterNodeInfo.setNodeInformation(nodeInfos);
return clusterNodeInfo;
} finally {
readLock.unlock("getNodeInformation");
}
}
@Override
public BulletinRepository getBulletinRepository() {
return bulletinRepository;
}
@Override
public ProcessGroupStatus getProcessGroupStatus(final String groupId) {
final Set<Node> connectedNodes = getNodes(Node.Status.CONNECTED);
// ensure there are some nodes in the cluster
if (connectedNodes.isEmpty()) {
throw new NoConnectedNodesException();
}
ProcessGroupStatus mergedProcessGroupStatus = null;
for (final Node node : connectedNodes) {
final NodeIdentifier nodeId = node.getNodeId();
final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
if (nodeHeartbeatPayload == null) {
continue;
}
final ProcessGroupStatus nodeRootProcessGroupStatus = nodeHeartbeatPayload.getProcessGroupStatus();
final ProcessGroupStatus nodeProcessGroupStatus = groupId.equals(ROOT_GROUP_ID_ALIAS) ? nodeRootProcessGroupStatus : getProcessGroupStatus(nodeRootProcessGroupStatus, groupId);
if (nodeProcessGroupStatus == null) {
continue;
}
if (mergedProcessGroupStatus == null) {
mergedProcessGroupStatus = nodeProcessGroupStatus.clone();
// update any issues with the node label
if (mergedProcessGroupStatus.getRemoteProcessGroupStatus() != null) {
for (final RemoteProcessGroupStatus remoteProcessGroupStatus : mergedProcessGroupStatus.getRemoteProcessGroupStatus()) {
final List<String> nodeAuthorizationIssues = remoteProcessGroupStatus.getAuthorizationIssues();
if (!nodeAuthorizationIssues.isEmpty()) {
for (final ListIterator<String> iter = nodeAuthorizationIssues.listIterator(); iter.hasNext();) {
final String Issue = iter.next();
iter.set("[" + nodeId.getApiAddress() + ":" + nodeId.getApiPort() + "] -- " + Issue);
}
remoteProcessGroupStatus.setAuthorizationIssues(nodeAuthorizationIssues);
}
}
}
} else {
final ProcessGroupStatus nodeClone = nodeProcessGroupStatus.clone();
for (final RemoteProcessGroupStatus remoteProcessGroupStatus : nodeClone.getRemoteProcessGroupStatus()) {
final List<String> nodeAuthorizationIssues = remoteProcessGroupStatus.getAuthorizationIssues();
if (!nodeAuthorizationIssues.isEmpty()) {
for (final ListIterator<String> iter = nodeAuthorizationIssues.listIterator(); iter.hasNext();) {
final String Issue = iter.next();
iter.set("[" + nodeId.getApiAddress() + ":" + nodeId.getApiPort() + "] -- " + Issue);
}
remoteProcessGroupStatus.setAuthorizationIssues(nodeAuthorizationIssues);
}
}
ProcessGroupStatus.merge(mergedProcessGroupStatus, nodeClone);
}
}
return mergedProcessGroupStatus;
}
private ProcessGroupStatus getProcessGroupStatus(final ProcessGroupStatus parent, final String groupId) {
if (parent.getId().equals(groupId)) {
return parent;
}
for (final ProcessGroupStatus child : parent.getProcessGroupStatus()) {
final ProcessGroupStatus matching = getProcessGroupStatus(child, groupId);
if (matching != null) {
return matching;
}
}
return null;
}
@Override
public SystemDiagnostics getSystemDiagnostics() {
final Set<Node> connectedNodes = getNodes(Node.Status.CONNECTED);
// ensure there are some nodes...
if (connectedNodes.isEmpty()) {
throw new NoConnectedNodesException();
}
SystemDiagnostics clusterDiagnostics = null;
for (final Node node : connectedNodes) {
final HeartbeatPayload nodeHeartbeatPayload = node.getHeartbeatPayload();
if (nodeHeartbeatPayload == null) {
continue;
}
final SystemDiagnostics nodeDiagnostics = nodeHeartbeatPayload.getSystemDiagnostics();
if (nodeDiagnostics == null) {
continue;
}
if (clusterDiagnostics == null) {
clusterDiagnostics = nodeDiagnostics.clone();
} else {
merge(clusterDiagnostics, nodeDiagnostics);
}
}
return clusterDiagnostics;
}
private void merge(final SystemDiagnostics target, final SystemDiagnostics sd) {
// threads
target.setDaemonThreads(target.getDaemonThreads() + sd.getDaemonThreads());
target.setTotalThreads(target.getTotalThreads() + sd.getTotalThreads());
// heap
target.setTotalHeap(target.getTotalHeap() + sd.getTotalHeap());
target.setUsedHeap(target.getUsedHeap() + sd.getUsedHeap());
target.setMaxHeap(target.getMaxHeap() + sd.getMaxHeap());
// non heap
target.setTotalNonHeap(target.getTotalNonHeap() + sd.getTotalNonHeap());
target.setUsedNonHeap(target.getUsedNonHeap() + sd.getUsedNonHeap());
target.setMaxNonHeap(target.getMaxNonHeap() + sd.getMaxNonHeap());
// processors
target.setAvailableProcessors(target.getAvailableProcessors() + sd.getAvailableProcessors());
// load
if (sd.getProcessorLoadAverage() != null) {
if (target.getProcessorLoadAverage() != null) {
target.setProcessorLoadAverage(target.getProcessorLoadAverage() + sd.getProcessorLoadAverage());
} else {
target.setProcessorLoadAverage(sd.getProcessorLoadAverage());
}
}
// db disk usage
merge(target.getFlowFileRepositoryStorageUsage(), sd.getFlowFileRepositoryStorageUsage());
// repo disk usage
final Map<String, StorageUsage> targetContentRepoMap;
if (target.getContentRepositoryStorageUsage() == null) {
targetContentRepoMap = new LinkedHashMap<>();
target.setContentRepositoryStorageUsage(targetContentRepoMap);
} else {
targetContentRepoMap = target.getContentRepositoryStorageUsage();
}
if (sd.getContentRepositoryStorageUsage() != null) {
for (final Map.Entry<String, StorageUsage> sdEntry : sd.getContentRepositoryStorageUsage().entrySet()) {
final StorageUsage mergedDiskUsage = targetContentRepoMap.get(sdEntry.getKey());
if (mergedDiskUsage == null) {
targetContentRepoMap.put(sdEntry.getKey(), sdEntry.getValue());
} else {
merge(mergedDiskUsage, sdEntry.getValue());
}
}
}
// garbage collection
final Map<String, GarbageCollection> targetGarbageCollection;
if (target.getGarbageCollection() == null) {
targetGarbageCollection = new LinkedHashMap<>();
target.setGarbageCollection(targetGarbageCollection);
} else {
targetGarbageCollection = target.getGarbageCollection();
}
if (sd.getGarbageCollection() != null) {
for (final Map.Entry<String, GarbageCollection> gcEntry : sd.getGarbageCollection().entrySet()) {
final GarbageCollection mergedGarbageCollection = targetGarbageCollection.get(gcEntry.getKey());
if (mergedGarbageCollection == null) {
targetGarbageCollection.put(gcEntry.getKey(), gcEntry.getValue().clone());
} else {
merge(mergedGarbageCollection, gcEntry.getValue());
}
}
}
}
private void merge(final StorageUsage target, final StorageUsage du) {
target.setFreeSpace(target.getFreeSpace() + du.getFreeSpace());
target.setTotalSpace(target.getTotalSpace() + du.getTotalSpace());
}
private void merge(final GarbageCollection target, final GarbageCollection gc) {
target.setCollectionCount(target.getCollectionCount() + gc.getCollectionCount());
target.setCollectionTime(target.getCollectionTime() + gc.getCollectionTime());
}
public static Date normalizeStatusSnapshotDate(final Date toNormalize, final long numMillis) {
final long time = toNormalize.getTime();
return new Date(time - (time % numMillis));
}
private NodeDTO createNodeDTO(final Node node) {
final NodeDTO nodeDto = new NodeDTO();
final NodeIdentifier nodeId = node.getNodeId();
nodeDto.setNodeId(nodeId.getId());
nodeDto.setAddress(nodeId.getApiAddress());
nodeDto.setApiPort(nodeId.getApiPort());
nodeDto.setStatus(node.getStatus().name());
nodeDto.setPrimary(node.equals(getPrimaryNode()));
final Date connectionRequested = new Date(node.getConnectionRequestedTimestamp());
nodeDto.setConnectionRequested(connectionRequested);
return nodeDto;
}
private List<StatusSnapshotDTO> aggregate(Map<Date, List<StatusSnapshot>> snapshotsToAggregate) {
// Aggregate the snapshots
final List<StatusSnapshotDTO> aggregatedSnapshotDtos = new ArrayList<>();
for (final Map.Entry<Date, List<StatusSnapshot>> entry : snapshotsToAggregate.entrySet()) {
final StatusSnapshotDTO dto = new StatusSnapshotDTO();
dto.setTimestamp(entry.getKey());
final List<StatusSnapshot> snapshots = entry.getValue();
final StatusSnapshot reducedSnapshot = snapshots.get(0).getValueReducer().reduce(snapshots);
dto.setStatusMetrics(StatusHistoryUtil.createStatusSnapshotDto(reducedSnapshot).getStatusMetrics());
aggregatedSnapshotDtos.add(dto);
}
return aggregatedSnapshotDtos;
}
public ClusterStatusHistoryDTO getProcessorStatusHistory(final String processorId) {
return getProcessorStatusHistory(processorId, null, null, Integer.MAX_VALUE);
}
public ClusterStatusHistoryDTO getProcessorStatusHistory(final String processorId, final Date startDate, final Date endDate, final int preferredDataPoints) {
final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
StatusHistoryDTO lastStatusHistory = null;
Set<MetricDescriptor<?>> processorDescriptors = new LinkedHashSet<>();
Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
for (final Node node : getRawNodes()) {
final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
if (statusRepository == null) {
continue;
}
final StatusHistory statusHistory = statusRepository.getProcessorStatusHistory(processorId, startDate, endDate, preferredDataPoints);
if (statusHistory == null) {
continue;
}
processorDescriptors.addAll(statusRepository.getProcessorMetricDescriptors());
// record the status history (last) to get the component details for use later
final StatusHistoryDTO statusHistoryDto = createStatusHistoryDto(statusHistory);
lastStatusHistory = statusHistoryDto;
final NodeStatusHistoryDTO nodeHistory = new NodeStatusHistoryDTO();
nodeHistory.setStatusHistory(statusHistoryDto);
nodeHistory.setNode(createNodeDTO(node));
nodeHistories.add(nodeHistory);
// collect all of the snapshots to aggregate
for (final StatusSnapshot snapshot : statusHistory.getStatusSnapshots()) {
final Date normalizedDate = normalizeStatusSnapshotDate(snapshot.getTimestamp(), componentStatusSnapshotMillis);
List<StatusSnapshot> snapshots = snapshotsToAggregate.get(normalizedDate);
if (snapshots == null) {
snapshots = new ArrayList<>();
snapshotsToAggregate.put(normalizedDate, snapshots);
}
snapshots.add(snapshot);
}
}
// Aggregate the snapshots
final List<StatusSnapshotDTO> aggregatedSnapshotDtos = aggregate(snapshotsToAggregate);
// get the details for this component from the last status history
final LinkedHashMap<String, String> clusterStatusHistoryDetails = new LinkedHashMap<>();
clusterStatusHistoryDetails.putAll(lastStatusHistory.getDetails());
final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO();
clusterStatusHistory.setGenerated(new Date());
clusterStatusHistory.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(processorDescriptors));
clusterStatusHistory.setDetails(clusterStatusHistoryDetails);
clusterStatusHistory.setStatusSnapshots(aggregatedSnapshotDtos);
final ClusterStatusHistoryDTO history = new ClusterStatusHistoryDTO();
history.setGenerated(new Date());
history.setNodeStatusHistory(nodeHistories);
history.setClusterStatusHistory(clusterStatusHistory);
return history;
}
public StatusHistoryDTO createStatusHistoryDto(final StatusHistory statusHistory) {
final StatusHistoryDTO dto = new StatusHistoryDTO();
dto.setDetails(new LinkedHashMap<>(statusHistory.getComponentDetails()));
dto.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(statusHistory));
dto.setGenerated(statusHistory.getDateGenerated());
final List<StatusSnapshotDTO> statusSnapshots = new ArrayList<>();
for (final StatusSnapshot statusSnapshot : statusHistory.getStatusSnapshots()) {
statusSnapshots.add(StatusHistoryUtil.createStatusSnapshotDto(statusSnapshot));
}
dto.setStatusSnapshots(statusSnapshots);
return dto;
}
public ClusterStatusHistoryDTO getConnectionStatusHistory(final String connectionId) {
return getConnectionStatusHistory(connectionId, null, null, Integer.MAX_VALUE);
}
public ClusterStatusHistoryDTO getConnectionStatusHistory(final String connectionId, final Date startDate, final Date endDate, final int preferredDataPoints) {
final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
StatusHistoryDTO lastStatusHistory = null;
Set<MetricDescriptor<?>> connectionDescriptors = new LinkedHashSet<>();
Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
for (final Node node : getRawNodes()) {
final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
if (statusRepository == null) {
continue;
}
final StatusHistory statusHistory = statusRepository.getConnectionStatusHistory(connectionId, startDate, endDate, preferredDataPoints);
if (statusHistory == null) {
continue;
}
final StatusHistoryDTO statusHistoryDto = createStatusHistoryDto(statusHistory);
// record the status history (last) to get the componet details for use later
lastStatusHistory = statusHistoryDto;
connectionDescriptors.addAll(statusRepository.getConnectionMetricDescriptors());
final NodeStatusHistoryDTO nodeHistory = new NodeStatusHistoryDTO();
nodeHistory.setStatusHistory(statusHistoryDto);
nodeHistory.setNode(createNodeDTO(node));
nodeHistories.add(nodeHistory);
// collect all of the snapshots to aggregate
for (final StatusSnapshot snapshot : statusHistory.getStatusSnapshots()) {
final Date normalizedDate = normalizeStatusSnapshotDate(snapshot.getTimestamp(), componentStatusSnapshotMillis);
List<StatusSnapshot> snapshots = snapshotsToAggregate.get(normalizedDate);
if (snapshots == null) {
snapshots = new ArrayList<>();
snapshotsToAggregate.put(normalizedDate, snapshots);
}
snapshots.add(snapshot);
}
}
// Aggregate the snapshots
final List<StatusSnapshotDTO> aggregatedSnapshotDtos = aggregate(snapshotsToAggregate);
// get the details for this component from the last status history
final LinkedHashMap<String, String> clusterStatusHistoryDetails = new LinkedHashMap<>();
clusterStatusHistoryDetails.putAll(lastStatusHistory.getDetails());
final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO();
clusterStatusHistory.setGenerated(new Date());
clusterStatusHistory.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(connectionDescriptors));
clusterStatusHistory.setDetails(clusterStatusHistoryDetails);
clusterStatusHistory.setStatusSnapshots(aggregatedSnapshotDtos);
final ClusterStatusHistoryDTO history = new ClusterStatusHistoryDTO();
history.setGenerated(new Date());
history.setNodeStatusHistory(nodeHistories);
history.setClusterStatusHistory(clusterStatusHistory);
return history;
}
public ClusterStatusHistoryDTO getProcessGroupStatusHistory(final String processGroupId) {
return getProcessGroupStatusHistory(processGroupId, null, null, Integer.MAX_VALUE);
}
public ClusterStatusHistoryDTO getProcessGroupStatusHistory(final String processGroupId, final Date startDate, final Date endDate, final int preferredDataPoints) {
final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
StatusHistoryDTO lastStatusHistory = null;
Set<MetricDescriptor<?>> processGroupDescriptors = new LinkedHashSet<>();
Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
for (final Node node : getRawNodes()) {
final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
if (statusRepository == null) {
continue;
}
final StatusHistory statusHistory = statusRepository.getProcessGroupStatusHistory(processGroupId, startDate, endDate, preferredDataPoints);
if (statusHistory == null) {
continue;
}
final StatusHistoryDTO statusHistoryDto = createStatusHistoryDto(statusHistory);
// record the status history (last) to get the componet details for use later
lastStatusHistory = statusHistoryDto;
processGroupDescriptors.addAll(statusRepository.getProcessGroupMetricDescriptors());
final NodeStatusHistoryDTO nodeHistory = new NodeStatusHistoryDTO();
nodeHistory.setStatusHistory(statusHistoryDto);
nodeHistory.setNode(createNodeDTO(node));
nodeHistories.add(nodeHistory);
// collect all of the snapshots to aggregate
for (final StatusSnapshot snapshot : statusHistory.getStatusSnapshots()) {
final Date normalizedDate = normalizeStatusSnapshotDate(snapshot.getTimestamp(), componentStatusSnapshotMillis);
List<StatusSnapshot> snapshots = snapshotsToAggregate.get(normalizedDate);
if (snapshots == null) {
snapshots = new ArrayList<>();
snapshotsToAggregate.put(normalizedDate, snapshots);
}
snapshots.add(snapshot);
}
}
// Aggregate the snapshots
final List<StatusSnapshotDTO> aggregatedSnapshotDtos = aggregate(snapshotsToAggregate);
// get the details for this component from the last status history
final LinkedHashMap<String, String> clusterStatusHistoryDetails = new LinkedHashMap<>();
clusterStatusHistoryDetails.putAll(lastStatusHistory.getDetails());
final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO();
clusterStatusHistory.setGenerated(new Date());
clusterStatusHistory.setDetails(clusterStatusHistoryDetails);
clusterStatusHistory.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(processGroupDescriptors));
clusterStatusHistory.setStatusSnapshots(aggregatedSnapshotDtos);
final ClusterStatusHistoryDTO history = new ClusterStatusHistoryDTO();
history.setGenerated(new Date());
history.setNodeStatusHistory(nodeHistories);
history.setClusterStatusHistory(clusterStatusHistory);
return history;
}
public ClusterStatusHistoryDTO getRemoteProcessGroupStatusHistory(final String remoteGroupId) {
return getRemoteProcessGroupStatusHistory(remoteGroupId, null, null, Integer.MAX_VALUE);
}
public ClusterStatusHistoryDTO getRemoteProcessGroupStatusHistory(final String remoteGroupId, final Date startDate, final Date endDate, final int preferredDataPoints) {
final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
StatusHistoryDTO lastStatusHistory = null;
Set<MetricDescriptor<?>> remoteProcessGroupDescriptors = new LinkedHashSet<>();
Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
for (final Node node : getRawNodes()) {
final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
if (statusRepository == null) {
continue;
}
final StatusHistory statusHistory = statusRepository.getRemoteProcessGroupStatusHistory(remoteGroupId, startDate, endDate, preferredDataPoints);
if (statusHistory == null) {
continue;
}
final StatusHistoryDTO statusHistoryDto = createStatusHistoryDto(statusHistory);
// record the status history (last) to get the componet details for use later
lastStatusHistory = statusHistoryDto;
remoteProcessGroupDescriptors.addAll(statusRepository.getRemoteProcessGroupMetricDescriptors());
final NodeStatusHistoryDTO nodeHistory = new NodeStatusHistoryDTO();
nodeHistory.setStatusHistory(statusHistoryDto);
nodeHistory.setNode(createNodeDTO(node));
nodeHistories.add(nodeHistory);
// collect all of the snapshots to aggregate
for (final StatusSnapshot snapshot : statusHistory.getStatusSnapshots()) {
final Date normalizedDate = normalizeStatusSnapshotDate(snapshot.getTimestamp(), componentStatusSnapshotMillis);
List<StatusSnapshot> snapshots = snapshotsToAggregate.get(normalizedDate);
if (snapshots == null) {
snapshots = new ArrayList<>();
snapshotsToAggregate.put(normalizedDate, snapshots);
}
snapshots.add(snapshot);
}
}
// Aggregate the snapshots
final List<StatusSnapshotDTO> aggregatedSnapshotDtos = aggregate(snapshotsToAggregate);
// get the details for this component from the last status history
final LinkedHashMap<String, String> clusterStatusHistoryDetails = new LinkedHashMap<>();
clusterStatusHistoryDetails.putAll(lastStatusHistory.getDetails());
final StatusHistoryDTO clusterStatusHistory = new StatusHistoryDTO();
clusterStatusHistory.setGenerated(new Date());
clusterStatusHistory.setDetails(clusterStatusHistoryDetails);
clusterStatusHistory.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(remoteProcessGroupDescriptors));
clusterStatusHistory.setStatusSnapshots(aggregatedSnapshotDtos);
final ClusterStatusHistoryDTO history = new ClusterStatusHistoryDTO();
history.setGenerated(new Date());
history.setNodeStatusHistory(nodeHistories);
history.setClusterStatusHistory(clusterStatusHistory);
return history;
}
private static class ClusterManagerLock {
private final Lock lock;
private static final Logger logger = LoggerFactory.getLogger("cluster.lock");
private long lockTime;
private final String name;
public ClusterManagerLock(final Lock lock, final String name) {
this.lock = lock;
this.name = name;
}
@SuppressWarnings("unused")
public boolean tryLock() {
logger.trace("Trying to obtain Cluster Manager Lock: {}", name);
final boolean success = lock.tryLock();
if (!success) {
logger.trace("TryLock failed for Cluster Manager Lock: {}", name);
return false;
}
logger.trace("TryLock successful");
return true;
}
public boolean tryLock(final long timeout, final TimeUnit timeUnit) {
logger.trace("Trying to obtain Cluster Manager Lock {} with a timeout of {} {}", name, timeout, timeUnit);
final boolean success;
try {
success = lock.tryLock(timeout, timeUnit);
} catch (final InterruptedException ie) {
return false;
}
if (!success) {
logger.trace("TryLock failed for Cluster Manager Lock {} with a timeout of {} {}", name, timeout, timeUnit);
return false;
}
logger.trace("TryLock successful");
return true;
}
public void lock() {
logger.trace("Obtaining Cluster Manager Lock {}", name);
lock.lock();
lockTime = System.nanoTime();
logger.trace("Obtained Cluster Manager Lock {}", name);
}
public void unlock(final String task) {
logger.trace("Releasing Cluster Manager Lock {}", name);
final long nanosLocked = System.nanoTime() - lockTime;
lock.unlock();
logger.trace("Released Cluster Manager Lock {}", name);
final long millisLocked = TimeUnit.MILLISECONDS.convert(nanosLocked, TimeUnit.NANOSECONDS);
if (millisLocked > 100L) {
logger.debug("Cluster Manager Lock {} held for {} milliseconds for task: {}", name, millisLocked, task);
}
}
}
@Override
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
return controllerServiceProvider.getControllerServiceIdentifiers(serviceType);
}
}