blob: 7a992aa6095470026d20d34783288f3429c969a2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "fetchopc.h"
#include <list>
#include <memory>
#include <string>
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"
#include "opc.h"
#include "utils/Enum.h"
#include "utils/StringUtils.h"
#include "utils/ProcessorConfigUtils.h"
namespace org::apache::nifi::minifi::processors {
/**
 * Declares which properties and relationships this processor supports by
 * handing the `Properties` and `Relationships` collections (declared outside
 * this section of the file) to the corresponding setters.
 */
void FetchOPCProcessor::initialize() {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}
/**
 * Prepares the processor for triggering.
 *
 * Acquires the state manager from the context, drops any cached node id
 * translations, delegates to BaseOPCProcessor::onSchedule and then reads the
 * processor's own properties (node id, max depth, id type, namespace index and
 * lazy mode). When the node id is configured as a path, the path reference
 * types are read as well.
 *
 * @param context  process context the properties and the state manager are read from
 * @param factory  session factory, forwarded to the base class
 * @throws Exception (PROCESSOR_EXCEPTION) when the context provides no state manager
 */
void FetchOPCProcessor::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& factory) {
  logger_->log_trace("FetchOPCProcessor::onSchedule");

  state_manager_ = context.getStateManager();
  if (!state_manager_) {
    throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
  }

  // The configured path might have changed since the previous schedule, so the
  // cached translations are discarded and rebuilt on the next trigger.
  translated_node_ids_.clear();

  BaseOPCProcessor::onSchedule(context, factory);

  node_id_ = utils::parseProperty(context, NodeID);
  max_depth_ = utils::parseU64Property(context, MaxDepth);
  parseIdType(context, NodeIDType);

  // The property is parsed as a 64-bit integer; gsl::narrow performs a checked
  // conversion to the 32-bit member.
  const auto configured_namespace_index = utils::parseI64Property(context, NameSpaceIndex);
  namespace_idx_ = gsl::narrow<int32_t>(configured_namespace_index);

  lazy_mode_ = utils::parseEnumProperty<LazyModeOptions>(context, Lazy);

  const bool node_id_is_path = (id_type_ == opc::OPCNodeIDType::Path);
  if (node_id_is_path) {
    readPathReferenceTypes(context, node_id_);
  }
}
/**
 * Runs one fetch cycle.
 *
 * After (re)connecting, the processor state is loaded into a local map, the
 * configured node (or every node id the configured path translates to) is
 * traversed, and nodeFoundCallBack() is invoked for each reference found.
 * The possibly updated state map is written back at the end of the cycle.
 *
 * The processor yields and returns early (without writing state back) when
 *  - reconnect() fails,
 *  - the id type is neither Path, Int nor String, or
 *  - the browse path cannot be translated to node ids.
 * It also yields, but still stores the state, when the traversal found no
 * nodes at all or no variables.
 *
 * @param context  process context, used for yielding and forwarded to the callback
 * @param session  process session, forwarded to the callback
 */
