blob: 890f6615522d72b2ec3ece8cd546bf7170e8ab0a [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBMINIFI_INCLUDE_CORE_STATE_NODES_FLOWINFORMATION_H_
#define LIBMINIFI_INCLUDE_CORE_STATE_NODES_FLOWINFORMATION_H_
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "core/Resource.h"
#if ( defined(__APPLE__) || defined(__MACH__) || defined(BSD))
#include <net/if_dl.h>
#include <net/if_types.h>
#endif
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <map>
#include <sstream>
#include "../FlowIdentifier.h"
#include "../nodes/MetricsBase.h"
#include "../nodes/StateMonitor.h"
#include "Connection.h"
#include "io/ClientSocket.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace state {
namespace response {
class FlowVersion : public DeviceInformation {
public:
FlowVersion()
: DeviceInformation("FlowVersion") {
setFlowVersion("", "", getUUIDStr());
}
explicit FlowVersion(const std::string &registry_url, const std::string &bucket_id, const std::string &flow_id)
: DeviceInformation("FlowVersion") {
setFlowVersion(registry_url, bucket_id, flow_id.empty() ? getUUIDStr() : flow_id);
}
explicit FlowVersion(FlowVersion &&fv)
: DeviceInformation("FlowVersion"),
identifier(std::move(fv.identifier)) {
}
std::string getName() const {
return "FlowVersion";
}
virtual std::shared_ptr<state::FlowIdentifier> getFlowIdentifier() const {
std::lock_guard<std::mutex> lock(guard);
return identifier;
}
/**
* In most cases the lock guard isn't necessary for these getters; however,
* we don't want to cause issues if the FlowVersion object is ever used in a way
* that breaks the current paradigm.
*/
std::string getRegistryUrl() {
std::lock_guard<std::mutex> lock(guard);
return identifier->getRegistryUrl();
}
std::string getBucketId() {
std::lock_guard<std::mutex> lock(guard);
return identifier->getBucketId();
}
std::string getFlowId() {
std::lock_guard<std::mutex> lock(guard);
return identifier->getFlowId();
}
void setFlowVersion(const std::string &url, const std::string &bucket_id, const std::string &flow_id) {
std::lock_guard<std::mutex> lock(guard);
identifier = std::make_shared<FlowIdentifier>(url, bucket_id, flow_id);
}
std::vector<SerializedResponseNode> serialize() {
std::lock_guard<std::mutex> lock(guard);
std::vector<SerializedResponseNode> serialized;
SerializedResponseNode ru;
ru.name = "registryUrl";
ru.value = identifier->getRegistryUrl();
SerializedResponseNode bucketid;
bucketid.name = "bucketId";
bucketid.value = identifier->getBucketId();
SerializedResponseNode flowId;
flowId.name = "flowId";
flowId.value = identifier->getFlowId();
serialized.push_back(ru);
serialized.push_back(bucketid);
serialized.push_back(flowId);
return serialized;
}
FlowVersion &operator=(const FlowVersion &&fv) {
identifier = std::move(fv.identifier);
return *this;
}
protected:
mutable std::mutex guard;
std::shared_ptr<FlowIdentifier> identifier;
};
class FlowMonitor : public StateMonitorNode {
public:
FlowMonitor(const std::string &name, const utils::Identifier &uuid)
: StateMonitorNode(name, uuid) {
}
FlowMonitor(const std::string &name) // NOLINT
: StateMonitorNode(name) {
}
void addConnection(const std::shared_ptr<minifi::Connection> &connection) {
if (nullptr != connection) {
connections_.insert(std::make_pair(connection->getUUIDStr(), connection));
}
}
void setFlowVersion(std::shared_ptr<state::response::FlowVersion> flow_version) {
flow_version_ = flow_version;
}
protected:
std::shared_ptr<state::response::FlowVersion> flow_version_;
std::map<std::string, std::shared_ptr<minifi::Connection>> connections_;
};
/**
* Justification and Purpose: Provides flow version Information
*/
class FlowInformation : public FlowMonitor {
public:
FlowInformation(const std::string &name, const utils::Identifier &uuid)
: FlowMonitor(name, uuid) {
}
FlowInformation(const std::string &name) // NOLINT
: FlowMonitor(name) {
}
std::string getName() const {
return "flowInfo";
}
std::vector<SerializedResponseNode> serialize() {
std::vector<SerializedResponseNode> serialized;
SerializedResponseNode fv;
fv.name = "flowId";
fv.value = flow_version_->getFlowId();
SerializedResponseNode uri;
uri.name = "versionedFlowSnapshotURI";
for (auto &entry : flow_version_->serialize()) {
uri.children.push_back(entry);
}
serialized.push_back(fv);
serialized.push_back(uri);
if (!connections_.empty()) {
SerializedResponseNode queues(false);
queues.name = "queues";
for (auto &queue : connections_) {
SerializedResponseNode repoNode(false);
repoNode.name = queue.second->getName();
SerializedResponseNode queueUUIDNode;
queueUUIDNode.name = "uuid";
queueUUIDNode.value = std::string{queue.second->getUUIDStr()};
SerializedResponseNode queuesize;
queuesize.name = "size";
queuesize.value = queue.second->getQueueSize();
SerializedResponseNode queuesizemax;
queuesizemax.name = "sizeMax";
queuesizemax.value = queue.second->getMaxQueueSize();
SerializedResponseNode datasize;
datasize.name = "dataSize";
datasize.value = queue.second->getQueueDataSize();
SerializedResponseNode datasizemax;
datasizemax.name = "dataSizeMax";
datasizemax.value = queue.second->getMaxQueueDataSize();
repoNode.children.push_back(queuesize);
repoNode.children.push_back(queuesizemax);
repoNode.children.push_back(datasize);
repoNode.children.push_back(datasizemax);
repoNode.children.push_back(queueUUIDNode);
queues.children.push_back(repoNode);
}
serialized.push_back(queues);
}
if (nullptr != monitor_) {
auto components = monitor_->getAllComponents();
SerializedResponseNode componentsNode(false);
componentsNode.name = "components";
for (auto component : components) {
SerializedResponseNode componentNode(false);
componentNode.name = component->getComponentName();
SerializedResponseNode uuidNode;
uuidNode.name = "uuid";
uuidNode.value = std::string{component->getComponentUUID().to_string()};
SerializedResponseNode componentStatusNode;
componentStatusNode.name = "running";
componentStatusNode.value = component->isRunning();
componentNode.children.push_back(componentStatusNode);
componentNode.children.push_back(uuidNode);
componentsNode.children.push_back(componentNode);
}
serialized.push_back(componentsNode);
}
return serialized;
}
protected:
};
REGISTER_RESOURCE(FlowInformation, "Node part of an AST that defines the flow ID and flow URL deployed to this agent");
} // namespace response
} // namespace state
} // namespace minifi
} // namespace nifi
} // namespace apache
} // namespace org
#endif // LIBMINIFI_INCLUDE_CORE_STATE_NODES_FLOWINFORMATION_H_