void FetchOPCProcessor::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
  logger_->log_trace("FetchOPCProcessor::onTrigger");

  if (!reconnect()) {
    context.yield();
    return;
  }

  std::unordered_map<std::string, std::string> state_map;
  state_manager_->get(state_map);

  size_t nodes_found = 0;
  size_t variables_found = 0;

  // Everything captured by reference lives until the end of this function,
  // and the callback is only handed to the traverse() calls below.
  auto found_cb = [&](const UA_ReferenceDescription* ref, const std::string& path) {
    return nodeFoundCallBack(ref, path, context, session, nodes_found, variables_found, state_map);
  };

  if (id_type_ == opc::OPCNodeIDType::Path) {
    // Translate the path only when there is no cached result yet.
    if (translated_node_ids_.empty()) {
      const auto status = connection_->translateBrowsePathsToNodeIdsRequest(node_id_, translated_node_ids_, namespace_idx_, path_reference_types_, logger_);
      if (status != UA_STATUSCODE_GOOD) {
        logger_->log_error("Failed to translate {} to node id, no flow files will be generated ({})", node_id_.c_str(), UA_StatusCode_name(status));
        context.yield();
        return;
      }
    }
    for (auto& translated_id : translated_node_ids_) {
      connection_->traverse(translated_id, found_cb, node_id_, max_depth_);
    }
  } else {
    UA_NodeId my_id;
    my_id.namespaceIndex = namespace_idx_;
    switch (id_type_) {
      case opc::OPCNodeIDType::Int:
        my_id.identifierType = UA_NODEIDTYPE_NUMERIC;
        my_id.identifier.numeric = std::stoi(node_id_);  // NOLINT(cppcoreguidelines-pro-type-union-access)
        break;
      case opc::OPCNodeIDType::String:
        my_id.identifierType = UA_NODEIDTYPE_STRING;
        // NOTE(review): the allocated identifier is not released in this function;
        // presumably traverse() takes care of it - confirm against its implementation.
        my_id.identifier.string = UA_STRING_ALLOC(node_id_.c_str());  // NOLINT(cppcoreguidelines-pro-type-union-access)
        break;
      default:
        logger_->log_error("Unhandled id type: '{}'. No flowfiles are generated.", magic_enum::enum_underlying(id_type_));
        context.yield();
        return;
    }
    connection_->traverse(my_id, found_cb, "", max_depth_);
  }

  if (nodes_found == 0) {
    logger_->log_warn("Connected to OPC server, but no variable nodes were found. Configuration might be incorrect! Yielding...");
    context.yield();
  } else if (variables_found == 0) {
    logger_->log_warn("Found no variables when traversing the specified node. No flowfiles are generated. Yielding...");
    context.yield();
  }

  state_manager_->set(state_map);
}
/**
 * Emits a flow file for the node only when its source timestamp differs from
 * the one remembered in the state map.
 *
 * The state entry is keyed by the node's "Full path" attribute with a
 * "_timestamp" suffix. A missing or empty stored timestamp is treated as
 * "nothing known yet", so the node is emitted and the state updated. When the
 * node lacks the "Full path" or "Sourcetimestamp" attribute, the state cannot
 * be consulted: an error is logged and the flow file is emitted regardless.
 *
 * @param nodedata         node whose attributes are inspected and whose data is emitted
 * @param context          process context, forwarded to OPCData2FlowFile()
 * @param session          process session, forwarded to OPCData2FlowFile()
 * @param variables_found  incremented once for every emitted flow file
 * @param state_map        per-node state; updated before the flow file is written
 */
void FetchOPCProcessor::writeFlowFileUsingLazyModeWithTimestamp(const opc::NodeData& nodedata, core::ProcessContext& context, core::ProcessSession& session, size_t& variables_found,
    std::unordered_map<std::string, std::string>& state_map) const {
  auto emit_flow_file = [&] {
    OPCData2FlowFile(nodedata, context, session);
    ++variables_found;
  };

  const auto path_entry = nodedata.attributes.find("Full path");
  if (path_entry == nodedata.attributes.end()) {
    logger_->log_error("Node data does not contain 'Full path' attribute, cannot read state for node");
    emit_flow_file();
    return;
  }

  const auto timestamp_entry = nodedata.attributes.find("Sourcetimestamp");
  if (timestamp_entry == nodedata.attributes.end()) {
    logger_->log_error("Node data does not contain 'Sourcetimestamp' attribute, cannot read state for node");
    emit_flow_file();
    return;
  }

  const auto& full_path = path_entry->second;
  const auto& latest_timestamp = timestamp_entry->second;

  // operator[] creates an empty entry for unknown nodes; an empty entry is
  // handled exactly like a changed timestamp below and is overwritten there.
  auto& stored_timestamp = state_map[full_path + "_timestamp"];

  const bool unchanged = !stored_timestamp.empty() && stored_timestamp == latest_timestamp;
  if (unchanged) {
    logger_->log_debug("Node {} has no new source timestamp, skipping", full_path);
    return;
  }

  logger_->log_debug("Node {} has new source timestamp", full_path);
  stored_timestamp = latest_timestamp;
  emit_flow_file();
}
/**
 * Emits a flow file for the node only when its value differs from the value
 * remembered in the state map.
 *
 * The state entry is keyed by the node's "Full path" attribute with a
 * "_new_value" suffix and holds the string form of the last emitted value.
 * A node without a stored entry is always emitted. A stored entry that equals
 * the current value suppresses the flow file - this includes the empty
 * string, so a variable whose value is (and stays) empty is emitted once
 * rather than on every trigger. When the node lacks the "Full path"
 * attribute, the state cannot be consulted: an error is logged and the flow
 * file is emitted regardless.
 *
 * Exceptions raised by opc::nodeValue2String() propagate to the caller before
 * the state map is touched.
 *
 * @param nodedata         node whose value is compared and emitted
 * @param context          process context, forwarded to OPCData2FlowFile()
 * @param session          process session, forwarded to OPCData2FlowFile()
 * @param variables_found  incremented once for every emitted flow file
 * @param state_map        per-node state; updated before the flow file is written
 */
void FetchOPCProcessor::writeFlowFileUsingLazyModeWithNewValue(const opc::NodeData& nodedata, core::ProcessContext& context, core::ProcessSession& session, size_t& variables_found,
    std::unordered_map<std::string, std::string>& state_map) const {
  auto emit_flow_file = [&] {
    OPCData2FlowFile(nodedata, context, session);
    ++variables_found;
  };

  const auto path_entry = nodedata.attributes.find("Full path");
  if (path_entry == nodedata.attributes.end()) {
    logger_->log_error("Node data does not contain 'Full path' attribute, cannot read state for node");
    emit_flow_file();
    return;
  }

  const auto& full_path = path_entry->second;
  const auto current_value = opc::nodeValue2String(nodedata);
  const auto state_key = full_path + "_new_value";

  // find() instead of operator[]: a missing entry ("never seen") must be told
  // apart from a stored empty string ("last seen value was empty").
  const auto stored_entry = state_map.find(state_key);
  const bool unchanged = stored_entry != state_map.end() && stored_entry->second == current_value;
  if (unchanged) {
    logger_->log_debug("Node {} has no new value, skipping", full_path);
    return;
  }

  logger_->log_debug("Node {} has new value", full_path);
  state_map[state_key] = current_value;
  emit_flow_file();
}
/**
 * Callback invoked for every reference found during traversal.
 *
 * Counts the node, and for variable nodes fetches the node data and emits a
 * flow file according to the configured lazy mode: compared by source
 * timestamp (On), compared by value (NewValue), or unconditionally otherwise.
 * Exceptions derived from std::exception that occur while processing a
 * variable are logged as warnings and do not interrupt the traversal.
 *
 * @param ref              reference description of the node that was found
 * @param path             path of the node, used to read its data and in log output
 * @param context          process context, forwarded to the flow file writers
 * @param session          process session, forwarded to the flow file writers
 * @param nodes_found      incremented for every node, regardless of its class
 * @param variables_found  incremented for every flow file that is emitted
 * @param state_map        lazy-mode state, forwarded to the lazy writers
 * @return always true
 */
bool FetchOPCProcessor::nodeFoundCallBack(const UA_ReferenceDescription *ref, const std::string& path,
    core::ProcessContext& context, core::ProcessSession& session, size_t& nodes_found, size_t& variables_found,
    std::unordered_map<std::string, std::string>& state_map) {
  ++nodes_found;

  if (ref->nodeClass == UA_NODECLASS_VARIABLE) {
    try {
      opc::NodeData nodedata = connection_->getNodeData(ref, path);
      switch (lazy_mode_) {
        case LazyModeOptions::On:
          writeFlowFileUsingLazyModeWithTimestamp(nodedata, context, session, variables_found, state_map);
          break;
        case LazyModeOptions::NewValue:
          writeFlowFileUsingLazyModeWithNewValue(nodedata, context, session, variables_found, state_map);
          break;
        default:
          OPCData2FlowFile(nodedata, context, session);
          ++variables_found;
          break;
      }
    } catch (const std::exception& exception) {
      // Build the browse name from the (pointer, length) pair for the log message.
      const auto& raw_name = ref->browseName.name;
      std::string browse_name(reinterpret_cast<char*>(raw_name.data), raw_name.length);
      logger_->log_warn("Caught Exception while trying to get data from node {}: {}", path + "/" + browse_name, exception.what());
    }
  }

  return true;
}
/**
 * Creates a flow file from an OPC node and routes it.
 *
 * All node attributes are copied to the flow file. When the node carries
 * data, its string representation is written as the flow file content; if
 * that fails with a std::exception, the flow file is transferred to Failure.
 * In every other case (including nodes without data) it is transferred to
 * Success. If the session cannot create a flow file, an error is logged and
 * nothing is transferred.
 *
 * @param opc_node  node whose attributes and data are written
 * @param session   session used to create, write and transfer the flow file
 */
void FetchOPCProcessor::OPCData2FlowFile(const opc::NodeData& opc_node, core::ProcessContext&, core::ProcessSession& session) const {
  auto flow_file = session.create();
  if (!flow_file) {
    logger_->log_error("Failed to create flowfile!");
    return;
  }

  for (const auto& [attribute_name, attribute_value] : opc_node.attributes) {
    flow_file->setAttribute(attribute_name, attribute_value);
  }

  // No payload: the attributes alone make up the flow file.
  if (opc_node.data.empty()) {
    session.transfer(flow_file, Success);
    return;
  }

  try {
    session.writeBuffer(flow_file, opc::nodeValue2String(opc_node));
  } catch (const std::exception& e) {
    std::string browsename;
    flow_file->getAttribute("Browsename", browsename);
    logger_->log_info("Failed to extract data of OPC node {}: {}", browsename, e.what());
    session.transfer(flow_file, Failure);
    return;
  }

  session.transfer(flow_file, Success);
}
// Registers FetchOPCProcessor as a Processor resource (the macro is presumably provided by core/Resource.h).
REGISTER_RESOURCE(FetchOPCProcessor, Processor);
} // namespace org::apache::nifi::minifi::processors